添加消息同步处理逻辑,添加Linux信号量的实现

This commit is contained in:
2026-01-06 15:40:03 +08:00
parent 83b013c678
commit 49db80b5fe
4 changed files with 246 additions and 59 deletions

View File

@@ -46,7 +46,7 @@ typedef struct {
/* 静态发布列表 */
typedef struct {
unsigned char pub_module[MODULE_MAX - 1]; // 发布表
#ifdef MSG_OPT_USE_MUTEX
#ifdef MSG_OPT_MUTEX_SEM
msg_mutex_t buffer_mutex; // 消息互斥量
#endif
unsigned char pub_num;
@@ -64,8 +64,9 @@ typedef struct {
const unsigned char sub_module_cnt; // 订阅数量
const unsigned char sub_module[MODULE_MAX - 1]; // 订阅表
#endif
#ifdef MSG_OPT_USE_MUTEX
#ifdef MSG_OPT_MUTEX_SEM
msg_mutex_t msg_mutex; // 消息互斥量
msg_sem_t sem;
#endif
struct list_head msg_list; // 消息链表
} has_static_sub_list_t;
@@ -145,6 +146,9 @@ int has_msg_init()
}
msg_mutex_init(&g_sub_list[i].msg_mutex); // 初始化该模块互斥量
INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表
#ifdef MSG_OPT_MUTEX_SEM
msg_sem_init(&g_sub_list[i].sem); // 初始化同步量
#endif
}
#ifdef MSG_OPT_DEBUG
// msg_printf("show index:\n");
@@ -234,6 +238,9 @@ int has_msg_init_module(has_module_ID_e module_id)
}
msg_mutex_init(&g_sub_list[i].msg_mutex); // 初始化该模块互斥量
INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表
#ifdef MSG_OPT_MUTEX_SEM
msg_sem_init(&g_sub_list[i].sem); // 初始化同步量
#endif
break;
}
}
@@ -259,6 +266,27 @@ int has_msg_os_init()
}
}
/* TODO:更合理的方式是每个消息一个互斥量但是可能会导致每个消息malloc和free的时候频繁init和deinit */
/* 检查计数判断该消息是否被所有订阅模块处理完,处理完就释放内存 */
static inline void msg_check_and_free_message(has_msg_node_t *node)
{
has_msg_node_t *temp = NULL;
unsigned char id = node->msg_buff->module_id;
#ifdef MSG_OPT_MUTEX_SEM
msg_mutex_t *target_mutex = &g_pub_list[id].buffer_mutex;
#endif
msg_mutex_lock(target_mutex);
node->msg_buff->node_cnt --;
if (node->msg_buff->node_cnt == 0) {
temp = ((has_msg_node_t *)node->msg_buff - g_pub_list[id].pub_num);
}
msg_mutex_unlock(target_mutex);
if (temp) {
msg_debug("free node addr:0x%p\n", temp);
msg_free(temp);
}
}
/**
* @brief 发布一个消息
@@ -306,41 +334,31 @@ int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length
node[i].msg_buff = msg_buffer;
msg_mutex_lock(&g_sub_list[sub_index].msg_mutex);
list_add_tail(&node[i].list, &g_sub_list[sub_index].msg_list); // 加入消息链表
#ifdef MSG_OPT_MUTEX_SEM
/* 通知等待线程唤醒 */
if (msg_sem_notify(&g_sub_list[sub_index].sem) == -1) {
list_del(&node[i].list); // 添加失败,删除链表
msg_mutex_unlock(&g_sub_list[sub_index].msg_mutex);
msg_check_and_free_message(&node[i]);
msg_printf("warning:Failed to add msg on module:%d !!!\n"
, g_sub_list[sub_index].module_id);
continue;
}
#endif
msg_mutex_unlock(&g_sub_list[sub_index].msg_mutex);
}
return 0;
}
/* TODO:更合理的方式是每个消息一个互斥量但是可能会导致每个消息malloc和free的时候频繁init和deinit */
/* 检查计数判断该消息是否被所有订阅模块处理完,处理完就释放内存 */
static inline void msg_check_and_free_message(has_msg_node_t *node)
{
has_msg_node_t *temp = NULL;
unsigned char id = node->msg_buff->module_id;
#ifdef MSG_OPT_USE_MUTEX
msg_mutex_t *target_mutex = &g_pub_list[id].buffer_mutex;
#endif
msg_mutex_lock(target_mutex);
node->msg_buff->node_cnt --;
if (node->msg_buff->node_cnt == 0) {
temp = ((has_msg_node_t *)node->msg_buff - g_pub_list[id].pub_num);
}
msg_mutex_unlock(target_mutex);
if (temp) {
msg_debug("free node addr:0x%p\n", temp);
msg_free(temp);
}
}
/**
* @brief 处理一个消息,先入先出
* @brief 处理一个消息,先入先出(TODO:互斥)禁止在多个线程同时调用同一id的handle
*
* @param module_id:处理消息的模块id号
* @param cb:消息处理回调
* @param ms_timeout:超时时间ms其中0消息直接返回-1一直阻塞
* @return 0:成功 -1失败
*/
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb)
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout)
{
has_msg_node_t *node;
has_static_sub_list_t *sub_list;
@@ -348,6 +366,12 @@ int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb)
sub_list = &g_sub_list[g_sub_list_index[module_id]];
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
return -1; // 没有消息需要处理或发生错误
}
#endif
/* 处理一个消息,先入先出 */
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
@@ -392,6 +416,13 @@ int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb,
if (!list_empty(&sub_list->msg_list)) {
list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) {
if (node->msg_buff->module_id == pub_module_id) {
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, 0) != WAIT_MSG_COME) {
msg_mutex_unlock(&sub_list->msg_mutex);
msg_printf("error: list number != sem number\n");
return -1; // eventfd和list不一致理论上一定是 WAIT_MSG_COME
}
#endif
list_del(&node->list);
break;
}
@@ -419,7 +450,7 @@ int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb,
* @param cb:消息处理回调
* @return 0:成功 -1失败
*/
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout)
{
has_msg_node_t *node;
has_static_sub_list_t *sub_list;
@@ -427,6 +458,12 @@ int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
sub_list = &g_sub_list[g_sub_list_index[module_id]];
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
return -1; // 没有消息需要处理或发生错误
}
#endif
/* 处理最新的消息 */
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
@@ -451,7 +488,7 @@ int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
/* 将消息复制出接口处理,不建议使用 */
#if 1
unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id,
unsigned char *buf_out)
unsigned char *buf_out, int ms_timeout)
{
has_msg_node_t *node;
has_static_sub_list_t *sub_list;
@@ -464,6 +501,12 @@ unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_modul
sub_list = &g_sub_list[g_sub_list_index[module_id]];
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
return 0; // 没有消息需要处理或发生错误
}
#endif
/* 处理一个消息,先入先出 */
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
@@ -486,7 +529,7 @@ unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_modul
}
unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id,
unsigned char *buf_out)
unsigned char *buf_out, int ms_timeout)
{
has_msg_node_t *node;
has_static_sub_list_t *sub_list;
@@ -499,6 +542,12 @@ unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pu
sub_list = &g_sub_list[g_sub_list_index[module_id]];
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
return 0; // 没有消息需要处理或发生错误
}
#endif
/* 处理最新的消息 */
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
@@ -529,9 +578,7 @@ unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pu
*/
unsigned int has_msg_is_message_empty(has_module_ID_e module_id)
{
has_msg_node_t *node, *temp;
has_static_sub_list_t *sub_list;
unsigned int num = 0;
MSG_CHECK_ID_AND_INDEX(module_id, -1);
sub_list = &g_sub_list[g_sub_list_index[module_id]];
@@ -595,9 +642,8 @@ int has_msg_delete_all_message(has_module_ID_e module_id)
has_msg_node_t *node, *temp;
has_static_sub_list_t *sub_list;
struct list_head free_list;
has_msg_node_t *free_temp;
unsigned char id;
INIT_LIST_HEAD(&free_list);
int ret = 0;
MSG_CHECK_ID_AND_INDEX(module_id, -1);
sub_list = &g_sub_list[g_sub_list_index[module_id]];
@@ -605,6 +651,13 @@ int has_msg_delete_all_message(has_module_ID_e module_id)
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) {
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, 0) != WAIT_MSG_COME) {
msg_printf("error: list number != sem number\n");
ret = -1;
continue; // eventfd和list不一致理论上一定是 WAIT_MSG_COME
}
#endif
list_del(&node->list);
list_add_tail(&node->list, &free_list); // 加入临时链表
}
@@ -613,21 +666,9 @@ int has_msg_delete_all_message(has_module_ID_e module_id)
msg_mutex_unlock(&sub_list->msg_mutex);
/* 释放内存 */
list_for_each_entry_safe(node, temp, &free_list, list) {
/* msg_check_and_free_message(node); */
free_temp = NULL;
id = node->msg_buff->module_id;
msg_mutex_lock(&g_pub_list[id].buffer_mutex);
node->msg_buff->node_cnt --;
if (node->msg_buff->node_cnt == 0) {
free_temp = ((has_msg_node_t *)node->msg_buff - g_pub_list[id].pub_num);
}
msg_mutex_unlock(&g_pub_list[id].buffer_mutex);
if (free_temp) {
msg_debug("id:%d free addr:0x%p\n", module_id, free_temp);
msg_free(free_temp);
}
msg_check_and_free_message(node);
}
return 0;
return ret;
}
#ifdef MSG_OPT_DYNAMIC_SUB