Files
fusion/mw/has_task_msg_manager/has_task_msg_mamanger.c
2025-12-24 17:00:23 +08:00

582 lines
19 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include "list.h"
#include "has_task_msg.h"
#include "has_task_msg_os_port.h"
/*
 * Build-time feature switches for the message manager.
 * Uncomment a line to enable the corresponding option.
 */
// #define MSG_OPT_DYNAMIC_SUB // allow subscriptions to be added/removed at run time
// #define MSG_OPT_DEBUG // debug build
#define MSG_OPT_PUB_LIST // use the static publish table (one allocation per publish call serves every subscriber)
/* The publish table and dynamic subscription cannot be enabled together yet; support may be added later. */
#if defined(MSG_OPT_DYNAMIC_SUB) && defined(MSG_OPT_PUB_LIST)
#error "These two functions are not currently supported to be used together."
#endif
#ifdef MSG_OPT_DEBUG
/* Debug builds also validate the subscription table configuration */
#define MSG_OPT_CHECK_SUBSCRIBE_LIST
#endif
/* The message structures below are packed to 1-byte alignment. */
#pragma pack(1)
#ifdef MSG_OPT_PUB_LIST
/* Message payload; one instance is shared by all list nodes created by a single publish call */
typedef struct {
unsigned char module_id; // ID of the publishing module
unsigned int node_cnt; // number of list nodes that still reference this payload
unsigned int length; // number of payload bytes stored in buffer[]
unsigned char buffer[]; // payload bytes (flexible array member)
} has_msg_buffer_t;
/* Message node: one per subscriber, linked into that subscriber's queue */
typedef struct {
struct list_head list; // link in the subscriber's message list
has_msg_buffer_t *msg_buff; // shared payload this node refers to
} has_msg_node_t;
#else
/* Message node and payload combined; each subscriber receives its own copy */
typedef struct {
unsigned char module_id; // ID of the publishing module
struct list_head list; // link in the subscriber's message list
unsigned int length; // number of payload bytes stored in buffer[]
unsigned char buffer[]; // payload bytes (flexible array member)
} has_msg_node_t;
#endif
#pragma pack()
#ifdef MSG_OPT_PUB_LIST
/*
 * Static publish table entry, indexed by publisher module ID.
 * Lists the modules that subscribe to that publisher; filled in by has_msg_init().
 */
typedef struct {
unsigned char pub_module[MODULE_MAX - 1]; // IDs of the modules subscribed to this publisher
#ifdef MSG_OPT_USE_MUTEX
msg_mutex_t buffer_mutex; // protects the node_cnt reference counter of this publisher's messages
#endif
unsigned char pub_num; // number of valid entries in pub_module[]
} has_static_pub_list_t;
static has_static_pub_list_t g_pub_list[MODULE_MAX] = {0};
#endif
/*
 * Static subscription table row: one per module listed in SUBSCIBE_INFO.
 * The configuration fields are const unless the table has to be checked/cleared
 * (MSG_OPT_CHECK_SUBSCRIBE_LIST) or modified at run time (MSG_OPT_DYNAMIC_SUB).
 */
typedef struct {
#if defined(MSG_OPT_CHECK_SUBSCRIBE_LIST) || defined(MSG_OPT_DYNAMIC_SUB)
unsigned char module_id; // ID of the module owning this row
unsigned char sub_module_cnt; // number of valid entries in sub_module[]
unsigned char sub_module[MODULE_MAX - 1]; // IDs of the publishers this module subscribes to
#else
const unsigned char module_id; // ID of the module owning this row
const unsigned char sub_module_cnt; // number of valid entries in sub_module[]
const unsigned char sub_module[MODULE_MAX - 1]; // IDs of the publishers this module subscribes to
#endif
#ifdef MSG_OPT_USE_MUTEX
msg_mutex_t msg_mutex; // protects msg_list
#endif
struct list_head msg_list; // queue of pending messages for this module
} has_static_sub_list_t;
/* Subscription table, initialised from the SUBSCIBE_INFO configuration macro */
static has_static_sub_list_t g_sub_list[] = SUBSCIBE_INFO;
/* Maps a module ID to the index of its row in g_sub_list; filled in by has_msg_init() */
static unsigned char g_sub_list_index[MODULE_MAX] = {0};
/*
 * Guard macros used at the top of the public functions.
 * Both are wrapped in do { } while (0) so that they behave as a single
 * statement (safe inside an unbraced if/else) and must be followed by ';'.
 * NOTE: both macros contain a `return`, so they may only be used inside a
 * function whose return type can hold `ret`.
 */
