sdk-hwV1.3/external/fast-user-adapter/rt_media/api_adapter/aw_message_queue.c

298 lines
6.1 KiB
C
Raw Normal View History

2024-05-07 10:09:20 +00:00
/*
* Copyright (c) 2008-2016 Allwinner Technology Co. Ltd.
* All rights reserved.
*
* File : aw_message_queue.c
* Description : message queue
* History :
*
*/
#include <errno.h>
#include <malloc.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "aw_util.h"
#include "aw_message_queue.h"
//* should include other '*.h' before CdcMalloc.h
/*
 * One slot of the queue's node pool.  A slot is also a link in the singly
 * linked list of pending messages.
 */
typedef struct MessageNode
{
    struct MessageNode *next;   /* following pending node; NULL for the last one */
    int                 valid;  /* 1 while the slot holds a queued message, else 0 */
    XmMessage          *msg;    /* message buffer owned by this slot */
} MessageNode;
/* Private state behind an XmMessageQueue handle. */
struct MessageQueueContext
{
    char            *pName;          /* heap copy of the queue name, used in logs */
    MessageNode     *pHead;          /* oldest pending message; NULL when empty */
    int              nCount;         /* number of pending messages */
    MessageNode     *Nodes;          /* pool of nMaxMessageNum slots */
    int              nMaxMessageNum; /* capacity of the pool */
    size_t           nMessageSize;   /* bytes copied per message */
    pthread_mutex_t  mutex;          /* held while list, count and slots are modified */
    sem_t            sem;            /* posted once for every queued message */
};
typedef struct MessageQueueContext MessageQueueContext;
/*
 * Create a message queue that can hold up to nMaxMessageNum messages.
 *
 * nMaxMessageNum: capacity of the queue; must be greater than zero.
 * pName:          optional name used in log output.  It is copied; when
 *                 NULL the queue is named "(unnamed)" so that log calls
 *                 never receive a NULL string.
 *
 * Returns the new queue handle, or NULL when the capacity is invalid, an
 * allocation fails, or the mutex/semaphore cannot be initialised.  Release
 * the handle with aw_message_queue_destroy().
 */
XmMessageQueue* aw_message_queue_create(int nMaxMessageNum, const char* pName)
{
    const char *queueName = (pName != NULL) ? pName : "(unnamed)";
    const size_t nMessageSize = sizeof(XmMessage);
    MessageQueueContext *mqCtx;
    int i;

    aw_logv("nMessageSize = %d",(int)nMessageSize);

    if (nMaxMessageNum <= 0)
    {
        aw_loge("%s, invalid max message count %d.", queueName, nMaxMessageNum);
        return NULL;
    }

    mqCtx = calloc(1, sizeof(*mqCtx));
    if (mqCtx == NULL)
    {
        aw_loge("%s, allocate memory fail.", queueName);
        return NULL;
    }

    mqCtx->pName = strdup(queueName);
    if (mqCtx->pName == NULL)
    {
        aw_loge("%s, allocate memory for queue name fail.", queueName);
        goto fail_ctx;
    }

    mqCtx->Nodes = calloc((size_t)nMaxMessageNum, sizeof(*mqCtx->Nodes));
    if (mqCtx->Nodes == NULL)
    {
        aw_loge("%s, allocate memory for message nodes fail.", mqCtx->pName);
        goto fail_name;
    }

    for (i = 0; i < nMaxMessageNum; i++)
    {
        mqCtx->Nodes[i].msg = calloc(1, nMessageSize);
        if (mqCtx->Nodes[i].msg == NULL)
        {
            aw_loge("%s, allocate memory for message %d fail.", mqCtx->pName, i);
            goto fail_msgs;
        }
    }

    mqCtx->nMaxMessageNum = nMaxMessageNum;
    mqCtx->nMessageSize = nMessageSize;

    if (pthread_mutex_init(&mqCtx->mutex, NULL) != 0)
    {
        aw_loge("%s, init mutex fail.", mqCtx->pName);
        goto fail_msgs;
    }
    if (sem_init(&mqCtx->sem, 0, 0) != 0)
    {
        aw_loge("%s, init semaphore fail.", mqCtx->pName);
        goto fail_mutex;
    }

    return (XmMessageQueue*)mqCtx;

fail_mutex:
    pthread_mutex_destroy(&mqCtx->mutex);
fail_msgs:
    /* Slots that were never filled are still NULL from calloc; free(NULL) is a no-op. */
    for (i = 0; i < nMaxMessageNum; i++)
        free(mqCtx->Nodes[i].msg);
    free(mqCtx->Nodes);
fail_name:
    free(mqCtx->pName);
fail_ctx:
    free(mqCtx);
    return NULL;
}
/*
 * Destroy a queue returned by aw_message_queue_create(): frees every
 * message buffer, the node pool, the name and the context, and destroys
 * the mutex and semaphore.
 *
 * mq: queue handle; NULL is accepted and ignored.
 *
 * NOTE(review): nothing here synchronises with other threads, so callers
 * presumably must make sure the queue is no longer in use.
 */
