#include #include "list.h" #include "has_task_msg.h" #include "has_task_msg_os_port.h" // #define MSG_OPT_DYNAMIC_SUB // 允许动态订阅 // #define MSG_OPT_DEBUG // 调试 #define MSG_OPT_PUB_LIST // 使用静态发布表(多条消息单次分配内存) /* 暂时不支持发布列表和动态订阅同时使用,后续需要再更新支持 */ #if defined(MSG_OPT_DYNAMIC_SUB) && defined(MSG_OPT_PUB_LIST) #error "These two functions are not currently supported to be used together." #endif #ifdef MSG_OPT_DEBUG #define msg_debug(fmt, ...) msg_printf("[msg]"fmt, ##__VA_ARGS__) #else #define msg_debug(fmt, ...) #endif #define MSG_ASSERT {while(1);} #pragma pack(1) #ifdef MSG_OPT_PUB_LIST /* 消息数据 */ typedef struct { unsigned char module_id; // 发布的模块ID unsigned int node_cnt; // 被引用节点数 unsigned int length; unsigned char buffer[]; } has_msg_buffer_t; /* 消息节点 */ typedef struct { struct list_head list; // 消息链表 has_msg_buffer_t *msg_buff; } has_msg_node_t; #else /* 消息节点和数据 */ typedef struct { unsigned char module_id; // 发布的模块ID struct list_head list; // 消息链表 unsigned int length; unsigned char buffer[]; } has_msg_node_t; #endif #pragma pack() #ifdef MSG_OPT_PUB_LIST /* 静态发布列表 */ typedef struct { unsigned char pub_module[MODULE_MAX - 1]; // 发布表 #ifdef MSG_OPT_USE_MUTEX msg_mutex_t buffer_mutex; // 消息互斥量 #endif unsigned char pub_num; } has_static_pub_list_t; static has_static_pub_list_t g_pub_list[MODULE_MAX] = {0}; #endif /* 静态订阅列表 */ typedef struct { #if defined(MSG_OPT_DYNAMIC_SUB) unsigned char module_id; // 当前模块ID unsigned char sub_module_cnt; // 订阅数量 unsigned char sub_module[MODULE_MAX - 1]; // 订阅表 #else const unsigned char module_id; // 当前模块ID const unsigned char sub_module_cnt; // 订阅数量 const unsigned char sub_module[MODULE_MAX - 1]; // 订阅表 #endif #ifdef MSG_OPT_USE_MUTEX msg_mutex_t msg_mutex; // 消息互斥量 #endif struct list_head msg_list; // 消息链表 } has_static_sub_list_t; static has_static_sub_list_t g_sub_list[] = SUBSCIBE_INFO; static unsigned char g_sub_list_index[MODULE_MAX] = {0}; // 订阅表id索引 /* 检查ID是否合法,id:检查的ID,ret:不合法时return的值 */ #define MSG_CHECK_MODULE_ID(id, ret) \ if (((id) >= MODULE_MAX) || ((id) == INVALID_ID)) { \ msg_printf("%s ID wrong:%d\n", __func__, (id)); return (ret); } /* 检查ID和索引是否合法,id:检查的ID,ret:不合法时return的值 */ #define MSG_CHECK_ID_AND_INDEX(id, ret) \ if (((id) >= MODULE_MAX) || ((id) == INVALID_ID)) { \ msg_printf("%s ID wrong:%d\n", __func__, (id)); return (ret); } \ if (g_sub_list_index[(id)] == MODULE_MAX) { \ return (ret);} int has_msg_init() { int j; memset(g_sub_list_index, MODULE_MAX, sizeof(g_sub_list_index)); #ifdef MSG_OPT_PUB_LIST memset(g_pub_list, 0, sizeof(g_pub_list)); #endif for (int i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++) { /* 检查各模块ID是否配置正确 */ if ((g_sub_list[i].module_id >= MODULE_MAX) || (g_sub_list[i].module_id == INVALID_ID)) { msg_printf("module_id:%d is invalid, please check the macro:SUBSCIBE_INFO row:%d\n", g_sub_list[i].module_id, i); MSG_ASSERT; // 配置都能写错,必须进断言 return -1; } /* 检查订阅表ID是否配置正确 */ for (j = 0; j < (MODULE_MAX - 1); j++) { if (g_sub_list[i].sub_module[j] == INVALID_ID) { break; } else if (g_sub_list[i].sub_module[j] >= MODULE_MAX) { msg_printf("sub_module id:%d is invalid, module:id %d\nplease check the macro:" " SUBSCIBE_INFO row:%d, sub list:%d\n", g_sub_list[i].sub_module[j], g_sub_list[i].module_id, i, j); MSG_ASSERT; // 配置都能写错,必须进断言 return -1; } } /* 检查订阅数量是否配置正确 */ if (g_sub_list[i].sub_module_cnt != j) { msg_printf("sub_module_cnt is config as %d, which actually is %d, please check the" " macro: SUBSCIBE_INFO row:%d\n", g_sub_list[i].sub_module_cnt, j, i); MSG_ASSERT; // 配置都能写错,必须进断言 return -1; } g_sub_list_index[g_sub_list[i].module_id] = i; // 索引表赋值 #ifdef MSG_OPT_PUB_LIST /* 构建发布表 */ for (int k = 0; k < g_sub_list[i].sub_module_cnt; k++) { if (g_pub_list[g_sub_list[i].sub_module[k]].pub_num == 0) { msg_mutex_init(&g_pub_list[g_sub_list[i].sub_module[k]].buffer_mutex); } g_pub_list[g_sub_list[i].sub_module[k]].pub_module[g_pub_list[g_sub_list[i].sub_module[k]].pub_num] = g_sub_list[i].module_id; g_pub_list[g_sub_list[i].sub_module[k]].pub_num ++; } #endif msg_mutex_init(&g_sub_list[i].msg_mutex); // 初始化该模块互斥量 INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表 } #ifdef MSG_OPT_DEBUG // msg_printf("show index:\n"); // for (int i = 0; i < MODULE_MAX; i++) // { // msg_printf("sub id:%d, index:%d\n", i, g_sub_list_index[i]); // } // for (int i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++) { // msg_printf("module:%d subscribes to:\n", g_sub_list[i].module_id); // for (int x = 0; x < g_sub_list[i].sub_module_cnt; x++) { // msg_printf("%d ", g_sub_list[i].sub_module[x]); // } // msg_printf("\n"); // } #ifdef MSG_OPT_PUB_LIST // msg_printf("\n"); // for (int i = 1; i < MODULE_MAX; i++) { // msg_printf("module:%d publish to:\n", i); // for (int x = 0; x < g_pub_list[i].pub_num; x++) { // msg_printf("%d ", g_pub_list[i].pub_module[x]); // } // msg_printf("\n"); // } #endif #endif return 0; } int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length) { has_msg_node_t *node; if (buffer == NULL) { return -1; } // if (length == 0) { // return -1; // } #ifdef MSG_OPT_PUB_LIST has_msg_buffer_t *msg_buffer; unsigned char sub_index; MSG_CHECK_MODULE_ID(module_id, -1); if (g_pub_list[module_id].pub_num == 0) { // msg_printf("no one subscribe you:%d\n", module_id); return 0; } /* 根据订阅表分配内存 */ node = (has_msg_node_t *)msg_malloc((sizeof(has_msg_node_t) * g_pub_list[module_id].pub_num) + sizeof(has_msg_buffer_t) + length); if (node == NULL) { msg_printf("malloc fail,pub id:%d\n", module_id); return -1; } msg_debug("publish: id:%d node addr:0x%p\n", module_id, node); msg_buffer = (has_msg_buffer_t *)(&node[g_pub_list[module_id].pub_num]); msg_buffer->node_cnt = g_pub_list[module_id].pub_num; msg_buffer->length = length; msg_buffer->module_id = module_id; memcpy(msg_buffer->buffer, buffer, length); /* 查订阅表添加消息 */ for (int i = 0; i < g_pub_list[module_id].pub_num; i++) { sub_index = g_sub_list_index[g_pub_list[module_id].pub_module[i]]; node[i].msg_buff = msg_buffer; msg_mutex_lock(&g_sub_list[sub_index].msg_mutex); list_add_tail(&node[i].list, &g_sub_list[sub_index].msg_list); // 加入消息链表 msg_mutex_unlock(&g_sub_list[sub_index].msg_mutex); } #else int i; msg_debug("publish: id:%d\n", module_id); for (i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++) { // 遍历模块 #ifdef MSG_OPT_DYNAMIC_SUB msg_mutex_lock(&g_sub_list[i].msg_mutex); #endif for (int j = 0; j < g_sub_list[i].sub_module_cnt; j++) { // 查找订阅表 if ((unsigned char)module_id == g_sub_list[i].sub_module[j]) { // 找到订阅信息 node = msg_malloc(sizeof(has_msg_node_t) + length); msg_debug("node addr:0x%p\n", node); if (node == NULL) { msg_printf("malloc fail,pub id:%d\n", module_id); #ifdef MSG_OPT_DYNAMIC_SUB msg_mutex_unlock(&g_sub_list[i].msg_mutex); #endif return -1; } node->length = length; node->module_id = module_id; memcpy(node->buffer, buffer, length); #ifndef MSG_OPT_DYNAMIC_SUB msg_mutex_lock(&g_sub_list[i].msg_mutex); #endif list_add_tail(&node->list, &g_sub_list[i].msg_list); // 加入消息链表 #ifndef MSG_OPT_DYNAMIC_SUB msg_mutex_unlock(&g_sub_list[i].msg_mutex); #endif break; // 不可重复订阅 } } #ifdef MSG_OPT_DYNAMIC_SUB msg_mutex_unlock(&g_sub_list[i].msg_mutex); #endif } #endif return 0; } /* TODO:更合理的方式是每个消息一个互斥量,但是可能会导致每个消息malloc和free的时候频繁init和deinit */ #ifdef MSG_OPT_PUB_LIST /* 检查计数判断该消息是否被所有订阅模块处理完,处理完就释放内存 */ static inline void msg_check_and_free_message(has_msg_node_t *node) { has_msg_node_t *temp = NULL; unsigned char id = node->msg_buff->module_id; #ifdef MSG_OPT_USE_MUTEX msg_mutex_t *target_mutex = &g_pub_list[id].buffer_mutex; #endif msg_mutex_lock(target_mutex); node->msg_buff->node_cnt --; if (node->msg_buff->node_cnt == 0) { temp = ((has_msg_node_t *)node->msg_buff - g_pub_list[id].pub_num); } msg_mutex_unlock(target_mutex); if (temp) { msg_debug("free node addr:0x%p\n", temp); msg_free(temp); } } #endif int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb) { has_msg_node_t *node; has_static_sub_list_t *sub_list; MSG_CHECK_ID_AND_INDEX(module_id, -1); sub_list = &g_sub_list[g_sub_list_index[module_id]]; /* 处理一个消息,先入先出 */ msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { node = list_first_entry(&sub_list->msg_list, has_msg_node_t, list); list_del(&node->list); msg_mutex_unlock(&sub_list->msg_mutex); } else { // 没有消息需要处理 msg_mutex_unlock(&sub_list->msg_mutex); return -1; } #ifdef MSG_OPT_PUB_LIST if (cb != NULL) { cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length); } msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id); msg_check_and_free_message(node); #else if (cb != NULL) { cb(node->module_id, node->buffer, node->length); } msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node); msg_free(node); #endif return 0; } int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, has_module_ID_e pub_module_id) { has_msg_node_t *node, *temp; has_static_sub_list_t *sub_list; MSG_CHECK_ID_AND_INDEX(module_id, -1); MSG_CHECK_MODULE_ID(pub_module_id, -1); sub_list = &g_sub_list[g_sub_list_index[module_id]]; /* 处理一个指定模块的消息,先入先出 */ msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) { #ifdef MSG_OPT_PUB_LIST if (node->msg_buff->module_id == pub_module_id) { #else if (node->module_id == pub_module_id) { #endif list_del(&node->list); break; } } msg_mutex_unlock(&sub_list->msg_mutex); } else { // 没有消息需要处理 msg_mutex_unlock(&sub_list->msg_mutex); return -1; } #ifdef MSG_OPT_PUB_LIST if (cb != NULL) { cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length); } msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id); msg_check_and_free_message(node); #else if (cb != NULL) { cb(node->module_id, node->buffer, node->length); } msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node); msg_free(node); #endif return 0; } int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb) { has_msg_node_t *node; has_static_sub_list_t *sub_list; MSG_CHECK_ID_AND_INDEX(module_id, -1); sub_list = &g_sub_list[g_sub_list_index[module_id]]; /* 处理最新的消息 */ msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { node = list_last_entry(&sub_list->msg_list, has_msg_node_t, list); list_del(&node->list); msg_mutex_unlock(&sub_list->msg_mutex); } else { // 没有消息需要处理 msg_mutex_unlock(&sub_list->msg_mutex); return -1; } #ifdef MSG_OPT_PUB_LIST if (cb != NULL) { cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length); } msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id); msg_check_and_free_message(node); #else if (cb != NULL) { cb(node->module_id, node->buffer, node->length); } msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node); msg_free(node); #endif return 0; } /* 将消息复制出接口处理,不建议使用 */ #if 1 unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out) { has_msg_node_t *node; has_static_sub_list_t *sub_list; unsigned int len = 0; if (buf_out == NULL) { msg_printf("%s buff is NULL\n", __func__); return 0; } MSG_CHECK_ID_AND_INDEX(module_id, 0); sub_list = &g_sub_list[g_sub_list_index[module_id]]; /* 处理一个消息,先入先出 */ msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { node = list_first_entry(&sub_list->msg_list, has_msg_node_t, list); list_del(&node->list); msg_mutex_unlock(&sub_list->msg_mutex); } else { // 没有消息需要处理 msg_mutex_unlock(&sub_list->msg_mutex); return 0; } #ifdef MSG_OPT_PUB_LIST memcpy(buf_out, node->msg_buff->buffer, node->msg_buff->length); if (pub_module_id != NULL) { *pub_module_id = node->msg_buff->module_id; } len = node->msg_buff->length; msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id); msg_check_and_free_message(node); #else memcpy(buf_out, node->buffer, node->length); if (pub_module_id != NULL) { *pub_module_id = node->module_id; } len = node->length; msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node); msg_free(node); #endif return len; } unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out) { has_msg_node_t *node; has_static_sub_list_t *sub_list; unsigned int len = 0; if (buf_out == NULL) { msg_printf("%s buff is NULL\n", __func__); return 0; } MSG_CHECK_ID_AND_INDEX(module_id, 0); sub_list = &g_sub_list[g_sub_list_index[module_id]]; /* 处理最新的消息 */ msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { node = list_last_entry(&sub_list->msg_list, has_msg_node_t, list); list_del(&node->list); msg_mutex_unlock(&sub_list->msg_mutex); } else { // 没有消息需要处理 msg_mutex_unlock(&sub_list->msg_mutex); return 0; } #ifdef MSG_OPT_PUB_LIST memcpy(buf_out, node->msg_buff->buffer, node->msg_buff->length); if (pub_module_id != NULL) { *pub_module_id = node->msg_buff->module_id; } len = node->msg_buff->length; msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id); msg_check_and_free_message(node); #else memcpy(buf_out, node->buffer, node->length); if (pub_module_id != NULL) { *pub_module_id = node->module_id; } len = node->length; msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node); msg_free(node); #endif return len; } #endif unsigned int has_msg_is_message_empty(has_module_ID_e module_id) { has_msg_node_t *node, *temp; has_static_sub_list_t *sub_list; unsigned int num = 0; MSG_CHECK_ID_AND_INDEX(module_id, -1); sub_list = &g_sub_list[g_sub_list_index[module_id]]; msg_mutex_lock(&sub_list->msg_mutex); if (list_empty(&sub_list->msg_list)) { msg_mutex_unlock(&sub_list->msg_mutex); return 0; } else { msg_mutex_unlock(&sub_list->msg_mutex); return -1; } } unsigned int has_msg_get_message_number(has_module_ID_e module_id) { has_msg_node_t *node, *temp; has_static_sub_list_t *sub_list; unsigned int num = 0; MSG_CHECK_ID_AND_INDEX(module_id, 0); sub_list = &g_sub_list[g_sub_list_index[module_id]]; msg_mutex_lock(&sub_list->msg_mutex); list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) { num ++; } msg_mutex_unlock(&sub_list->msg_mutex); return num; } int has_msg_printf_subscribe(has_module_ID_e module_id) { has_static_sub_list_t *sub_list; MSG_CHECK_ID_AND_INDEX(module_id, -1); sub_list = &g_sub_list[g_sub_list_index[module_id]]; msg_printf("module:%d subscribes to:\n", module_id); for (int i = 0; i < sub_list->sub_module_cnt; i++) { msg_printf("%d ", sub_list->sub_module[i]); } msg_printf("\n"); return 0; } int has_msg_delete_all_message(has_module_ID_e module_id) { has_msg_node_t *node, *temp; has_static_sub_list_t *sub_list; #ifdef MSG_OPT_PUB_LIST struct list_head free_list; has_msg_node_t *free_temp; unsigned char id; INIT_LIST_HEAD(&free_list); #endif MSG_CHECK_ID_AND_INDEX(module_id, -1); sub_list = &g_sub_list[g_sub_list_index[module_id]]; msg_mutex_lock(&sub_list->msg_mutex); if (!list_empty(&sub_list->msg_list)) { list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) { list_del(&node->list); #ifdef MSG_OPT_PUB_LIST list_add_tail(&node->list, &free_list); // 加入临时链表 #else msg_debug("id:%d free addr:0x%p\n", module_id, node); msg_free(node); #endif } } INIT_LIST_HEAD(&sub_list->msg_list); msg_mutex_unlock(&sub_list->msg_mutex); #ifdef MSG_OPT_PUB_LIST /* 释放内存 */ list_for_each_entry_safe(node, temp, &free_list, list) { /* msg_check_and_free_message(node); */ free_temp = NULL; id = node->msg_buff->module_id; msg_mutex_lock(&g_pub_list[id].buffer_mutex); node->msg_buff->node_cnt --; if (node->msg_buff->node_cnt == 0) { free_temp = ((has_msg_node_t *)node->msg_buff - g_pub_list[id].pub_num); } msg_mutex_unlock(&g_pub_list[id].buffer_mutex); if (free_temp) { msg_debug("id:%d free addr:0x%p\n", module_id, free_temp); msg_free(free_temp); } } #endif return 0; } #ifdef MSG_OPT_DYNAMIC_SUB int has_msg_subscribe(has_module_ID_e module_id, has_module_ID_e sub_id) { has_static_sub_list_t *sub_list; int j; if ((sub_id >= MODULE_MAX) || (sub_id == INVALID_ID)) { return -1; } MSG_CHECK_ID_AND_INDEX(module_id, -1); sub_list = &g_sub_list[g_sub_list_index[module_id]]; msg_mutex_lock(&sub_list->msg_mutex); if (sub_list->sub_module_cnt < (MODULE_MAX - 1)) { /* 检查是否已经订阅 */ for (j = 0; j < sub_list->sub_module_cnt; j++) { if (sub_id == sub_list->sub_module[j]) { msg_mutex_unlock(&sub_list->msg_mutex); msg_printf("ID:%d has been sub by module:%d\n", sub_id, module_id); return -1; } } /* 加入该模块的订阅表中 */ sub_list->sub_module[j] = sub_id; sub_list->sub_module_cnt ++; msg_mutex_unlock(&sub_list->msg_mutex); return 0; } else { msg_mutex_unlock(&sub_list->msg_mutex); msg_printf("module:%d subscribe fail\n", module_id); return -1; } } int has_msg_unsubscribe(has_module_ID_e module_id, has_module_ID_e sub_id) { has_static_sub_list_t *sub_list; if ((sub_id >= MODULE_MAX) || (sub_id == INVALID_ID)) { return -1; } MSG_CHECK_ID_AND_INDEX(module_id, -1); sub_list = &g_sub_list[g_sub_list_index[module_id]]; msg_mutex_lock(&sub_list->msg_mutex); for (int j = 0; j < sub_list->sub_module_cnt; j++) { if (sub_id == sub_list->sub_module[j]) { sub_list->sub_module[j] = INVALID_ID; sub_list->sub_module_cnt --; } } msg_mutex_unlock(&sub_list->msg_mutex); return 0; } #endif