#ifdef MSG_OPT_CHECK_SUBSCRIBE_LIST
/*
 * Checking builds only: return `ret` from the calling function when the
 * looked-up subscription row carries INVALID_ID.
 * id:  module ID stored in the subscription row
 * ret: value returned from the caller when the check fails
 */
#define MSG_CHECK_SUBSCIBE_INFO(id, ret) \
    do { \
        if ((id) == INVALID_ID) { \
            msg_printf("%s invalid id:0, please check SUBSCIBE_INFO\n", __func__); \
            return (ret); \
        } \
    } while (0)
#else
#define MSG_CHECK_SUBSCIBE_INFO(id, ret) do { } while (0)
#endif
/*
 * Return `ret` from the calling function when `id` is not a usable module ID
 * (out of range or equal to INVALID_ID).
 * id:  module ID to validate
 * ret: value returned from the caller when the ID is invalid
 */
#define MSG_CHECK_MODULE_ID(id, ret) \
    do { \
        if (((id) >= MODULE_MAX) || ((id) == INVALID_ID)) { \
            msg_printf("%s ID wrong:%d\n", __func__, (id)); \
            return (ret); \
        } \
    } while (0)
/*
 * Roll back the tables after has_msg_init() detected a configuration error.
 * Always returns -1 so callers can write `return has_msg_init_abort();`.
 */
static int has_msg_init_abort(void) {
#ifdef MSG_OPT_CHECK_SUBSCRIBE_LIST
    /* The row members are writable in checking builds, so the table is wiped too. */
    memset(g_sub_list, 0, sizeof(g_sub_list));
#endif
#ifdef MSG_OPT_PUB_LIST
    memset(g_pub_list, 0, sizeof(g_pub_list));
#endif
    return -1;
}

/*
 * Initialise the message manager from the static SUBSCIBE_INFO table.
 *
 * For every row of g_sub_list this fills the module-ID -> row index table,
 * (with MSG_OPT_PUB_LIST) registers the row's module as a subscriber in the
 * publish table of each publisher it subscribes to, and initialises the row's
 * mutex and message list.
 *
 * Checking builds (MSG_OPT_CHECK_SUBSCRIBE_LIST) validate the table in detail
 * and print a diagnostic. All builds additionally refuse rows that would index
 * outside the fixed-size tables; those range checks are silent.
 *
 * Returns 0 on success, -1 when the configuration is invalid.
 */