void aw_message_queue_destroy(XmMessageQueue* mq)
{
    MessageQueueContext *mqCtx = (MessageQueueContext*)mq;
    int i;

    if (mqCtx == NULL)
        return;

    /* Validate the pool pointer before walking it, not afterwards. */
    if (mqCtx->Nodes != NULL)
    {
        for (i = 0; i < mqCtx->nMaxMessageNum; i++)
            free(mqCtx->Nodes[i].msg);
        free(mqCtx->Nodes);
    }

    pthread_mutex_destroy(&mqCtx->mutex);
    sem_destroy(&mqCtx->sem);
    free(mqCtx->pName);
    free(mqCtx);
}
/*
 * Append a copy of *m to the end of the queue and post the semaphore.
 *
 * mq: queue handle.
 * m:  message to copy; nMessageSize bytes are read from it.
 *
 * Returns 0 on success, -1 when the queue already holds nMaxMessageNum
 * messages or no free slot could be found.
 */
int aw_message_queue_postMessage(XmMessageQueue* mq, XmMessage* m)
{
    MessageQueueContext *ctx = (MessageQueueContext*)mq;
    MessageNode *slot = NULL;
    MessageNode **link;
    int idx;

    pthread_mutex_lock(&ctx->mutex);

    if (ctx->nCount >= ctx->nMaxMessageNum)
    {
        aw_loge("%s, message count exceed, current message count = %d, max message count = %d",
                ctx->pName, ctx->nCount, ctx->nMaxMessageNum);
        pthread_mutex_unlock(&ctx->mutex);
        return -1;
    }

    /* Take the first pool slot that is not currently queued. */
    for (idx = 0; idx < ctx->nMaxMessageNum; idx++)
    {
        if (ctx->Nodes[idx].valid == 0)
        {
            slot = &ctx->Nodes[idx];
            break;
        }
    }

    if (slot == NULL)
    {
        pthread_mutex_unlock(&ctx->mutex);
        aw_loge("node is NULL!");
        return -1;
    }

    memcpy(slot->msg, m, ctx->nMessageSize);
    slot->valid = 1;
    slot->next = NULL;

    /* Follow the link pointers to the end of the list and hook the slot in. */
    for (link = &ctx->pHead; *link != NULL; link = &(*link)->next)
        ;
    *link = slot;

    ctx->nCount++;
    pthread_mutex_unlock(&ctx->mutex);

    /* The consumer is woken only after the lock has been released. */
    sem_post(&ctx->sem);
    return 0;
}
/*
 * Wait until the queue's semaphore can be taken, without removing a
 * message: the token is posted back before returning.
 *
 * mq:      queue handle.
 * timeout: forwarded to aw_rt_SemTimedWait() (milliseconds, -1 waits
 *          without a deadline).
 *
 * Returns -1 when the wait fails or times out, otherwise the number of
 * pending messages observed after the wait.
 */
int aw_message_queue_waitMessage(XmMessageQueue* mq, int64_t timeout)
{
    int nCount;

    if (aw_rt_SemTimedWait(&mq->sem, timeout) < 0)
        return -1;

    /* Hand the token back so a later get still finds it. */
    sem_post(&mq->sem);

    /* nCount is written under the mutex everywhere else; read it the same way. */
    pthread_mutex_lock(&mq->mutex);
    nCount = mq->nCount;
    pthread_mutex_unlock(&mq->mutex);

    return nCount;
}
/*
 * Receive the oldest message, waiting without a deadline: forwards to
 * aw_message_queue_tryGetMessage() with a timeout of -1 and returns its
 * result unchanged.
 */
int aw_message_queue_getMessage(XmMessageQueue* mq, XmMessage* m)
{
    const int64_t noDeadline = -1;

    return aw_message_queue_tryGetMessage(mq, m, noDeadline);
}
/*
 * Remove the oldest message from the queue and copy it into *m.
 *
 * mq:         queue handle.
 * m:          destination; nMessageSize bytes are written to it.
 * timeout_ms: 0 polls without blocking, -1 waits without a deadline, any
 *             other value is forwarded to aw_rt_SemTimedWait() as a
 *             timeout in milliseconds.
 *
 * Returns 0 when a message was copied, -1 when no semaphore token could be
 * taken in time or the queue turned out to be empty.
 */
