diff --git a/mw/has_task_msg_manager/has_task_msg.h b/mw/has_task_msg_manager/has_task_msg.h index c974b14..428c8de 100644 --- a/mw/has_task_msg_manager/has_task_msg.h +++ b/mw/has_task_msg_manager/has_task_msg.h @@ -40,13 +40,18 @@ typedef enum { */ typedef int (*has_msg_handle_cb)(unsigned char module_id, const unsigned char *buf, unsigned int len); + +/* -------------------- 注意 Attention -------------------- */ +/* 对于同一个id,不能同时在不同线程中调用 *_handle*, *_receive* 和 *delete_all* 的接口 */ +/* For the same id, the *_handle*, *_receive* and *delete_all* API cannot be called in different threads at the same time */ + /* 基础功能 */ int has_msg_init(void); +int has_msg_os_init(void); + int has_msg_init_module(has_module_ID_e module_id); // HAS OS调用 int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length); -int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb); - -int has_msg_os_init(void); +int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout); unsigned int has_msg_is_message_empty(has_module_ID_e module_id); unsigned int has_msg_get_message_number(has_module_ID_e module_id); @@ -54,9 +59,9 @@ int has_msg_delete_all_message(has_module_ID_e module_id); /* 扩展功能 */ int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, has_module_ID_e pub_module_id); -int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb); -unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out); -unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out); +int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout); +unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out, int ms_timeout); +unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out, int ms_timeout); int has_msg_printf_subscribe(has_module_ID_e module_id); // int has_msg_subscribe(has_module_ID_e module_id, has_module_ID_e sub_id); // int has_msg_unsubscribe(has_module_ID_e module_id, has_module_ID_e sub_id); diff --git a/mw/has_task_msg_manager/has_task_msg_mamanger.c b/mw/has_task_msg_manager/has_task_msg_mamanger.c index e8fe096..97a1b37 100644 --- a/mw/has_task_msg_manager/has_task_msg_mamanger.c +++ b/mw/has_task_msg_manager/has_task_msg_mamanger.c @@ -46,7 +46,7 @@ typedef struct { /* 静态发布列表 */ typedef struct { unsigned char pub_module[MODULE_MAX - 1]; // 发布表 -#ifdef MSG_OPT_USE_MUTEX +#ifdef MSG_OPT_MUTEX_SEM msg_mutex_t buffer_mutex; // 消息互斥量 #endif unsigned char pub_num; @@ -64,8 +64,9 @@ typedef struct { const unsigned char sub_module_cnt; // 订阅数量 const unsigned char sub_module[MODULE_MAX - 1]; // 订阅表 #endif -#ifdef MSG_OPT_USE_MUTEX +#ifdef MSG_OPT_MUTEX_SEM msg_mutex_t msg_mutex; // 消息互斥量 + msg_sem_t sem; #endif struct list_head msg_list; // 消息链表 } has_static_sub_list_t; @@ -145,6 +146,9 @@ int has_msg_init() } msg_mutex_init(&g_sub_list[i].msg_mutex); // 初始化该模块互斥量 INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表 +#ifdef MSG_OPT_MUTEX_SEM + msg_sem_init(&g_sub_list[i].sem); // 初始化同步量 +#endif } #ifdef MSG_OPT_DEBUG // msg_printf("show index:\n"); @@ -234,6 +238,9 @@ int has_msg_init_module(has_module_ID_e module_id) } msg_mutex_init(&g_sub_list[i].msg_mutex); // 初始化该模块互斥量 INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表 +#ifdef MSG_OPT_MUTEX_SEM + msg_sem_init(&g_sub_list[i].sem); // 初始化同步量 +#endif break; } } @@ -259,6 +266,27 @@ int has_msg_os_init() } } +/* TODO:更合理的方式是每个消息一个互斥量,但是可能会导致每个消息malloc和free的时候频繁init和deinit */ + +/* 检查计数判断该消息是否被所有订阅模块处理完,处理完就释放内存 */ +static inline void msg_check_and_free_message(has_msg_node_t *node) +{ + has_msg_node_t *temp = NULL; + unsigned char id = node->msg_buff->module_id; +#ifdef MSG_OPT_MUTEX_SEM + msg_mutex_t *target_mutex = &g_pub_list[id].buffer_mutex; +#endif + msg_mutex_lock(target_mutex); + node->msg_buff->node_cnt --; + if (node->msg_buff->node_cnt == 0) { + temp = ((has_msg_node_t *)node->msg_buff - g_pub_list[id].pub_num); + } + msg_mutex_unlock(target_mutex); + if (temp) { + msg_debug("free node addr:0x%p\n", temp); + msg_free(temp); + } +} /** * @brief 发布一个消息 @@ -306,41 +334,31 @@ int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length node[i].msg_buff = msg_buffer; msg_mutex_lock(&g_sub_list[sub_index].msg_mutex); list_add_tail(&node[i].list, &g_sub_list[sub_index].msg_list); // 加入消息链表 +#ifdef MSG_OPT_MUTEX_SEM + /* 通知等待线程唤醒 */ + if (msg_sem_notify(&g_sub_list[sub_index].sem) == -1) { + list_del(&node[i].list); // 添加失败,删除链表 + msg_mutex_unlock(&g_sub_list[sub_index].msg_mutex); + msg_check_and_free_message(&node[i]); + msg_printf("warning:Failed to add msg on module:%d !!!\n" + , g_sub_list[sub_index].module_id); + continue; + } +#endif msg_mutex_unlock(&g_sub_list[sub_index].msg_mutex); } return 0; } -/* TODO:更合理的方式是每个消息一个互斥量,但是可能会导致每个消息malloc和free的时候频繁init和deinit */ - -/* 检查计数判断该消息是否被所有订阅模块处理完,处理完就释放内存 */ -static inline void msg_check_and_free_message(has_msg_node_t *node) -{ - has_msg_node_t *temp = NULL; - unsigned char id = node->msg_buff->module_id; -#ifdef MSG_OPT_USE_MUTEX - msg_mutex_t *target_mutex = &g_pub_list[id].buffer_mutex; -#endif - msg_mutex_lock(target_mutex); - node->msg_buff->node_cnt --; - if (node->msg_buff->node_cnt == 0) { - temp = ((has_msg_node_t *)node->msg_buff - g_pub_list[id].pub_num); - } - msg_mutex_unlock(target_mutex); - if (temp) { - msg_debug("free node addr:0x%p\n", temp); - msg_free(temp); - } -} - /** - * @brief 处理一个消息,先入先出 + * @brief 处理一个消息,先入先出,(TODO:互斥)禁止在多个线程同时调用同一id的handle * * @param module_id:处理消息的模块id号 * @param cb:消息处理回调 + * @param ms_timeout:超时时间(ms)其中0:消息直接返回;-1:一直阻塞 * @return 0:成功 -1:失败 */ -int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb) +int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout) { has_msg_node_t *node; has_static_sub_list_t *sub_list; @@ -348,6 +366,12 @@ int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb) sub_list = &g_sub_list[g_sub_list_index[module_id]]; +#ifdef MSG_OPT_MUTEX_SEM + if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) { + return -1; // 没有消息需要处理或发生错误 + } +#endif + /* 处理一个消息,先入先出 */ msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { @@ -392,6 +416,13 @@ int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, if (!list_empty(&sub_list->msg_list)) { list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) { if (node->msg_buff->module_id == pub_module_id) { +#ifdef MSG_OPT_MUTEX_SEM + if (msg_sem_wait(&sub_list->sem, 0) != WAIT_MSG_COME) { + msg_mutex_unlock(&sub_list->msg_mutex); + msg_printf("error: list number != sem number\n"); + return -1; // eventfd和list不一致,理论上一定是 WAIT_MSG_COME + } +#endif list_del(&node->list); break; } @@ -419,7 +450,7 @@ int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, * @param cb:消息处理回调 * @return 0:成功 -1:失败 */ -int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb) +int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout) { has_msg_node_t *node; has_static_sub_list_t *sub_list; @@ -427,6 +458,12 @@ int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb) sub_list = &g_sub_list[g_sub_list_index[module_id]]; +#ifdef MSG_OPT_MUTEX_SEM + if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) { + return -1; // 没有消息需要处理或发生错误 + } +#endif + /* 处理最新的消息 */ msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { @@ -451,7 +488,7 @@ int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb) /* 将消息复制出接口处理,不建议使用 */ #if 1 unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, - unsigned char *buf_out) + unsigned char *buf_out, int ms_timeout) { has_msg_node_t *node; has_static_sub_list_t *sub_list; @@ -464,6 +501,12 @@ unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_modul sub_list = &g_sub_list[g_sub_list_index[module_id]]; +#ifdef MSG_OPT_MUTEX_SEM + if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) { + return 0; // 没有消息需要处理或发生错误 + } +#endif + /* 处理一个消息,先入先出 */ msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { @@ -486,7 +529,7 @@ unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_modul } unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, - unsigned char *buf_out) + unsigned char *buf_out, int ms_timeout) { has_msg_node_t *node; has_static_sub_list_t *sub_list; @@ -499,6 +542,12 @@ unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pu sub_list = &g_sub_list[g_sub_list_index[module_id]]; +#ifdef MSG_OPT_MUTEX_SEM + if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) { + return 0; // 没有消息需要处理或发生错误 + } +#endif + /* 处理最新的消息 */ msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { @@ -529,9 +578,7 @@ unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pu */ unsigned int has_msg_is_message_empty(has_module_ID_e module_id) { - has_msg_node_t *node, *temp; has_static_sub_list_t *sub_list; - unsigned int num = 0; MSG_CHECK_ID_AND_INDEX(module_id, -1); sub_list = &g_sub_list[g_sub_list_index[module_id]]; @@ -595,9 +642,8 @@ int has_msg_delete_all_message(has_module_ID_e module_id) has_msg_node_t *node, *temp; has_static_sub_list_t *sub_list; struct list_head free_list; - has_msg_node_t *free_temp; - unsigned char id; INIT_LIST_HEAD(&free_list); + int ret = 0; MSG_CHECK_ID_AND_INDEX(module_id, -1); sub_list = &g_sub_list[g_sub_list_index[module_id]]; @@ -605,6 +651,13 @@ int has_msg_delete_all_message(has_module_ID_e module_id) msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) { +#ifdef MSG_OPT_MUTEX_SEM + if (msg_sem_wait(&sub_list->sem, 0) != WAIT_MSG_COME) { + msg_printf("error: list number != sem number\n"); + ret = -1; + continue; // eventfd和list不一致,理论上一定是 WAIT_MSG_COME + } +#endif list_del(&node->list); list_add_tail(&node->list, &free_list); // 加入临时链表 } @@ -613,21 +666,9 @@ int has_msg_delete_all_message(has_module_ID_e module_id) msg_mutex_unlock(&sub_list->msg_mutex); /* 释放内存 */ list_for_each_entry_safe(node, temp, &free_list, list) { - /* msg_check_and_free_message(node); */ - free_temp = NULL; - id = node->msg_buff->module_id; - msg_mutex_lock(&g_pub_list[id].buffer_mutex); - node->msg_buff->node_cnt --; - if (node->msg_buff->node_cnt == 0) { - free_temp = ((has_msg_node_t *)node->msg_buff - g_pub_list[id].pub_num); - } - msg_mutex_unlock(&g_pub_list[id].buffer_mutex); - if (free_temp) { - msg_debug("id:%d free addr:0x%p\n", module_id, free_temp); - msg_free(free_temp); - } + msg_check_and_free_message(node); } - return 0; + return ret; } #ifdef MSG_OPT_DYNAMIC_SUB diff --git a/mw/has_task_msg_manager/has_task_msg_os_port.c b/mw/has_task_msg_manager/has_task_msg_os_port.c new file mode 100644 index 0000000..6a25b6a --- /dev/null +++ b/mw/has_task_msg_manager/has_task_msg_os_port.c @@ -0,0 +1,102 @@ +/* + * has_task_msg_manager/has_task_os_port.c + * + * Author: zhangzhaopeng + * Version: 1.0 + * Created on : 2026-01-05 + * 系统适配层 + */ +#include +#include "has_task_msg_os_port.h" + +#ifdef MSG_OPT_MUTEX_SEM +/** + * @brief 信号量初始化 + * + * @param sem:信号量 + * @return 0:成功 -1:失败 + */ +int msg_sem_init(msg_sem_t *sem) +{ +#if (MSG_OPT_OS == MSG_OPT_OS_LINUX) + sem->poll_fd.fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK); + if (sem->poll_fd.fd == -1) { + printf("create eventfd error!\n"); + return -1; + } + sem->poll_fd.events = POLLIN; // 监控可读事件 + sem->poll_fd.revents = 0; + return 0; +#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS) +#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD) +#elif (MSG_OPT_OS == MSG_OPT_OS_NONE_OR_OSAL) +#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED) +#endif +} + +/** + * @brief 消息通知 + * + * @param sem:信号量 + * @return 0:成功 -1:失败 + */ +int msg_sem_notify(msg_sem_t *sem) +{ +#if (MSG_OPT_OS == MSG_OPT_OS_LINUX) + uint64_t val = 1; + if (write(sem->poll_fd.fd, &val, sizeof(val)) != sizeof(val)) { + printf("notify thread err!\n"); + return -1; + } + return 0; +#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS) +#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD) +#elif (MSG_OPT_OS == MSG_OPT_OS_NONE_OR_OSAL) +#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED) +#endif +} + +/** + * @brief 等待消息 + * + * @param sem:信号量 + * @param ms_timeout:超时时间(ms)其中0:消息直接返回;-1:一直阻塞 + * @return MSG_WAIT_RET: + * WAIT_FAIL = -1, 等待失败 + * WAIT_TIMEOUT = -1, 等待超时 + * WAIT_MSG_COME = -1, 有新消息 + */ +enum MSG_WAIT_RET msg_sem_wait(msg_sem_t *sem, int ms_timeout) +{ +#if (MSG_OPT_OS == MSG_OPT_OS_LINUX) + uint64_t val; + int ret = poll(&sem->poll_fd, 1, ms_timeout); + if (ret == -1) { + perror("poll fail\n"); + return WAIT_FAIL; + } else if (ret == 0) { + // printf("poll timeout\n"); + return WAIT_TIMEOUT; + } + + if (sem->poll_fd.revents & POLLIN) { + if (read(sem->poll_fd.fd, &val, sizeof(val)) == sizeof(val)) { // 清零 + // printf("read %ld\n", val); + return WAIT_MSG_COME; + } + else { + printf("read error\n"); + } + } + + // 检查错误事件 + printf("something err happened,revents:%x\n", sem->poll_fd.revents); + return WAIT_FAIL; +#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS) +#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD) +#elif (MSG_OPT_OS == MSG_OPT_OS_NONE_OR_OSAL) +#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED) +#endif +} + +#endif diff --git a/mw/has_task_msg_manager/has_task_msg_os_port.h b/mw/has_task_msg_manager/has_task_msg_os_port.h index e256492..c369495 100644 --- a/mw/has_task_msg_manager/has_task_msg_os_port.h +++ b/mw/has_task_msg_manager/has_task_msg_os_port.h @@ -14,8 +14,15 @@ // #define MSG_OPT_OS MSG_OPT_OS_NONE_OR_OSAL // 裸机或osal // #define MSG_OPT_OS MSG_OPT_OS_USER_DEFINED // 自定义 -#define MSG_OPT_USE_MUTEX // 支持互斥量 +#define MSG_OPT_MUTEX_SEM // 支持互斥量和信号量 +#ifdef MSG_OPT_MUTEX_SEM +enum MSG_WAIT_RET{ + WAIT_FAIL = -1, // 等待失败 + WAIT_TIMEOUT, // 等待超时 + WAIT_MSG_COME, // 需要处理消息 +}; +#endif /* ----------------------- for Linux ----------------------- */ #if (MSG_OPT_OS == MSG_OPT_OS_LINUX) @@ -24,11 +31,22 @@ #include #include -#ifdef MSG_OPT_USE_MUTEX +#ifdef MSG_OPT_MUTEX_SEM +#include +#include +#include #define msg_mutex_t pthread_mutex_t #define msg_mutex_init(m) pthread_mutex_init(m, NULL) #define msg_mutex_lock(m) pthread_mutex_lock(m) #define msg_mutex_unlock(m) pthread_mutex_unlock(m) +typedef struct { + struct pollfd poll_fd; +} sync_t; +#define msg_sem_t sync_t + +int msg_sem_init(msg_sem_t *sem); +int msg_sem_notify(msg_sem_t *sem); +enum MSG_WAIT_RET msg_sem_wait(msg_sem_t *sem, int ms_timeout); #else #define msg_mutex_t #define msg_mutex_init(m) @@ -46,7 +64,7 @@ #include #include "FreeRTOS.h" #include "semphr.h" -#ifdef MSG_OPT_USE_MUTEX +#ifdef MSG_OPT_MUTEX_SEM #define msg_mutex_t SemaphoreHandle_t static inline void msg_mutex_init(void *mutex) { *mutex = xSemaphoreCreateMutex(); @@ -64,6 +82,13 @@ static inline void msg_mutex_lock(void *mutex) { static inline void msg_mutex_unlock(void *mutex) { xSemaphoreGive(*mutex); } + +#define msg_sem_t int + +int msg_sem_init(msg_sem_t *sem); +int msg_sem_notify(msg_sem_t *sem); +enum MSG_WAIT_RET msg_sem_wait(msg_sem_t *sem, int ms_timeout); + #else #define msg_mutex_t #define msg_mutex_init(m) @@ -80,7 +105,7 @@ static inline void msg_mutex_unlock(void *mutex) { #include #include #include -#ifdef MSG_OPT_USE_MUTEX +#ifdef MSG_OPT_MUTEX_SEM #define msg_mutex_t rt_mutex_t static inline void msg_mutex_init(void *mutex) { *mutex = rt_mutex_create("has_msg", RT_IPC_FLAG_FIFO); @@ -98,6 +123,13 @@ static inline void msg_mutex_lock(void *mutex) { static inline void msg_mutex_unlock(void *mutex) { rt_mutex_release(*mutex); } + +#define msg_sem_t int + +int msg_sem_init(msg_sem_t *sem); +int msg_sem_notify(msg_sem_t *sem); +enum MSG_WAIT_RET msg_sem_wait(msg_sem_t *sem, int ms_timeout); + #else #define msg_mutex_t #define msg_mutex_init(m) @@ -113,7 +145,7 @@ static inline void msg_mutex_unlock(void *mutex) { #elif (MSG_OPT_OS == MSG_OPT_OS_NONE_OR_OSAL) #include #include -#undef MSG_OPT_USE_MUTEX +#undef MSG_OPT_MUTEX_SEM #define msg_mutex_t #define msg_mutex_init(m) @@ -129,7 +161,7 @@ static inline void msg_mutex_unlock(void *mutex) { #include #include -#ifdef MSG_OPT_USE_MUTEX +#ifdef MSG_OPT_MUTEX_SEM #define msg_mutex_t int/* to be done */ static inline void msg_mutex_init(void *mutex) { /* to be done */ @@ -142,6 +174,13 @@ static inline void msg_mutex_lock(void *mutex) { static inline void msg_mutex_unlock(void *mutex) { /* to be done */ } + +#define msg_sem_t int // to be done + +int msg_sem_init(msg_sem_t *sem); +int msg_sem_notify(msg_sem_t *sem); +enum MSG_WAIT_RET msg_sem_wait(msg_sem_t *sem, int ms_timeout); + #else #define msg_mutex_t #define msg_mutex_init(m)