int has_msg_init(void) {
    const size_t row_cnt = sizeof(g_sub_list) / sizeof(g_sub_list[0]);

    memset(g_sub_list_index, 0, sizeof(g_sub_list_index));
#ifdef MSG_OPT_PUB_LIST
    memset(g_pub_list, 0, sizeof(g_pub_list));
#endif

    for (size_t i = 0; i < row_cnt; i++) {
#ifdef MSG_OPT_CHECK_SUBSCRIBE_LIST
        int j;

        /* The row's own module ID must be valid. */
        if ((g_sub_list[i].module_id >= MODULE_MAX) || (g_sub_list[i].module_id == INVALID_ID)) {
            msg_printf("module_id:%d is invalid, please check the macro:SUBSCIBE_INFO row:%d\n",
                       g_sub_list[i].module_id, (int)i);
            return has_msg_init_abort();
        }
        /* Every subscribed ID up to the first INVALID_ID terminator must be in range. */
        for (j = 0; j < (MODULE_MAX - 1); j++) {
            if (g_sub_list[i].sub_module[j] == INVALID_ID) {
                break;
            } else if (g_sub_list[i].sub_module[j] >= MODULE_MAX) {
                msg_printf("sub_module id:%d is invalid, module:id %d\n please check the macro:"
                           "SUBSCIBE_INFO row:%d, sub list:%d\n",
                           g_sub_list[i].sub_module[j], g_sub_list[i].module_id, (int)i, j);
                return has_msg_init_abort();
            }
        }
        /* The configured count must match the number of IDs actually listed. */
        if (g_sub_list[i].sub_module_cnt != j) {
            msg_printf("sub_module_cnt is config as %d, which actually is %d, please check the "
                       "macro: SUBSCIBE_INFO row:%d\n", g_sub_list[i].sub_module_cnt, j, (int)i);
            return has_msg_init_abort();
        }
#endif
        /*
         * Range checks that are active in every build: module_id indexes
         * g_sub_list_index and sub_module_cnt bounds the walk over sub_module[],
         * so out-of-range values would otherwise access memory out of bounds.
         */
        if ((g_sub_list[i].module_id >= (sizeof(g_sub_list_index) / sizeof(g_sub_list_index[0])))
            || (g_sub_list[i].sub_module_cnt
                > (sizeof(g_sub_list[i].sub_module) / sizeof(g_sub_list[i].sub_module[0])))) {
            return has_msg_init_abort();
        }

        g_sub_list_index[g_sub_list[i].module_id] = (unsigned char)i; /* module ID -> row index */

#ifdef MSG_OPT_PUB_LIST
        /* Build the publish table: add this module to each publisher it subscribes to. */
        for (unsigned int k = 0; k < g_sub_list[i].sub_module_cnt; k++) {
            const unsigned char pub_id = g_sub_list[i].sub_module[k];
            has_static_pub_list_t *pub;

            if (pub_id >= (sizeof(g_pub_list) / sizeof(g_pub_list[0]))) {
                return has_msg_init_abort();
            }
            pub = &g_pub_list[pub_id];
            /* pub_module[] is full (e.g. duplicated subscriptions): refuse instead of overflowing. */
            if (pub->pub_num >= (sizeof(pub->pub_module) / sizeof(pub->pub_module[0]))) {
                return has_msg_init_abort();
            }
            if (pub->pub_num == 0) {
                /* First subscriber of this publisher: set up its reference-count mutex. */
                msg_mutex_init(&pub->buffer_mutex);
            }
            pub->pub_module[pub->pub_num] = g_sub_list[i].module_id;
            pub->pub_num++;
        }
#endif
        msg_mutex_init(&g_sub_list[i].msg_mutex); /* per-module queue mutex */
        INIT_LIST_HEAD(&g_sub_list[i].msg_list);  /* empty message queue */
    }
    return 0;
}
/*
 * Publish a message on behalf of module_id to every module subscribed to it.
 *
 * module_id: ID of the publishing module
 * buffer:    payload to publish; its contents are copied, must not be NULL
 * length:    payload size in bytes (0 is accepted)
 *
 * With MSG_OPT_PUB_LIST a single allocation holds one list node per
 * subscriber followed by the shared payload; node_cnt counts the nodes that
 * still reference it. Without it, every subscriber gets its own node+payload
 * allocation.
 *
 * Returns 0 on success (including "no subscribers"), -1 on a NULL buffer,
 * an invalid module ID (publish-table build), or allocation failure.
 */