int aw_message_queue_tryGetMessage(XmMessageQueue* mq, XmMessage* m, int64_t timeout_ms)
{
    MessageQueueContext *mqCtx = (MessageQueueContext*)mq;
    MessageNode *node;
    int waitResult;

    /*
     * Every dequeue must consume one semaphore token, including the
     * non-blocking poll; otherwise the semaphore drifts above nCount and
     * later blocking receives wake up with nothing to read.
     */
    if (timeout_ms == 0)
        waitResult = sem_trywait(&mqCtx->sem);
    else
        waitResult = aw_rt_SemTimedWait(&mqCtx->sem, timeout_ms);
    if (waitResult < 0)
        return -1;

    pthread_mutex_lock(&mqCtx->mutex);
    if (mqCtx->nCount <= 0)
    {
        /* A token without a message can be left behind by a flush racing a post. */
        aw_logv("%s, no message.", mqCtx->pName);
        pthread_mutex_unlock(&mqCtx->mutex);
        return -1;
    }

    node = mqCtx->pHead;
    mqCtx->pHead = node->next;
    memcpy(m, node->msg, mqCtx->nMessageSize);
    node->valid = 0;    /* return the slot to the pool */
    mqCtx->nCount--;
    pthread_mutex_unlock(&mqCtx->mutex);

    return 0;
}
/*
 * Drop every pending message: empties the list, marks all pool slots free
 * and drains the semaphore, all while holding the queue mutex.
 *
 * mq: queue handle.
 *
 * Always returns 0.
 */
int aw_message_queue_flush(XmMessageQueue* mq)
{
    MessageQueueContext *mqCtx = (MessageQueueContext*)mq;
    int i;

    aw_logv("%s, flush messages.", mqCtx->pName);

    pthread_mutex_lock(&mqCtx->mutex);

    mqCtx->pHead = NULL;
    mqCtx->nCount = 0;
    for (i = 0; i < mqCtx->nMaxMessageNum; i++)
        mqCtx->Nodes[i].valid = 0;

    /*
     * Drain by taking tokens until none is left.  Stopping on the first
     * sem_trywait() failure avoids relying on sem_getvalue(), which POSIX
     * allows to report a negative waiter count and would keep a
     * value-driven loop spinning.
     */
    while (sem_trywait(&mqCtx->sem) == 0)
        ;

    pthread_mutex_unlock(&mqCtx->mutex);
    return 0;
}
/*
 * Return the number of messages currently pending in the queue.
 *
 * mq: queue handle.
 *
 * The value is read under the queue mutex, like every write to it; it can
 * be out of date as soon as the function returns.
 */
int aw_message_queue_getCount(XmMessageQueue* mq)
{
    MessageQueueContext *mqCtx = (MessageQueueContext*)mq;
    int nCount;

    pthread_mutex_lock(&mqCtx->mutex);
    nCount = mqCtx->nCount;
    pthread_mutex_unlock(&mqCtx->mutex);

    return nCount;
}
/*
 * Take a semaphore token, optionally giving up after a timeout.
 *
 * sem:     semaphore to wait on.
 * time_ms: -1 waits without a deadline; otherwise the maximum wait in
 *          milliseconds.  Other negative values are treated as 0, i.e. a
 *          single immediate attempt.
 *
 * Returns 0 when the token was taken, -1 on timeout or error (errno is
 * left as set by the failing call).  Waits interrupted by a signal are
 * restarted instead of being reported as a failure.
 */
int aw_rt_SemTimedWait(sem_t* sem, int64_t time_ms)
{
    struct timespec ts;
    int err;

    if (time_ms == -1)
    {
        do
        {
            err = sem_wait(sem);
        } while (err != 0 && errno == EINTR);
        return err;
    }

    /* A negative interval would push tv_nsec below zero and yield EINVAL. */
    if (time_ms < 0)
        time_ms = 0;

    /* sem_timedwait() takes an absolute CLOCK_REALTIME deadline. */
    if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
        return -1;

    ts.tv_sec += (time_t)(time_ms / 1000);
    ts.tv_nsec += (long)(time_ms % 1000) * 1000 * 1000;
    if (ts.tv_nsec >= 1000L * 1000 * 1000)
    {
        ts.tv_sec += 1;
        ts.tv_nsec -= 1000L * 1000 * 1000;
    }

    /* The deadline is absolute, so restarting after EINTR does not extend it. */
    do
    {
        err = sem_timedwait(sem, &ts);
    } while (err != 0 && errno == EINTR);

    return err;
}