Compare commits
8 Commits
a4d1600de7
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 9df7284314 | |||
| a02dc81bbf | |||
| 2f4c00fd3d | |||
| 403447b4bd | |||
| 49db80b5fe | |||
| 83b013c678 | |||
| 79a5899717 | |||
| 3f378c0531 |
5
mw/has_task_msg_manager/README
Normal file
5
mw/has_task_msg_manager/README
Normal file
@@ -0,0 +1,5 @@
|
||||
这个软件用于代码模块之间的消息传递,采用发布-订阅的方式传递消息
|
||||
使用方法:
|
||||
1. 在 has_task_msg_os_port.h 选择操作系统,更改宏MSG_OPT_OS。或选择自定义自己实现接口,如果自己定义需要自己实现has_task_msg_os_port.c的相应接口。
|
||||
2. 在 has_task_msg.h 中添加自己负责的软件模块id号(has_module_ID_e),并且完成订阅表 SUBSCIBE_INFO ,要把模块的id、模块要订阅的id等信息填好。
|
||||
3. 代码中使用:初始化、发布、订阅和消息回调即可。接口在has_task_msg.h中声明,请参考示例 msg_example.c
|
||||
@@ -15,23 +15,42 @@ typedef enum {
|
||||
MODULE_MAX
|
||||
} has_module_ID_e;
|
||||
|
||||
#define CHECK_SUBSCIBE_INFO // 如果打开,代码初始化时会检查订阅表是否写错,开发时应该打开
|
||||
/* 格式:模块 订阅数量 订阅模块 */
|
||||
#define SUBSCIBE_INFO \
|
||||
{ \
|
||||
{GUI, 4, {ACM, SENSOR, WIFI, VOICE}}, \
|
||||
{SENSOR, 1, {GUI}}, \
|
||||
{ACM, 1, {WIFI}}, \
|
||||
{SENSOR, 2, {GUI, WIFI}}, \
|
||||
{WIFI, 0}, \
|
||||
{VOICE, 3, {ACM, VOICE, GUI}}, \
|
||||
}
|
||||
|
||||
/* 消息处理回调 */
|
||||
/*
|
||||
#define SUBSCIBE_INFO \
|
||||
{ \
|
||||
{GUI, 4, {ACM, WIFI, VOICE, SENSOR}}, \
|
||||
{SENSOR, 5, {ACM, GUI, WIFI, VOICE, SENSOR}}, \
|
||||
{ACM, 4, {GUI, WIFI, VOICE, SENSOR}}, \
|
||||
{WIFI, 4, {ACM, GUI, VOICE, SENSOR}}, \
|
||||
{VOICE, 4, {ACM, WIFI, GUI, SENSOR}}, \
|
||||
}
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief 消息处理回调类型
|
||||
* @param module_id:发布者的id号
|
||||
* @param buf:消息数据指针
|
||||
* @param len:消息数据长度
|
||||
* @return 0:成功 -1:失败
|
||||
*/
|
||||
typedef int (*has_msg_handle_cb)(unsigned char module_id, const unsigned char *buf, unsigned int len);
|
||||
|
||||
/* 基础功能 */
|
||||
int has_msg_init(void);
|
||||
int has_msg_os_init(void);
|
||||
|
||||
int has_msg_init_module(has_module_ID_e module_id); // HAS OS调用
|
||||
int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length);
|
||||
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb);
|
||||
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout);
|
||||
|
||||
unsigned int has_msg_is_message_empty(has_module_ID_e module_id);
|
||||
unsigned int has_msg_get_message_number(has_module_ID_e module_id);
|
||||
@@ -39,9 +58,9 @@ int has_msg_delete_all_message(has_module_ID_e module_id);
|
||||
|
||||
/* 扩展功能 */
|
||||
int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, has_module_ID_e pub_module_id);
|
||||
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb);
|
||||
unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out);
|
||||
unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out);
|
||||
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout);
|
||||
unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out, int ms_timeout);
|
||||
unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out, int ms_timeout);
|
||||
int has_msg_printf_subscribe(has_module_ID_e module_id);
|
||||
// int has_msg_subscribe(has_module_ID_e module_id, has_module_ID_e sub_id);
|
||||
// int has_msg_unsubscribe(has_module_ID_e module_id, has_module_ID_e sub_id);
|
||||
|
||||
@@ -1,15 +1,22 @@
|
||||
/*
|
||||
* has_task_msg_manager/has_task_msg_manager.c
|
||||
*
|
||||
* Author: zhangzhaopeng
|
||||
* Version: 1.0
|
||||
* Created on : 2025-12-29
|
||||
* 这个软件用于代码模块之间的消息传递,采用发布-订阅的方式传递消息
|
||||
*
|
||||
*/
|
||||
#include <string.h>
|
||||
#include "list.h"
|
||||
#include "has_task_msg.h"
|
||||
#include "has_task_msg_os_port.h"
|
||||
|
||||
// #define MSG_OPT_DYNAMIC_SUB // 允许动态订阅
|
||||
// #define MSG_OPT_DEBUG // 调试
|
||||
#define MSG_OPT_PUB_LIST // 使用静态发布表(多条消息单次分配内存)
|
||||
|
||||
/* 暂时不支持发布列表和动态订阅同时使用,后续需要再更新支持 */
|
||||
#if defined(MSG_OPT_DYNAMIC_SUB) && defined(MSG_OPT_PUB_LIST)
|
||||
#error "These two functions are not currently supported to be used together."
|
||||
#if defined(MSG_OPT_DYNAMIC_SUB)
|
||||
#error "These functions is not currently supported to be used together."
|
||||
#endif
|
||||
|
||||
#ifdef MSG_OPT_DEBUG
|
||||
@@ -18,10 +25,10 @@
|
||||
#define msg_debug(fmt, ...)
|
||||
#endif
|
||||
|
||||
#define MSG_ASSERT {while(1);}
|
||||
#define MSG_ASSERT \
|
||||
{msg_printf("!!!!!wrong config!!!!! file:%s line:%d\n", __FILE__, __LINE__);while(1);}
|
||||
|
||||
#pragma pack(1)
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
/* 消息数据 */
|
||||
typedef struct {
|
||||
unsigned char module_id; // 发布的模块ID
|
||||
@@ -34,28 +41,17 @@ typedef struct {
|
||||
struct list_head list; // 消息链表
|
||||
has_msg_buffer_t *msg_buff;
|
||||
} has_msg_node_t;
|
||||
#else
|
||||
/* 消息节点和数据 */
|
||||
typedef struct {
|
||||
unsigned char module_id; // 发布的模块ID
|
||||
struct list_head list; // 消息链表
|
||||
unsigned int length;
|
||||
unsigned char buffer[];
|
||||
} has_msg_node_t;
|
||||
#endif
|
||||
#pragma pack()
|
||||
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
/* 静态发布列表 */
|
||||
typedef struct {
|
||||
unsigned char pub_module[MODULE_MAX - 1]; // 发布表
|
||||
#ifdef MSG_OPT_USE_MUTEX
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
msg_mutex_t buffer_mutex; // 消息互斥量
|
||||
#endif
|
||||
unsigned char pub_num;
|
||||
} has_static_pub_list_t;
|
||||
static has_static_pub_list_t g_pub_list[MODULE_MAX] = {0};
|
||||
#endif
|
||||
|
||||
/* 静态订阅列表 */
|
||||
typedef struct {
|
||||
@@ -68,13 +64,14 @@ typedef struct {
|
||||
const unsigned char sub_module_cnt; // 订阅数量
|
||||
const unsigned char sub_module[MODULE_MAX - 1]; // 订阅表
|
||||
#endif
|
||||
#ifdef MSG_OPT_USE_MUTEX
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
msg_mutex_t msg_mutex; // 消息互斥量
|
||||
msg_sem_t sem;
|
||||
#endif
|
||||
struct list_head msg_list; // 消息链表
|
||||
} has_static_sub_list_t;
|
||||
static has_static_sub_list_t g_sub_list[] = SUBSCIBE_INFO;
|
||||
static unsigned char g_sub_list_index[MODULE_MAX] = {0}; // 订阅表id索引
|
||||
static unsigned char g_sub_list_index[MODULE_MAX]; // 订阅表id索引,同时表示模块是否可用
|
||||
|
||||
|
||||
/* 检查ID是否合法,id:检查的ID,ret:不合法时return的值 */
|
||||
@@ -89,53 +86,85 @@ if (((id) >= MODULE_MAX) || ((id) == INVALID_ID)) { \
|
||||
if (g_sub_list_index[(id)] == MODULE_MAX) { \
|
||||
return (ret);}
|
||||
|
||||
/**
|
||||
* @brief 初始化消息表,必须在所有接口调用之前调用init
|
||||
*
|
||||
* @return 0:成功 -1:失败
|
||||
*/
|
||||
int has_msg_init()
|
||||
{
|
||||
int j;
|
||||
has_static_pub_list_t *pub_list;
|
||||
/* 设置无效,标记是否初始化 */
|
||||
memset(g_sub_list_index, MODULE_MAX, sizeof(g_sub_list_index));
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
memset(g_pub_list, 0, sizeof(g_pub_list));
|
||||
#endif
|
||||
for (int i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++) {
|
||||
for (unsigned int i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++) {
|
||||
#ifdef CHECK_SUBSCIBE_INFO
|
||||
/* 检查各模块ID是否配置正确 */
|
||||
if ((g_sub_list[i].module_id >= MODULE_MAX) || (g_sub_list[i].module_id == INVALID_ID)) {
|
||||
msg_printf("module_id:%d is invalid, please check the macro:SUBSCIBE_INFO row:%d\n", g_sub_list[i].module_id, i);
|
||||
msg_printf("module_id:%d is invalid, please check the macro:SUBSCIBE_INFO row:%d\n"
|
||||
, g_sub_list[i].module_id, i);
|
||||
MSG_ASSERT; // 配置都能写错,必须进断言
|
||||
return -1;
|
||||
}
|
||||
/* 已经被初始化了,配置错误 */
|
||||
if (g_sub_list_index[g_sub_list[i].module_id] != MODULE_MAX) {
|
||||
msg_printf("[msg warning]: msg two same module id:%d\n", g_sub_list[i].module_id);
|
||||
MSG_ASSERT; // 配置都能写错,必须进断言
|
||||
}
|
||||
|
||||
/* 检查订阅表ID是否配置正确 */
|
||||
for (j = 0; j < (MODULE_MAX - 1); j++) {
|
||||
if (g_sub_list[i].sub_module[j] == INVALID_ID) {
|
||||
break;
|
||||
} else if (g_sub_list[i].sub_module[j] >= MODULE_MAX) {
|
||||
msg_printf("sub_module id:%d is invalid, module:id %d\nplease check the macro:"
|
||||
" SUBSCIBE_INFO row:%d, sub list:%d\n", g_sub_list[i].sub_module[j], g_sub_list[i].module_id, i, j);
|
||||
msg_printf("sub_module id:%d is invalid, module:id %d\n"
|
||||
"please check the macro: SUBSCIBE_INFO row:%d, sub list:%d\n"
|
||||
, g_sub_list[i].sub_module[j] , g_sub_list[i].module_id, i, j);
|
||||
MSG_ASSERT; // 配置都能写错,必须进断言
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
/* 检查订阅数量是否配置正确 */
|
||||
if (g_sub_list[i].sub_module_cnt != j) {
|
||||
msg_printf("sub_module_cnt is config as %d, which actually is %d, please check the"
|
||||
" macro: SUBSCIBE_INFO row:%d\n", g_sub_list[i].sub_module_cnt, j, i);
|
||||
msg_printf("sub_module_cnt is config as %d, which actually is %d,"
|
||||
"please check the macro: SUBSCIBE_INFO row:%d\n"
|
||||
, g_sub_list[i].sub_module_cnt, j, i);
|
||||
MSG_ASSERT; // 配置都能写错,必须进断言
|
||||
return -1;
|
||||
}
|
||||
g_sub_list_index[g_sub_list[i].module_id] = i; // 索引表赋值
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
/* 构建发布表 */
|
||||
for (int k = 0; k < g_sub_list[i].sub_module_cnt; k++) {
|
||||
if (g_pub_list[g_sub_list[i].sub_module[k]].pub_num == 0) {
|
||||
msg_mutex_init(&g_pub_list[g_sub_list[i].sub_module[k]].buffer_mutex);
|
||||
}
|
||||
#endif
|
||||
|
||||
g_pub_list[g_sub_list[i].sub_module[k]].pub_module[g_pub_list[g_sub_list[i].sub_module[k]].pub_num] = g_sub_list[i].module_id;
|
||||
g_pub_list[g_sub_list[i].sub_module[k]].pub_num ++;
|
||||
/* 通知发布者,需要发给此模块 */
|
||||
for (int k = 0; k < g_sub_list[i].sub_module_cnt; k++) { // 寻找发布者
|
||||
pub_list = &g_pub_list[g_sub_list[i].sub_module[k]];
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
if (pub_list->pub_num == 0) {
|
||||
if (msg_mutex_init(&pub_list->buffer_mutex) != 0) {
|
||||
msg_printf("[msg warning]:pub mutex init err! pub id:%d\n"
|
||||
, g_sub_list[i].sub_module[k]);
|
||||
/* 使用计数没有加锁,释放msg的内存时有可能出错 */
|
||||
continue;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* 注册所找到发布者的发布表 */
|
||||
pub_list->pub_module[pub_list->pub_num] = g_sub_list[i].module_id;
|
||||
pub_list->pub_num ++;
|
||||
}
|
||||
INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
if (msg_sem_init(&g_sub_list[i].sem) != 0) { // 初始化该模块同步量
|
||||
msg_printf("[msg warning]:sub sem init err! sub id:%d\n", i);
|
||||
continue;
|
||||
}
|
||||
if (msg_mutex_init(&g_sub_list[i].msg_mutex) != 0) { // 初始化该模块互斥量
|
||||
msg_printf("[msg warning]:sub mutex init err! sub id:%d\n", i);
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
msg_mutex_init(&g_sub_list[i].msg_mutex); // 初始化该模块互斥量
|
||||
INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表
|
||||
g_sub_list_index[g_sub_list[i].module_id] = i; // 索引表赋值,相当于enable
|
||||
}
|
||||
#ifdef MSG_OPT_DEBUG
|
||||
// msg_printf("show index:\n");
|
||||
@@ -150,7 +179,6 @@ int has_msg_init()
|
||||
// }
|
||||
// msg_printf("\n");
|
||||
// }
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
// msg_printf("\n");
|
||||
// for (int i = 1; i < MODULE_MAX; i++) {
|
||||
// msg_printf("module:%d publish to:\n", i);
|
||||
@@ -160,99 +188,126 @@ int has_msg_init()
|
||||
// msg_printf("\n");
|
||||
// }
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 根据模块分别初始化消息表
|
||||
*
|
||||
* @param module_id:模块id号
|
||||
* @return 0:成功 -1:失败
|
||||
* 用于HAS OS中的初始化
|
||||
*/
|
||||
int has_msg_init_module(has_module_ID_e module_id)
|
||||
{
|
||||
unsigned int i;
|
||||
int j;
|
||||
has_static_pub_list_t *pub_list;
|
||||
/* 检查ID */
|
||||
#ifdef CHECK_SUBSCIBE_INFO
|
||||
if ((module_id >= MODULE_MAX) || (module_id == INVALID_ID)) {
|
||||
msg_printf("%s:module_id:%d is invalid\n", __func__, module_id);
|
||||
MSG_ASSERT; // 配置都能写错,必须进断言
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
/* 初始化该模块订阅表 */
|
||||
for (i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++)
|
||||
{
|
||||
if (g_sub_list[i].module_id == (unsigned char)module_id) {
|
||||
#ifdef CHECK_SUBSCIBE_INFO
|
||||
/* 已经被初始化了,配置错误 */
|
||||
if (g_sub_list_index[module_id] != MODULE_MAX) {
|
||||
msg_printf("[msg warning]: msg two same module id:%d\n", module_id);
|
||||
MSG_ASSERT; // 配置都能写错,必须进断言
|
||||
}
|
||||
/* 检查订阅表ID是否配置正确 */
|
||||
for (j = 0; j < (MODULE_MAX - 1); j++) {
|
||||
if (g_sub_list[i].sub_module[j] == INVALID_ID) {
|
||||
break;
|
||||
} else if (g_sub_list[i].sub_module[j] >= MODULE_MAX) {
|
||||
msg_printf("sub_module id:%d is invalid, module:id %d\n"
|
||||
"please check the macro: SUBSCIBE_INFO row:%d, sub list:%d\n"
|
||||
, g_sub_list[i].sub_module[j], module_id, i, j);
|
||||
MSG_ASSERT; // 配置都能写错,必须进断言
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* 检查订阅数量是否配置正确 */
|
||||
if (g_sub_list[i].sub_module_cnt != j) {
|
||||
msg_printf("sub_module_cnt is config as %d, which actually is %d,"
|
||||
"please check the macro: SUBSCIBE_INFO row:%d\n"
|
||||
, g_sub_list[i].sub_module_cnt, j, i);
|
||||
MSG_ASSERT; // 配置都能写错,必须进断言
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* 通知发布者,需要发给此模块 */
|
||||
for (int k = 0; k < g_sub_list[i].sub_module_cnt; k++) { // 寻找发布者
|
||||
pub_list = &g_pub_list[g_sub_list[i].sub_module[k]];
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
if (pub_list->pub_num == 0) {
|
||||
if (msg_mutex_init(&pub_list->buffer_mutex) != 0) {
|
||||
msg_printf("[msg warning]:pub mutex init err! pub id:%d\n"
|
||||
, g_sub_list[i].sub_module[k]);
|
||||
/* 使用计数没有加锁,释放msg的内存时有可能出错 */
|
||||
continue;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* 注册所找到发布者的发布表 */
|
||||
pub_list->pub_module[pub_list->pub_num] = module_id;
|
||||
pub_list->pub_num ++;
|
||||
}
|
||||
INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
if (msg_sem_init(&g_sub_list[i].sem) != 0) { // 初始化该模块同步量
|
||||
msg_printf("[msg warning]:sub sem init err! sub id:%d\n", i);
|
||||
break;
|
||||
}
|
||||
if (msg_mutex_init(&g_sub_list[i].msg_mutex) != 0) { // 初始化该模块互斥量
|
||||
msg_printf("[msg warning]:sub mutex init err! sub id:%d\n", i);
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
g_sub_list_index[module_id] = i; // 索引表赋值,相当于enable
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef MSG_OPT_DEBUG
|
||||
if (i == (sizeof(g_sub_list) / sizeof(g_sub_list[0]))) {
|
||||
msg_debug("module:%d never subscribe any module\n", module_id);
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length)
|
||||
/* 如果是OS初始化,则用下列类似调用方式,先清除
|
||||
需要初始化的模块再调用 has_msg_init_module 传相应ID */
|
||||
int has_msg_os_init()
|
||||
{
|
||||
has_msg_node_t *node;
|
||||
if (buffer == NULL) {
|
||||
return -1;
|
||||
/* 设置无效,标记是否初始化 */
|
||||
memset(g_sub_list_index, MODULE_MAX, sizeof(g_sub_list_index));
|
||||
memset(g_pub_list, 0, sizeof(g_pub_list));
|
||||
for (int i = INVALID_ID + 1; i < MODULE_MAX; i++)
|
||||
{
|
||||
has_msg_init_module(i);
|
||||
}
|
||||
// if (length == 0) {
|
||||
// return -1;
|
||||
// }
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
has_msg_buffer_t *msg_buffer;
|
||||
unsigned char sub_index;
|
||||
|
||||
MSG_CHECK_MODULE_ID(module_id, -1);
|
||||
if (g_pub_list[module_id].pub_num == 0) {
|
||||
// msg_printf("no one subscribe you:%d\n", module_id);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* 根据订阅表分配内存 */
|
||||
node = (has_msg_node_t *)msg_malloc((sizeof(has_msg_node_t) * g_pub_list[module_id].pub_num) + sizeof(has_msg_buffer_t) + length);
|
||||
if (node == NULL) {
|
||||
msg_printf("malloc fail,pub id:%d\n", module_id);
|
||||
return -1;
|
||||
}
|
||||
msg_debug("publish: id:%d node addr:0x%p\n", module_id, node);
|
||||
msg_buffer = (has_msg_buffer_t *)(&node[g_pub_list[module_id].pub_num]);
|
||||
msg_buffer->node_cnt = g_pub_list[module_id].pub_num;
|
||||
msg_buffer->length = length;
|
||||
msg_buffer->module_id = module_id;
|
||||
memcpy(msg_buffer->buffer, buffer, length);
|
||||
/* 查订阅表添加消息 */
|
||||
for (int i = 0; i < g_pub_list[module_id].pub_num; i++) {
|
||||
sub_index = g_sub_list_index[g_pub_list[module_id].pub_module[i]];
|
||||
node[i].msg_buff = msg_buffer;
|
||||
msg_mutex_lock(&g_sub_list[sub_index].msg_mutex);
|
||||
list_add_tail(&node[i].list, &g_sub_list[sub_index].msg_list); // 加入消息链表
|
||||
msg_mutex_unlock(&g_sub_list[sub_index].msg_mutex);
|
||||
}
|
||||
#else
|
||||
int i;
|
||||
|
||||
msg_debug("publish: id:%d\n", module_id);
|
||||
for (i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++) { // 遍历模块
|
||||
#ifdef MSG_OPT_DYNAMIC_SUB
|
||||
msg_mutex_lock(&g_sub_list[i].msg_mutex);
|
||||
#endif
|
||||
for (int j = 0; j < g_sub_list[i].sub_module_cnt; j++) { // 查找订阅表
|
||||
if ((unsigned char)module_id == g_sub_list[i].sub_module[j]) { // 找到订阅信息
|
||||
node = msg_malloc(sizeof(has_msg_node_t) + length);
|
||||
msg_debug("node addr:0x%p\n", node);
|
||||
if (node == NULL) {
|
||||
msg_printf("malloc fail,pub id:%d\n", module_id);
|
||||
#ifdef MSG_OPT_DYNAMIC_SUB
|
||||
msg_mutex_unlock(&g_sub_list[i].msg_mutex);
|
||||
#endif
|
||||
return -1;
|
||||
}
|
||||
node->length = length;
|
||||
node->module_id = module_id;
|
||||
memcpy(node->buffer, buffer, length);
|
||||
#ifndef MSG_OPT_DYNAMIC_SUB
|
||||
msg_mutex_lock(&g_sub_list[i].msg_mutex);
|
||||
#endif
|
||||
list_add_tail(&node->list, &g_sub_list[i].msg_list); // 加入消息链表
|
||||
#ifndef MSG_OPT_DYNAMIC_SUB
|
||||
msg_mutex_unlock(&g_sub_list[i].msg_mutex);
|
||||
#endif
|
||||
|
||||
break; // 不可重复订阅
|
||||
}
|
||||
}
|
||||
#ifdef MSG_OPT_DYNAMIC_SUB
|
||||
msg_mutex_unlock(&g_sub_list[i].msg_mutex);
|
||||
#endif
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* TODO:更合理的方式是每个消息一个互斥量,但是可能会导致每个消息malloc和free的时候频繁init和deinit */
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
|
||||
/* 检查计数判断该消息是否被所有订阅模块处理完,处理完就释放内存 */
|
||||
static inline void msg_check_and_free_message(has_msg_node_t *node)
|
||||
{
|
||||
has_msg_node_t *temp = NULL;
|
||||
unsigned char id = node->msg_buff->module_id;
|
||||
#ifdef MSG_OPT_USE_MUTEX
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
msg_mutex_t *target_mutex = &g_pub_list[id].buffer_mutex;
|
||||
#endif
|
||||
msg_mutex_lock(target_mutex);
|
||||
@@ -262,13 +317,84 @@ static inline void msg_check_and_free_message(has_msg_node_t *node)
|
||||
}
|
||||
msg_mutex_unlock(target_mutex);
|
||||
if (temp) {
|
||||
msg_debug("free node addr:0x%p\n", temp);
|
||||
msg_debug("free node addr:%p\n", temp);
|
||||
msg_free(temp);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb)
|
||||
/**
|
||||
* @brief 发布一个消息
|
||||
*
|
||||
* @param module_id:发布消息的模块id号
|
||||
* @param buffer:消息数据指针
|
||||
* @param length:消息数据长度
|
||||
* @return 0:成功 -1:失败
|
||||
*/
|
||||
int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length)
|
||||
{
|
||||
has_msg_node_t *node;
|
||||
if (buffer == NULL) {
|
||||
return -1;
|
||||
}
|
||||
// if (length == 0) {
|
||||
// return -1;
|
||||
// }
|
||||
has_msg_buffer_t *msg_buffer;
|
||||
has_static_pub_list_t *pub_list;
|
||||
has_static_sub_list_t *sub_list;
|
||||
|
||||
MSG_CHECK_ID_AND_INDEX(module_id, -1);
|
||||
// MSG_CHECK_MODULE_ID 如果使用这个宏检查,未初始化的模块也可以发布
|
||||
pub_list = &g_pub_list[module_id];
|
||||
if (pub_list->pub_num == 0) {
|
||||
// msg_printf("no one subscribe you:%d\n", module_id);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* 根据订阅表分配内存 */
|
||||
node = (has_msg_node_t *)msg_malloc((sizeof(has_msg_node_t) * pub_list->pub_num)
|
||||
+ sizeof(has_msg_buffer_t) + length);
|
||||
if (node == NULL) {
|
||||
msg_printf("malloc fail,pub id:%d\n", module_id);
|
||||
return -1;
|
||||
}
|
||||
msg_debug("publish: id:%d node addr:%p\n", module_id, node);
|
||||
msg_buffer = (has_msg_buffer_t *)(&node[pub_list->pub_num]);
|
||||
msg_buffer->node_cnt = pub_list->pub_num;
|
||||
msg_buffer->length = length;
|
||||
msg_buffer->module_id = module_id;
|
||||
memcpy(msg_buffer->buffer, buffer, length);
|
||||
/* 查订阅表添加消息 */
|
||||
for (int i = 0; i < pub_list->pub_num; i++) {
|
||||
sub_list = &g_sub_list[g_sub_list_index[pub_list->pub_module[i]]];
|
||||
node[i].msg_buff = msg_buffer;
|
||||
msg_mutex_lock(&sub_list->msg_mutex);
|
||||
list_add_tail(&node[i].list, &sub_list->msg_list); // 加入消息链表
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
/* 通知等待线程唤醒 */
|
||||
if (msg_sem_notify(&sub_list->sem) == -1) {
|
||||
list_del(&node[i].list); // 添加失败,删除链表
|
||||
msg_mutex_unlock(&sub_list->msg_mutex);
|
||||
msg_check_and_free_message(&node[i]);
|
||||
msg_printf("[msg warning]:Failed to add msg on module:%d !!!\n"
|
||||
, sub_list->module_id);
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
msg_mutex_unlock(&sub_list->msg_mutex);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 处理一个消息,先入先出
|
||||
*
|
||||
* @param module_id:处理消息的模块id号
|
||||
* @param cb:消息处理回调
|
||||
* @param ms_timeout:超时时间(ms)其中0:消息直接返回;-1:一直阻塞
|
||||
* @return 0:成功 -1:失败
|
||||
*/
|
||||
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout)
|
||||
{
|
||||
has_msg_node_t *node;
|
||||
has_static_sub_list_t *sub_list;
|
||||
@@ -276,6 +402,12 @@ int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb)
|
||||
|
||||
sub_list = &g_sub_list[g_sub_list_index[module_id]];
|
||||
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
|
||||
return -1; // 没有消息需要处理或发生错误
|
||||
}
|
||||
#endif
|
||||
|
||||
/* 处理一个消息,先入先出 */
|
||||
msg_mutex_lock(&sub_list->msg_mutex);
|
||||
if (!list_empty(&sub_list->msg_list)) {
|
||||
@@ -287,25 +419,26 @@ int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb)
|
||||
return -1;
|
||||
}
|
||||
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
if (cb != NULL) {
|
||||
cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length);
|
||||
}
|
||||
msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
|
||||
msg_check_and_free_message(node);
|
||||
#else
|
||||
if (cb != NULL) {
|
||||
cb(node->module_id, node->buffer, node->length);
|
||||
}
|
||||
msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node);
|
||||
msg_free(node);
|
||||
#endif
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, has_module_ID_e pub_module_id)
|
||||
/**
|
||||
* @brief 处理一个指定发布者的消息,先入先出
|
||||
*
|
||||
* @param module_id:处理消息的模块id号
|
||||
* @param cb:消息处理回调
|
||||
* @param pub_module_id:指定发布者的id号
|
||||
* @return 0:成功 -1:失败
|
||||
*/
|
||||
int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb,
|
||||
has_module_ID_e pub_module_id)
|
||||
{
|
||||
has_msg_node_t *node, *temp;
|
||||
has_static_sub_list_t *sub_list;
|
||||
@@ -318,40 +451,38 @@ int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, ha
|
||||
msg_mutex_lock(&sub_list->msg_mutex);
|
||||
if (!list_empty(&sub_list->msg_list)) {
|
||||
list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) {
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
if (node->msg_buff->module_id == pub_module_id) {
|
||||
#else
|
||||
if (node->module_id == pub_module_id) {
|
||||
if (node->msg_buff->module_id == (unsigned char)pub_module_id) {
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
if (msg_sem_wait(&sub_list->sem, 0) != WAIT_MSG_COME) {
|
||||
msg_mutex_unlock(&sub_list->msg_mutex);
|
||||
msg_printf("error: list number != sem number\n");
|
||||
return -1; // eventfd和list不一致,理论上一定是 WAIT_MSG_COME
|
||||
}
|
||||
#endif
|
||||
list_del(&node->list);
|
||||
break;
|
||||
msg_mutex_unlock(&sub_list->msg_mutex);
|
||||
if (cb != NULL) {
|
||||
cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length);
|
||||
}
|
||||
msg_debug("by handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
|
||||
msg_check_and_free_message(node);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
msg_mutex_unlock(&sub_list->msg_mutex);
|
||||
} else { // 没有消息需要处理
|
||||
msg_mutex_unlock(&sub_list->msg_mutex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
if (cb != NULL) {
|
||||
cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length);
|
||||
}
|
||||
msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
|
||||
msg_check_and_free_message(node);
|
||||
#else
|
||||
if (cb != NULL) {
|
||||
cb(node->module_id, node->buffer, node->length);
|
||||
}
|
||||
msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node);
|
||||
msg_free(node);
|
||||
#endif
|
||||
|
||||
|
||||
return 0;
|
||||
// 没有消息需要处理
|
||||
msg_mutex_unlock(&sub_list->msg_mutex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
|
||||
/**
|
||||
* @brief 处理一个最近收到的消息
|
||||
*
|
||||
* @param module_id:处理消息的模块id号
|
||||
* @param cb:消息处理回调
|
||||
* @return 0:成功 -1:失败
|
||||
*/
|
||||
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout)
|
||||
{
|
||||
has_msg_node_t *node;
|
||||
has_static_sub_list_t *sub_list;
|
||||
@@ -359,6 +490,12 @@ int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
|
||||
|
||||
sub_list = &g_sub_list[g_sub_list_index[module_id]];
|
||||
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
|
||||
return -1; // 没有消息需要处理或发生错误
|
||||
}
|
||||
#endif
|
||||
|
||||
/* 处理最新的消息 */
|
||||
msg_mutex_lock(&sub_list->msg_mutex);
|
||||
if (!list_empty(&sub_list->msg_list)) {
|
||||
@@ -370,19 +507,11 @@ int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
|
||||
return -1;
|
||||
}
|
||||
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
if (cb != NULL) {
|
||||
cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length);
|
||||
}
|
||||
msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
|
||||
msg_check_and_free_message(node);
|
||||
#else
|
||||
if (cb != NULL) {
|
||||
cb(node->module_id, node->buffer, node->length);
|
||||
}
|
||||
msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node);
|
||||
msg_free(node);
|
||||
#endif
|
||||
|
||||
|
||||
return 0;
|
||||
@@ -390,7 +519,8 @@ int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
|
||||
|
||||
/* 将消息复制出接口处理,不建议使用 */
|
||||
#if 1
|
||||
unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out)
|
||||
unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id,
|
||||
unsigned char *buf_out, int ms_timeout)
|
||||
{
|
||||
has_msg_node_t *node;
|
||||
has_static_sub_list_t *sub_list;
|
||||
@@ -403,6 +533,12 @@ unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_modul
|
||||
|
||||
sub_list = &g_sub_list[g_sub_list_index[module_id]];
|
||||
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
|
||||
return 0; // 没有消息需要处理或发生错误
|
||||
}
|
||||
#endif
|
||||
|
||||
/* 处理一个消息,先入先出 */
|
||||
msg_mutex_lock(&sub_list->msg_mutex);
|
||||
if (!list_empty(&sub_list->msg_list)) {
|
||||
@@ -414,7 +550,6 @@ unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_modul
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
memcpy(buf_out, node->msg_buff->buffer, node->msg_buff->length);
|
||||
if (pub_module_id != NULL) {
|
||||
*pub_module_id = node->msg_buff->module_id;
|
||||
@@ -422,20 +557,11 @@ unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_modul
|
||||
len = node->msg_buff->length;
|
||||
msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
|
||||
msg_check_and_free_message(node);
|
||||
#else
|
||||
memcpy(buf_out, node->buffer, node->length);
|
||||
if (pub_module_id != NULL) {
|
||||
*pub_module_id = node->module_id;
|
||||
}
|
||||
|
||||
len = node->length;
|
||||
msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node);
|
||||
msg_free(node);
|
||||
#endif
|
||||
return len;
|
||||
}
|
||||
|
||||
unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out)
|
||||
unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id,
|
||||
unsigned char *buf_out, int ms_timeout)
|
||||
{
|
||||
has_msg_node_t *node;
|
||||
has_static_sub_list_t *sub_list;
|
||||
@@ -448,6 +574,12 @@ unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pu
|
||||
|
||||
sub_list = &g_sub_list[g_sub_list_index[module_id]];
|
||||
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
|
||||
return 0; // 没有消息需要处理或发生错误
|
||||
}
|
||||
#endif
|
||||
|
||||
/* 处理最新的消息 */
|
||||
msg_mutex_lock(&sub_list->msg_mutex);
|
||||
if (!list_empty(&sub_list->msg_list)) {
|
||||
@@ -459,7 +591,6 @@ unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pu
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
memcpy(buf_out, node->msg_buff->buffer, node->msg_buff->length);
|
||||
if (pub_module_id != NULL) {
|
||||
*pub_module_id = node->msg_buff->module_id;
|
||||
@@ -467,25 +598,19 @@ unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pu
|
||||
len = node->msg_buff->length;
|
||||
msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
|
||||
msg_check_and_free_message(node);
|
||||
#else
|
||||
memcpy(buf_out, node->buffer, node->length);
|
||||
if (pub_module_id != NULL) {
|
||||
*pub_module_id = node->module_id;
|
||||
}
|
||||
|
||||
len = node->length;
|
||||
msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node);
|
||||
msg_free(node);
|
||||
#endif
|
||||
return len;
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief 判断消息链表是否是空,没有消息要处理就是空
|
||||
*
|
||||
* @param module_id:要查空的模块id号
|
||||
* @return 0:没有消息 -1:有消息待处理
|
||||
*/
|
||||
unsigned int has_msg_is_message_empty(has_module_ID_e module_id)
|
||||
{
|
||||
has_msg_node_t *node, *temp;
|
||||
has_static_sub_list_t *sub_list;
|
||||
unsigned int num = 0;
|
||||
MSG_CHECK_ID_AND_INDEX(module_id, -1);
|
||||
|
||||
sub_list = &g_sub_list[g_sub_list_index[module_id]];
|
||||
@@ -500,6 +625,12 @@ unsigned int has_msg_is_message_empty(has_module_ID_e module_id)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 获取待处理消息的个数
|
||||
*
|
||||
* @param module_id:处理消息的模块id号
|
||||
* @return num : 消息数量
|
||||
*/
|
||||
unsigned int has_msg_get_message_number(has_module_ID_e module_id)
|
||||
{
|
||||
has_msg_node_t *node, *temp;
|
||||
@@ -532,16 +663,19 @@ int has_msg_printf_subscribe(has_module_ID_e module_id)
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 删除所有未处理的消息
|
||||
*
|
||||
* @param module_id:处理消息的模块id号
|
||||
* @return 0:成功 -1:失败
|
||||
*/
|
||||
int has_msg_delete_all_message(has_module_ID_e module_id)
|
||||
{
|
||||
has_msg_node_t *node, *temp;
|
||||
has_static_sub_list_t *sub_list;
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
struct list_head free_list;
|
||||
has_msg_node_t *free_temp;
|
||||
unsigned char id;
|
||||
INIT_LIST_HEAD(&free_list);
|
||||
#endif
|
||||
int ret = 0;
|
||||
MSG_CHECK_ID_AND_INDEX(module_id, -1);
|
||||
|
||||
sub_list = &g_sub_list[g_sub_list_index[module_id]];
|
||||
@@ -549,36 +683,24 @@ int has_msg_delete_all_message(has_module_ID_e module_id)
|
||||
msg_mutex_lock(&sub_list->msg_mutex);
|
||||
if (!list_empty(&sub_list->msg_list)) {
|
||||
list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) {
|
||||
list_del(&node->list);
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
list_add_tail(&node->list, &free_list); // 加入临时链表
|
||||
#else
|
||||
msg_debug("id:%d free addr:0x%p\n", module_id, node);
|
||||
msg_free(node);
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
if (msg_sem_wait(&sub_list->sem, 0) != WAIT_MSG_COME) {
|
||||
msg_printf("error: list number != sem number\n");
|
||||
ret = -1;
|
||||
continue; // eventfd和list不一致,理论上一定是 WAIT_MSG_COME
|
||||
}
|
||||
#endif
|
||||
list_del(&node->list);
|
||||
list_add_tail(&node->list, &free_list); // 加入临时链表
|
||||
}
|
||||
}
|
||||
INIT_LIST_HEAD(&sub_list->msg_list);
|
||||
// INIT_LIST_HEAD(&sub_list->msg_list);
|
||||
msg_mutex_unlock(&sub_list->msg_mutex);
|
||||
#ifdef MSG_OPT_PUB_LIST
|
||||
/* 释放内存 */
|
||||
list_for_each_entry_safe(node, temp, &free_list, list) {
|
||||
/* msg_check_and_free_message(node); */
|
||||
free_temp = NULL;
|
||||
id = node->msg_buff->module_id;
|
||||
msg_mutex_lock(&g_pub_list[id].buffer_mutex);
|
||||
node->msg_buff->node_cnt --;
|
||||
if (node->msg_buff->node_cnt == 0) {
|
||||
free_temp = ((has_msg_node_t *)node->msg_buff - g_pub_list[id].pub_num);
|
||||
}
|
||||
msg_mutex_unlock(&g_pub_list[id].buffer_mutex);
|
||||
if (free_temp) {
|
||||
msg_debug("id:%d free addr:0x%p\n", module_id, free_temp);
|
||||
msg_free(free_temp);
|
||||
}
|
||||
msg_check_and_free_message(node);
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
#ifdef MSG_OPT_DYNAMIC_SUB
|
||||
@@ -596,7 +718,7 @@ int has_msg_subscribe(has_module_ID_e module_id, has_module_ID_e sub_id)
|
||||
if (sub_list->sub_module_cnt < (MODULE_MAX - 1)) {
|
||||
/* 检查是否已经订阅 */
|
||||
for (j = 0; j < sub_list->sub_module_cnt; j++) {
|
||||
if (sub_id == sub_list->sub_module[j]) {
|
||||
if ((unsigned char)sub_id == sub_list->sub_module[j]) {
|
||||
msg_mutex_unlock(&sub_list->msg_mutex);
|
||||
msg_printf("ID:%d has been sub by module:%d\n", sub_id, module_id);
|
||||
return -1;
|
||||
@@ -626,7 +748,7 @@ int has_msg_unsubscribe(has_module_ID_e module_id, has_module_ID_e sub_id)
|
||||
|
||||
msg_mutex_lock(&sub_list->msg_mutex);
|
||||
for (int j = 0; j < sub_list->sub_module_cnt; j++) {
|
||||
if (sub_id == sub_list->sub_module[j]) {
|
||||
if ((unsigned char)sub_id == sub_list->sub_module[j]) {
|
||||
sub_list->sub_module[j] = INVALID_ID;
|
||||
sub_list->sub_module_cnt --;
|
||||
}
|
||||
|
||||
219
mw/has_task_msg_manager/has_task_msg_os_port.c
Normal file
219
mw/has_task_msg_manager/has_task_msg_os_port.c
Normal file
@@ -0,0 +1,219 @@
|
||||
/*
|
||||
* has_task_msg_manager/has_task_os_port.c
|
||||
*
|
||||
* Author: zhangzhaopeng
|
||||
* Version: 1.0
|
||||
* Created on : 2026-01-05
|
||||
* 系统适配层
|
||||
*/
|
||||
#include <stdint.h>
|
||||
#include "has_task_msg_os_port.h"
|
||||
|
||||
// TODO: 信号量限制最大数量
|
||||
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
/**
|
||||
* @brief 互斥量初始化
|
||||
*
|
||||
* @param mutex:互斥量
|
||||
* @return 0:成功 -1:失败
|
||||
*/
|
||||
int msg_mutex_init(msg_mutex_t *mutex)
|
||||
{
|
||||
#if (MSG_OPT_OS == MSG_OPT_OS_LINUX)
|
||||
if (pthread_mutex_init(mutex, NULL) != 0) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS)
|
||||
*mutex = xSemaphoreCreateMutex();
|
||||
if (*mutex == NULL) {
|
||||
msg_printf("create mutex err!!\n\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD)
|
||||
*mutex = rt_mutex_create("has_msg", RT_IPC_FLAG_FIFO);
|
||||
if (*mutex == RT_NULL) {
|
||||
msg_printf("create mutex err!!\n\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED)
|
||||
/* to be done */
|
||||
/* example:xr806 */
|
||||
if (OS_MutexCreate(mutex) != OS_OK) {
|
||||
msg_printf("create mutex err!!\n\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 信号量初始化
|
||||
*
|
||||
* @param sem:信号量
|
||||
* @return 0:成功 -1:失败
|
||||
*/
|
||||
int msg_sem_init(msg_sem_t *sem)
|
||||
{
|
||||
#if (MSG_OPT_OS == MSG_OPT_OS_LINUX)
|
||||
sem->poll_fd.fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
|
||||
if (sem->poll_fd.fd == -1) {
|
||||
msg_printf("create eventfd error!\n");
|
||||
return -1;
|
||||
}
|
||||
sem->poll_fd.events = POLLIN; // 监控可读事件
|
||||
sem->poll_fd.revents = 0;
|
||||
return 0;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS)
|
||||
*sem = xSemaphoreCreateCounting(UINT_MAX, 0);
|
||||
if (*sem == NULL) {
|
||||
msg_printf("create counting sem error!\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD)
|
||||
*sem = rt_sem_create("has_sem", 0, RT_IPC_FLAG_FIFO);
|
||||
if (*sem == RT_NULL) {
|
||||
msg_printf("create counting sem error!\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED)
|
||||
/* to be done */
|
||||
/* example:xr806 */
|
||||
if (OS_SemaphoreCreate(sem, 0, UINT_MAX) != OS_OK) {
|
||||
msg_printf("create counting sem error!\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 消息通知
|
||||
*
|
||||
* @param sem:信号量
|
||||
* @return 0:成功 -1:失败
|
||||
*/
|
||||
int msg_sem_notify(msg_sem_t *sem)
|
||||
{
|
||||
#if (MSG_OPT_OS == MSG_OPT_OS_LINUX)
|
||||
uint64_t val = 1;
|
||||
if (write(sem->poll_fd.fd, &val, sizeof(val)) != sizeof(val)) {
|
||||
msg_printf("notify thread err!\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS)
|
||||
if (xSemaphoreGive(*sem) != pdTRUE) {
|
||||
msg_printf("notify thread err!\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD)
|
||||
if (rt_sem_release(*sem) != RT_EOK) {
|
||||
msg_printf("notify thread err!\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED)
|
||||
/* to be done */
|
||||
/* example:xr806 */
|
||||
if (OS_SemaphoreRelease(sem) != OS_OK) {
|
||||
msg_printf("notify thread err!\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 等待消息
|
||||
*
|
||||
* @param sem:信号量
|
||||
* @param ms_timeout:超时时间(ms)其中0:消息直接返回;-1:一直阻塞
|
||||
* @return MSG_WAIT_RET:
|
||||
* WAIT_FAIL = -1, 等待失败
|
||||
* WAIT_TIMEOUT = -1, 等待超时
|
||||
* WAIT_MSG_COME = -1, 有新消息
|
||||
*/
|
||||
enum MSG_WAIT_RET msg_sem_wait(msg_sem_t *sem, int ms_timeout)
|
||||
{
|
||||
#if (MSG_OPT_OS == MSG_OPT_OS_LINUX)
|
||||
uint64_t val;
|
||||
int ret = poll(&sem->poll_fd, 1, ms_timeout);
|
||||
if (ret == -1) {
|
||||
perror("poll fail\n");
|
||||
return WAIT_FAIL;
|
||||
} else if (ret == 0) {
|
||||
// msg_printf("poll timeout\n");
|
||||
return WAIT_TIMEOUT;
|
||||
}
|
||||
|
||||
if (sem->poll_fd.revents & POLLIN) {
|
||||
if (read(sem->poll_fd.fd, &val, sizeof(val)) == sizeof(val)) { // 清零
|
||||
// msg_printf("read %ld\n", val);
|
||||
return WAIT_MSG_COME;
|
||||
}
|
||||
else {
|
||||
msg_printf("read error\n");
|
||||
}
|
||||
}
|
||||
|
||||
// 检查错误事件
|
||||
msg_printf("something err happened,revents:%x\n", sem->poll_fd.revents);
|
||||
return WAIT_FAIL;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS)
|
||||
TickType_t xTicksToWait;
|
||||
if (ms_timeout < 0) {
|
||||
xTicksToWait = portMAX_DELAY;
|
||||
} else {
|
||||
xTicksToWait = pdMS_TO_TICKS(ms_timeout);
|
||||
}
|
||||
|
||||
if (xSemaphoreTake(*sem, xTicksToWait) != pdTRUE) {
|
||||
// msg_printf("sem take fail\n");
|
||||
return WAIT_FAIL;
|
||||
}
|
||||
return WAIT_MSG_COME;
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD)
|
||||
rt_err_t ret;
|
||||
rt_int32_t time;
|
||||
if (ms_timeout < 0) {
|
||||
time = RT_WAITING_FOREVER;
|
||||
} else {
|
||||
time = rt_tick_from_millisecond(ms_timeout);
|
||||
}
|
||||
|
||||
ret = rt_sem_take(*sem, time);
|
||||
|
||||
if (ret == RT_EOK) {
|
||||
return WAIT_MSG_COME;
|
||||
} else if (ret == (-RT_ETIMEOUT)) {
|
||||
return WAIT_TIMEOUT;
|
||||
} else {
|
||||
msg_printf("sem take fail\n");
|
||||
return WAIT_FAIL;
|
||||
}
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED)
|
||||
/* to be done */
|
||||
/* example:xr806 */
|
||||
OS_Time_t waitMS;
|
||||
if (ms_timeout < 0) {
|
||||
waitMS = OS_WAIT_FOREVER;
|
||||
} else {
|
||||
waitMS = (OS_Time_t)ms_timeout;
|
||||
}
|
||||
|
||||
if (OS_SemaphoreWait(sem, waitMS) != OS_OK) {
|
||||
// msg_printf("sem take fail\n");
|
||||
return WAIT_FAIL;
|
||||
}
|
||||
return WAIT_MSG_COME;
|
||||
#endif
|
||||
}
|
||||
|
||||
#endif
|
||||
@@ -7,15 +7,22 @@
|
||||
#define MSG_OPT_OS_NONE_OR_OSAL 4
|
||||
#define MSG_OPT_OS_USER_DEFINED 5
|
||||
|
||||
/* TODO:rtt和freertos未测过 */
|
||||
/* TODO:rtt未验证 */
|
||||
#define MSG_OPT_OS MSG_OPT_OS_LINUX
|
||||
// #define MSG_OPT_OS MSG_OPT_OS_freeRTOS
|
||||
// #define MSG_OPT_OS MSG_OPT_OS_RT_THREAD
|
||||
// #define MSG_OPT_OS MSG_OPT_OS_NONE_OR_OSAL // 裸机或osal
|
||||
// #define MSG_OPT_OS MSG_OPT_OS_USER_DEFINED // 自定义
|
||||
|
||||
#define MSG_OPT_USE_MUTEX // 支持互斥量
|
||||
#define MSG_OPT_MUTEX_SEM // 支持互斥量和信号量
|
||||
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
enum MSG_WAIT_RET{
|
||||
WAIT_FAIL = -1, // 等待失败
|
||||
WAIT_TIMEOUT, // 等待超时
|
||||
WAIT_MSG_COME, // 需要处理消息
|
||||
};
|
||||
#endif
|
||||
|
||||
/* ----------------------- for Linux ----------------------- */
|
||||
#if (MSG_OPT_OS == MSG_OPT_OS_LINUX)
|
||||
@@ -23,15 +30,22 @@
|
||||
#include <pthread.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include "list.h"
|
||||
|
||||
#ifdef MSG_OPT_USE_MUTEX
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
#include <poll.h>
|
||||
#include <sys/eventfd.h>
|
||||
#include <unistd.h>
|
||||
#define msg_mutex_t pthread_mutex_t
|
||||
#define msg_mutex_init(m) pthread_mutex_init(m, NULL)
|
||||
#define msg_mutex_lock(m) pthread_mutex_lock(m)
|
||||
#define msg_mutex_unlock(m) pthread_mutex_unlock(m)
|
||||
typedef struct {
|
||||
struct pollfd poll_fd;
|
||||
} sync_t;
|
||||
#define msg_sem_t sync_t
|
||||
|
||||
#else
|
||||
#define msg_mutex_t
|
||||
#define msg_mutex_init(m)
|
||||
#define msg_mutex_lock(m)
|
||||
#define msg_mutex_unlock(m)
|
||||
#endif
|
||||
@@ -44,79 +58,74 @@
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS)
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <limits.h>
|
||||
#include "FreeRTOS.h"
|
||||
#include "semphr.h"
|
||||
#ifdef MSG_OPT_USE_MUTEX
|
||||
#define msg_mutex_t SemaphoreHandle_t
|
||||
static inline void msg_mutex_init(void *mutex) {
|
||||
*mutex = xSemaphoreCreateMutex();
|
||||
if (*mutex == NULL) {
|
||||
printf("create mutex err!!\n\n");
|
||||
}
|
||||
}
|
||||
|
||||
static inline void msg_mutex_lock(void *mutex) {
|
||||
if (xSemaphoreTake(*mutex, portMAX_DELAY) != pdTRUE) {
|
||||
printf("lock mutex err!!\n\n");
|
||||
}
|
||||
}
|
||||
|
||||
static inline void msg_mutex_unlock(void *mutex) {
|
||||
xSemaphoreGive(*mutex);
|
||||
}
|
||||
#else
|
||||
#define msg_mutex_t
|
||||
#define msg_mutex_init(m)
|
||||
#define msg_mutex_lock(m)
|
||||
#define msg_mutex_unlock(m)
|
||||
#endif
|
||||
#include "list.h"
|
||||
|
||||
#define msg_malloc pvPortMalloc
|
||||
#define msg_free vPortFree
|
||||
#define msg_printf printf
|
||||
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
#define msg_mutex_t SemaphoreHandle_t
|
||||
|
||||
static inline void msg_mutex_lock(msg_mutex_t *mutex) {
|
||||
if (xSemaphoreTake(*mutex, portMAX_DELAY) != pdTRUE) {
|
||||
msg_printf("lock mutex err!!\n\n");
|
||||
}
|
||||
}
|
||||
|
||||
static inline void msg_mutex_unlock(msg_mutex_t *mutex) {
|
||||
xSemaphoreGive(*mutex);
|
||||
}
|
||||
|
||||
#define msg_sem_t SemaphoreHandle_t
|
||||
|
||||
#else
|
||||
#define msg_mutex_t
|
||||
#define msg_mutex_lock(m)
|
||||
#define msg_mutex_unlock(m)
|
||||
#endif
|
||||
/* ----------------------- for RT-thread ----------------------- */
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD)
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <rtthread.h>
|
||||
#ifdef MSG_OPT_USE_MUTEX
|
||||
#define msg_mutex_t rt_mutex_t
|
||||
static inline void msg_mutex_init(void *mutex) {
|
||||
*mutex = rt_mutex_create("has_msg", RT_IPC_FLAG_FIFO);
|
||||
if (*mutex == RT_NULL) {
|
||||
printf("create mutex err!!\n\n");
|
||||
}
|
||||
}
|
||||
|
||||
static inline void msg_mutex_lock(void *mutex) {
|
||||
if (rt_mutex_take(*mutex, RT_WAITING_FOREVER) != RT_EOK) {
|
||||
printf("lock mutex err!!\n\n");
|
||||
}
|
||||
}
|
||||
|
||||
static inline void msg_mutex_unlock(void *mutex) {
|
||||
rt_mutex_release(*mutex);
|
||||
}
|
||||
#else
|
||||
#define msg_mutex_t
|
||||
#define msg_mutex_init(m)
|
||||
#define msg_mutex_lock(m)
|
||||
#define msg_mutex_unlock(m)
|
||||
#endif
|
||||
#include "list.h"
|
||||
|
||||
#define msg_malloc rt_malloc
|
||||
#define msg_free rt_free
|
||||
#define msg_printf printf
|
||||
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
#define msg_mutex_t rt_mutex_t
|
||||
|
||||
static inline void msg_mutex_lock(msg_mutex_t *mutex) {
|
||||
if (rt_mutex_take(*mutex, RT_WAITING_FOREVER) != RT_EOK) {
|
||||
msg_printf("lock mutex err!!\n\n");
|
||||
}
|
||||
}
|
||||
|
||||
static inline void msg_mutex_unlock(msg_mutex_t *mutex) {
|
||||
rt_mutex_release(*mutex);
|
||||
}
|
||||
|
||||
#define msg_sem_t rt_sem_t
|
||||
|
||||
#else
|
||||
#define msg_mutex_t
|
||||
#define msg_mutex_lock(m)
|
||||
#define msg_mutex_unlock(m)
|
||||
#endif
|
||||
/* ----------------------- for none os ----------------------- */
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_NONE_OR_OSAL)
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#undef MSG_OPT_USE_MUTEX
|
||||
#include "list.h"
|
||||
#undef MSG_OPT_MUTEX_SEM
|
||||
|
||||
#define msg_mutex_t
|
||||
#define msg_mutex_init(m)
|
||||
#define msg_mutex_lock(m)
|
||||
#define msg_mutex_unlock(m)
|
||||
|
||||
@@ -126,37 +135,50 @@ static inline void msg_mutex_unlock(void *mutex) {
|
||||
|
||||
/* ----------------------- for user define ----------------------- */
|
||||
#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED)
|
||||
/* 下面示例是基于xr806移植 */
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <limits.h>
|
||||
#include "list.h"
|
||||
|
||||
#ifdef MSG_OPT_USE_MUTEX
|
||||
#define msg_mutex_t int/* to be done */
|
||||
static inline void msg_mutex_init(void *mutex) {
|
||||
/* to be done */
|
||||
#include "../has_wifi_slab.h"
|
||||
#include "../kernel/os/os.h"
|
||||
|
||||
#define msg_malloc test_slab_malloc
|
||||
#define msg_free test_slab_free
|
||||
#define msg_printf printf
|
||||
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
#define msg_mutex_t OS_Mutex_t
|
||||
|
||||
static inline void msg_mutex_lock(msg_mutex_t *mutex) {
|
||||
if (OS_MutexLock(mutex, OS_WAIT_FOREVER) != OS_OK) {
|
||||
msg_printf("lock mutex err!!\n\n");
|
||||
}
|
||||
}
|
||||
|
||||
static inline void msg_mutex_lock(void *mutex) {
|
||||
/* to be done */
|
||||
static inline void msg_mutex_unlock(msg_mutex_t *mutex) {
|
||||
OS_MutexUnlock(mutex);
|
||||
}
|
||||
|
||||
static inline void msg_mutex_unlock(void *mutex) {
|
||||
/* to be done */
|
||||
}
|
||||
#define msg_sem_t OS_Semaphore_t
|
||||
|
||||
#else
|
||||
#define msg_mutex_t
|
||||
#define msg_mutex_init(m)
|
||||
#define msg_mutex_lock(m)
|
||||
#define msg_mutex_unlock(m)
|
||||
#endif
|
||||
|
||||
|
||||
#define msg_malloc malloc
|
||||
#define msg_free free
|
||||
#define msg_printf printf
|
||||
|
||||
#else
|
||||
#error "!Undefined system environment!"
|
||||
#endif
|
||||
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#ifdef MSG_OPT_MUTEX_SEM
|
||||
int msg_mutex_init(msg_mutex_t *mutex);
|
||||
int msg_sem_init(msg_sem_t *sem);
|
||||
int msg_sem_notify(msg_sem_t *sem);
|
||||
enum MSG_WAIT_RET msg_sem_wait(msg_sem_t *sem, int ms_timeout);
|
||||
#endif
|
||||
|
||||
@@ -43,8 +43,8 @@ void *gui_function(void *arg)
|
||||
has_msg_publish(GUI, gui_publish_buffer3, sizeof(gui_publish_buffer3)); // GUI发布第三条消息
|
||||
while (1)
|
||||
{
|
||||
usleep(500 * 1000);
|
||||
has_msg_handle(GUI, gui_handle_cb); // 处理一条GUI收到的消息,先入先出
|
||||
// usleep(500 * 1000);
|
||||
has_msg_handle(GUI, gui_handle_cb, 500); // 处理一条GUI收到的消息,先入先出
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,9 +63,9 @@ void *sensor_function(void *arg)
|
||||
has_msg_publish(moude_ID, sensor_publish_buffer2, sizeof(sensor_publish_buffer2)); // SENSOR发布第二条消息
|
||||
while (1)
|
||||
{
|
||||
usleep(500 * 1000);
|
||||
has_msg_handle(moude_ID, sensor_handle_cb); // 处理一条SENSOR收到的消息,先入先出
|
||||
has_msg_handle_latest(moude_ID, sensor_handle_cb); // 处理一条SENSOR最新收到的消息
|
||||
// usleep(500 * 1000);
|
||||
has_msg_handle(moude_ID, sensor_handle_cb, 500); // 处理一条SENSOR收到的消息,先入先出
|
||||
has_msg_handle_latest(moude_ID, sensor_handle_cb, 0); // 处理一条SENSOR最新收到的消息
|
||||
if (has_msg_is_message_empty(moude_ID) != 0) // 判断是否还有消息未处理
|
||||
{
|
||||
printf("模块:%d 还有%d条消息待处理\n", moude_ID, has_msg_get_message_number(moude_ID));
|
||||
|
||||
Reference in New Issue
Block a user