int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length) {
    has_msg_node_t *node;

    if (buffer == NULL) {
        return -1;
    }
#ifdef MSG_OPT_PUB_LIST
    has_msg_buffer_t *msg_buffer;
    unsigned int sub_cnt;
    size_t head_size;

    MSG_CHECK_MODULE_ID(module_id, -1);
    sub_cnt = g_pub_list[module_id].pub_num;
    if (sub_cnt == 0) {
        return 0; /* nobody subscribes to this module */
    }
    /* One block: sub_cnt nodes, then the payload header, then the payload bytes. */
    head_size = (sizeof(has_msg_node_t) * sub_cnt) + sizeof(has_msg_buffer_t);
    if (length > (SIZE_MAX - head_size)) {
        /* The total size would wrap around and under-allocate the block. */
        msg_printf("malloc fail,pub id:%d\n", module_id);
        return -1;
    }
    node = (has_msg_node_t *)msg_malloc(head_size + length);
    if (node == NULL) {
        msg_printf("malloc fail,pub id:%d\n", module_id);
        return -1;
    }
    msg_buffer = (has_msg_buffer_t *)(&node[sub_cnt]);
    msg_buffer->node_cnt = sub_cnt;
    msg_buffer->length = length;
    msg_buffer->module_id = module_id;
    memcpy(msg_buffer->buffer, buffer, length);
    /* Queue one node on each subscriber's list, in publish-table order. */
    for (unsigned int i = 0; i < sub_cnt; i++) {
        const unsigned char sub_index = g_sub_list_index[g_pub_list[module_id].pub_module[i]];

        node[i].msg_buff = msg_buffer;
        msg_mutex_lock(&g_sub_list[sub_index].msg_mutex);
        list_add_tail(&node[i].list, &g_sub_list[sub_index].msg_list);
        msg_mutex_unlock(&g_sub_list[sub_index].msg_mutex);
    }
#else
    if (length > (SIZE_MAX - sizeof(has_msg_node_t))) {
        /* The node size would wrap around and under-allocate the block. */
        msg_printf("malloc fail,pub id:%d\n", module_id);
        return -1;
    }
    for (size_t i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++) { /* every module row */
#ifdef MSG_OPT_DYNAMIC_SUB
        /* The subscription list may change at run time, so it is read under the lock. */
        msg_mutex_lock(&g_sub_list[i].msg_mutex);
#endif
        for (int j = 0; j < g_sub_list[i].sub_module_cnt; j++) {
            if (module_id != g_sub_list[i].sub_module[j]) {
                continue;
            }
            /* This row subscribes to the publisher: give it a private copy. */
            node = msg_malloc(sizeof(has_msg_node_t) + length);
            if (node == NULL) {
                msg_printf("malloc fail,pub id:%d\n", module_id);
#ifdef MSG_OPT_DYNAMIC_SUB
                msg_mutex_unlock(&g_sub_list[i].msg_mutex);
#endif
                return -1;
            }
            node->length = length;
            node->module_id = module_id;
            memcpy(node->buffer, buffer, length);
#ifndef MSG_OPT_DYNAMIC_SUB
            msg_mutex_lock(&g_sub_list[i].msg_mutex);
#endif
            list_add_tail(&node->list, &g_sub_list[i].msg_list);
#ifndef MSG_OPT_DYNAMIC_SUB
            msg_mutex_unlock(&g_sub_list[i].msg_mutex);
#endif
            break; /* a publisher is delivered at most once per row */
        }
#ifdef MSG_OPT_DYNAMIC_SUB
        msg_mutex_unlock(&g_sub_list[i].msg_mutex);
#endif
    }
#endif
    return 0;
}
/* TODO: one mutex per message would be cleaner, but it could mean a mutex
 * init/deinit on every message malloc/free. */
#ifdef MSG_OPT_PUB_LIST
/*
 * Drop one reference to a shared message payload and release the whole
 * allocation once no list node references it any more.
 *
 * msg_buff_stu: pointer to the has_msg_buffer_t payload (evaluated once)
 * np:           has_msg_node_t * lvalue used as scratch; when the block is
 *               released it is set to NULL afterwards so that a stale pointer
 *               cannot be freed again by the caller
 *
 * The allocation starts pub_num nodes before the payload header, which is how
 * the block base is recomputed. The publisher entry is looked up BEFORE the
 * block is freed so that the unlock does not read freed memory.
 */
#define MSG_CHECK_AND_FREE_MESSAGE(msg_buff_stu, np) \
    do { \
        has_msg_buffer_t *msg_caf_buf_ = (msg_buff_stu); \
        has_static_pub_list_t *msg_caf_pub_ = &g_pub_list[msg_caf_buf_->module_id]; \
        msg_mutex_lock(&msg_caf_pub_->buffer_mutex); \
        msg_caf_buf_->node_cnt--; \
        if (msg_caf_buf_->node_cnt == 0) { \
            (np) = (has_msg_node_t *)((unsigned char *)msg_caf_buf_ \
                   - msg_caf_pub_->pub_num * sizeof(has_msg_node_t)); \
            msg_free(np); \
            (np) = NULL; \
        } \
        msg_mutex_unlock(&msg_caf_pub_->buffer_mutex); \
    } while (0)
#endif
/*
 * Take the oldest pending message of module_id off its queue and pass it to cb.
 *
 * module_id: ID of the receiving module
 * cb:        called as cb(publisher_id, payload, length); may be NULL, in
 *            which case the message is simply discarded
 *
 * Returns 0 when a message was consumed, -1 when the ID is invalid or the
 * queue is empty.
 */
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb) {
    has_msg_node_t *node;
    has_static_sub_list_t *sub_list;

    MSG_CHECK_MODULE_ID(module_id, -1);
    sub_list = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(sub_list->module_id, -1);

    /* Detach the first (oldest) node while holding the queue lock. */
    msg_mutex_lock(&sub_list->msg_mutex);
    if (list_empty(&sub_list->msg_list)) {
        msg_mutex_unlock(&sub_list->msg_mutex);
        return -1; /* nothing to handle */
    }
    node = list_first_entry(&sub_list->msg_list, has_msg_node_t, list);
    list_del(&node->list);
    msg_mutex_unlock(&sub_list->msg_mutex);

#ifdef MSG_OPT_PUB_LIST
    has_msg_buffer_t *msg_buffer = node->msg_buff;

    if (cb != NULL) {
        cb(msg_buffer->module_id, msg_buffer->buffer, msg_buffer->length);
    }
    /*
     * The node lives inside an allocation shared with the other subscribers,
     * so it must not be freed on its own: only drop this subscriber's
     * reference; the macro releases the whole block after the last one.
     */
    MSG_CHECK_AND_FREE_MESSAGE(msg_buffer, node);
#else
    if (cb != NULL) {
        cb(node->module_id, node->buffer, node->length);
    }
    msg_free(node); /* private copy: release it directly */
#endif
    return 0;
}
/*
 * Take the oldest pending message of module_id that was published by
 * pub_module_id off the queue and pass it to cb.
 *
 * module_id:     ID of the receiving module
 * cb:            called as cb(publisher_id, payload, length); may be NULL
 * pub_module_id: only messages from this publisher are considered
 *
 * Returns 0 when a message was consumed, -1 when an ID is invalid or no
 * queued message comes from pub_module_id.
 */
int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, has_module_ID_e pub_module_id) {
    has_msg_node_t *node, *temp;
    has_msg_node_t *match = NULL;
    has_static_sub_list_t *sub_list;

    MSG_CHECK_MODULE_ID(module_id, -1);
    MSG_CHECK_MODULE_ID(pub_module_id, -1);
    sub_list = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(sub_list->module_id, -1);

    /* Search front to back so the oldest matching message is taken. */
    msg_mutex_lock(&sub_list->msg_mutex);
    list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) {
#ifdef MSG_OPT_PUB_LIST
        const unsigned char from = node->msg_buff->module_id;
#else
        const unsigned char from = node->module_id;
#endif
        if (from == pub_module_id) {
            list_del(&node->list);
            match = node;
            break;
        }
    }
    msg_mutex_unlock(&sub_list->msg_mutex);

    /*
     * When the loop ends without a hit the cursor does not point at a real
     * message, so it must never be used; only `match` is trusted.
     */
    if (match == NULL) {
        return -1;
    }

#ifdef MSG_OPT_PUB_LIST
    has_msg_buffer_t *msg_buffer = match->msg_buff;

    if (cb != NULL) {
        cb(msg_buffer->module_id, msg_buffer->buffer, msg_buffer->length);
    }
    /* Shared allocation: drop this reference; freed as a whole after the last one. */
    MSG_CHECK_AND_FREE_MESSAGE(msg_buffer, match);
#else
    if (cb != NULL) {
        cb(match->module_id, match->buffer, match->length);
    }
    msg_free(match); /* private copy: release it directly */
#endif
    return 0;
}
/*
 * Take the newest pending message of module_id off its queue and pass it to
 * cb. Older messages stay queued.
 *
 * module_id: ID of the receiving module
 * cb:        called as cb(publisher_id, payload, length); may be NULL
 *
 * Returns 0 when a message was consumed, -1 when the ID is invalid or the
 * queue is empty.
 */
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb) {
    has_msg_node_t *node;
    has_static_sub_list_t *sub_list;

    MSG_CHECK_MODULE_ID(module_id, -1);
    sub_list = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(sub_list->module_id, -1);

    /* Detach the last (most recently published) node under the queue lock. */
    msg_mutex_lock(&sub_list->msg_mutex);
    if (list_empty(&sub_list->msg_list)) {
        msg_mutex_unlock(&sub_list->msg_mutex);
        return -1; /* nothing to handle */
    }
    node = list_last_entry(&sub_list->msg_list, has_msg_node_t, list);
    list_del(&node->list);
    msg_mutex_unlock(&sub_list->msg_mutex);

#ifdef MSG_OPT_PUB_LIST
    has_msg_buffer_t *msg_buffer = node->msg_buff;

    if (cb != NULL) {
        cb(msg_buffer->module_id, msg_buffer->buffer, msg_buffer->length);
    }
    /*
     * The node is part of an allocation shared with the other subscribers and
     * must not be freed on its own; the macro frees the block after the last
     * reference is dropped.
     */
    MSG_CHECK_AND_FREE_MESSAGE(msg_buffer, node);
#else
    if (cb != NULL) {
        cb(node->module_id, node->buffer, node->length);
    }
    msg_free(node); /* private copy: release it directly */
#endif
    return 0;
}
/* Copy-out interface: copies the message into a caller-supplied buffer for processing; not recommended */
#if 1
/*
 * Copy the oldest pending message of module_id into buf_out and remove it
 * from the queue.
 *
 * module_id:     ID of the receiving module
 * pub_module_id: optional out parameter receiving the publisher's ID
 * buf_out:       destination for the payload; the whole payload is copied and
 *                no size is passed in, so the caller has to supply a buffer
 *                large enough for any message it can receive
 *
 * Returns the payload length in bytes, or 0 when the arguments are invalid or
 * no message is pending.
 */
unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out) {
    has_msg_node_t *head_node;
    has_static_sub_list_t *queue;
    unsigned int copied = 0;
    const int id_usable = (module_id < MODULE_MAX) && (module_id != INVALID_ID);

    if (!id_usable || (buf_out == NULL)) {
        msg_printf("%s id of buff wrong:%d\n", __func__, module_id);
        return 0;
    }
    queue = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(queue->module_id, 0);

    /* First in, first out: detach the head of the queue under the lock. */
    msg_mutex_lock(&queue->msg_mutex);
    if (list_empty(&queue->msg_list)) {
        msg_mutex_unlock(&queue->msg_mutex);
        return 0; /* no message pending */
    }
    head_node = list_first_entry(&queue->msg_list, has_msg_node_t, list);
    list_del(&head_node->list);
    msg_mutex_unlock(&queue->msg_mutex);

#ifdef MSG_OPT_PUB_LIST
    has_msg_buffer_t *payload = head_node->msg_buff;

    copied = payload->length;
    memcpy(buf_out, payload->buffer, copied);
    if (pub_module_id != NULL) {
        *pub_module_id = payload->module_id;
    }
    /* Everything needed was read above; now drop this subscriber's reference. */
    MSG_CHECK_AND_FREE_MESSAGE(payload, head_node);
#else
    copied = head_node->length;
    memcpy(buf_out, head_node->buffer, copied);
    if (pub_module_id != NULL) {
        *pub_module_id = head_node->module_id;
    }
    msg_free(head_node);
#endif
    return copied;
}
/*
 * Copy the newest pending message of module_id into buf_out and remove it
 * from the queue. Older messages stay queued.
 *
 * module_id:     ID of the receiving module
 * pub_module_id: optional out parameter receiving the publisher's ID
 * buf_out:       destination for the payload; the whole payload is copied and
 *                no size is passed in, so the caller has to supply a buffer
 *                large enough for any message it can receive
 *
 * Returns the payload length in bytes, or 0 when the arguments are invalid or
 * no message is pending.
 */
unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out) {
    has_msg_node_t *tail_node;
    has_static_sub_list_t *queue;
    unsigned int copied = 0;
    const int id_usable = (module_id < MODULE_MAX) && (module_id != INVALID_ID);

    if (!id_usable || (buf_out == NULL)) {
        msg_printf("%s id of buff wrong:%d\n", __func__, module_id);
        return 0;
    }
    queue = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(queue->module_id, 0);

    /* Detach the tail (most recently queued message) under the lock. */
    msg_mutex_lock(&queue->msg_mutex);
    if (list_empty(&queue->msg_list)) {
        msg_mutex_unlock(&queue->msg_mutex);
        return 0; /* no message pending */
    }
    tail_node = list_last_entry(&queue->msg_list, has_msg_node_t, list);
    list_del(&tail_node->list);
    msg_mutex_unlock(&queue->msg_mutex);

#ifdef MSG_OPT_PUB_LIST
    has_msg_buffer_t *payload = tail_node->msg_buff;

    copied = payload->length;
    memcpy(buf_out, payload->buffer, copied);
    if (pub_module_id != NULL) {
        *pub_module_id = payload->module_id;
    }
    /* Everything needed was read above; now drop this subscriber's reference. */
    MSG_CHECK_AND_FREE_MESSAGE(payload, tail_node);
#else
    copied = tail_node->length;
    memcpy(buf_out, tail_node->buffer, copied);
    if (pub_module_id != NULL) {
        *pub_module_id = tail_node->module_id;
    }
    msg_free(tail_node);
#endif
    return copied;
}
#endif
/*
 * Report whether the message queue of module_id is empty.
 *
 * Returns 0 when the queue is empty. Returns (unsigned int)-1 when at least
 * one message is pending, and also when module_id is invalid; callers cannot
 * tell these two cases apart from the return value.
 */
unsigned int has_msg_is_message_empty(has_module_ID_e module_id) {
    has_static_sub_list_t *sub_list;
    unsigned int result;

    MSG_CHECK_MODULE_ID(module_id, (unsigned int)-1);
    sub_list = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(sub_list->module_id, (unsigned int)-1);

    msg_mutex_lock(&sub_list->msg_mutex);
    result = list_empty(&sub_list->msg_list) ? 0u : (unsigned int)-1;
    msg_mutex_unlock(&sub_list->msg_mutex);
    return result;
}
/*
 * Count the messages currently queued for module_id.
 *
 * The queue is walked under its lock, so the cost grows with the number of
 * pending messages.
 *
 * Returns the number of pending messages, or 0 when module_id is invalid.
 */
unsigned int has_msg_get_message_number(has_module_ID_e module_id) {
    has_msg_node_t *cursor, *lookahead;
    has_static_sub_list_t *queue;
    unsigned int pending;

    MSG_CHECK_MODULE_ID(module_id, 0);
    queue = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(queue->module_id, 0);

    pending = 0;
    msg_mutex_lock(&queue->msg_mutex);
    list_for_each_entry_safe(cursor, lookahead, &queue->msg_list, list) {
        pending += 1;
    }
    msg_mutex_unlock(&queue->msg_mutex);

    return pending;
}
/*
 * Print the IDs of all publishers that module_id subscribes to.
 *
 * Returns 0 after printing, -1 when module_id is invalid.
 */
int has_msg_printf_subscribe(has_module_ID_e module_id) {
    has_static_sub_list_t *row;
    int pos;

    MSG_CHECK_MODULE_ID(module_id, -1);
    row = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(row->module_id, -1);

    msg_printf("module:%d subscribes to:\n", module_id);
    pos = 0;
    while (pos < row->sub_module_cnt) {
        msg_printf("%d ", row->sub_module[pos]);
        pos++;
    }
    return 0;
}
/*
 * Discard every message currently queued for module_id.
 *
 * With MSG_OPT_PUB_LIST each queued node is part of an allocation shared with
 * the other subscribers of that message, so nodes are never freed
 * individually: this module's reference is dropped and the block is released
 * only when it was the last one. Without MSG_OPT_PUB_LIST each node is a
 * private allocation and is freed directly.
 *
 * Returns 0 on success, -1 when module_id is invalid.
 */
int has_msg_delete_all_message(has_module_ID_e module_id) {
    has_msg_node_t *node, *temp;
    has_static_sub_list_t *sub_list;

    MSG_CHECK_MODULE_ID(module_id, -1);
    sub_list = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(sub_list->module_id, -1);

    msg_mutex_lock(&sub_list->msg_mutex);
    list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) {
        list_del(&node->list);
#ifdef MSG_OPT_PUB_LIST
        /*
         * The reference counter has its own mutex, taken inside the macro
         * while the queue lock is held. No code in this file takes a queue
         * lock while holding a counter mutex, so the order cannot invert.
         */
        has_msg_buffer_t *msg_buffer = node->msg_buff;
        has_msg_node_t *block_start = NULL; /* scratch for the macro; loop cursor stays untouched */

        MSG_CHECK_AND_FREE_MESSAGE(msg_buffer, block_start);
        (void)block_start;
#else
        msg_free(node);
#endif
    }
    INIT_LIST_HEAD(&sub_list->msg_list);
    msg_mutex_unlock(&sub_list->msg_mutex);
    return 0;
}
#ifdef MSG_OPT_DYNAMIC_SUB
/*
 * Add sub_id to the list of publishers that module_id subscribes to
 * (only built with MSG_OPT_DYNAMIC_SUB).
 *
 * Returns 0 on success, -1 when an ID is invalid, the subscription list is
 * full, or module_id already subscribes to sub_id.
 */
int has_msg_subscribe(has_module_ID_e module_id, has_module_ID_e sub_id) {
    has_static_sub_list_t *row;
    int slot;
    const int ids_in_range = (module_id < MODULE_MAX) && (sub_id < MODULE_MAX);

    if (!ids_in_range || (module_id == INVALID_ID) || (sub_id == INVALID_ID)) {
        return -1;
    }
    row = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(row->module_id, -1);

    msg_mutex_lock(&row->msg_mutex);

    /* No free slot left in the subscription list. */
    if (row->sub_module_cnt >= (MODULE_MAX - 1)) {
        msg_mutex_unlock(&row->msg_mutex);
        msg_printf("module:%d subscribe fail\n", module_id);
        return -1;
    }

    /* Reject a duplicate subscription. */
    slot = 0;
    while (slot < row->sub_module_cnt) {
        if (row->sub_module[slot] == sub_id) {
            msg_mutex_unlock(&row->msg_mutex);
            msg_printf("ID:%d has been sub by module:%d\n", sub_id, module_id);
            return -1;
        }
        slot++;
    }

    /* slot now equals the current count, i.e. the first unused entry. */
    row->sub_module[slot] = sub_id;
    row->sub_module_cnt++;
    msg_mutex_unlock(&row->msg_mutex);
    return 0;
}
/*
 * Remove sub_id from the list of publishers that module_id subscribes to
 * (only built with MSG_OPT_DYNAMIC_SUB).
 *
 * The remaining entries are compacted to the front of sub_module[] so that
 * the first sub_module_cnt entries are always the valid ones; the vacated
 * tail entries are reset to INVALID_ID.
 *
 * Returns 0 when the IDs are valid (whether or not sub_id was subscribed),
 * -1 when an ID is invalid.
 */
int has_msg_unsubscribe(has_module_ID_e module_id, has_module_ID_e sub_id) {
    has_static_sub_list_t *sub_list;
    unsigned char kept = 0;

    if ((module_id >= MODULE_MAX) || (sub_id >= MODULE_MAX)
        || (module_id == INVALID_ID) || (sub_id == INVALID_ID)) {
        return -1;
    }
    sub_list = &g_sub_list[g_sub_list_index[module_id]];
    MSG_CHECK_SUBSCIBE_INFO(sub_list->module_id, -1);

    msg_mutex_lock(&sub_list->msg_mutex);
    /* Keep every entry except sub_id, preserving order. */
    for (unsigned char j = 0; j < sub_list->sub_module_cnt; j++) {
        if (sub_list->sub_module[j] != sub_id) {
            sub_list->sub_module[kept] = sub_list->sub_module[j];
            kept++;
        }
    }
    /* Clear the entries that are no longer in use. */
    for (unsigned char j = kept; j < sub_list->sub_module_cnt; j++) {
        sub_list->sub_module[j] = INVALID_ID;
    }
    sub_list->sub_module_cnt = kept;
    msg_mutex_unlock(&sub_list->msg_mutex);
    return 0;
}
#endif