Compare commits

...

8 Commits

Author SHA1 Message Date
9df7284314 同步更新示例 2026-01-12 18:01:49 +08:00
a02dc81bbf 兼容freeRTOS系统支持,兼容xr806 2026-01-12 17:46:14 +08:00
2f4c00fd3d 1.修复删除所有消息接口的bug
2.完善初始化时的错误处理
3.添加freeRTOS的支持
4.添加rt-thread的支持
5.消除一些编译警告
2026-01-08 17:30:22 +08:00
403447b4bd 优化发布逻辑,修复by_module链表逻辑错误 2026-01-06 20:13:03 +08:00
49db80b5fe 添加消息同步处理逻辑,添加Linux信号量的实现 2026-01-06 15:40:03 +08:00
83b013c678 调整格式,添加注释和说明 2025-12-30 15:39:29 +08:00
79a5899717 仅使用发布表,一次分配内存,去除发布时多次分配内存的逻辑 2025-12-30 10:48:59 +08:00
3f378c0531 添加预留的OS初始化接口 2025-12-30 10:45:40 +08:00
6 changed files with 691 additions and 304 deletions

View File

@@ -0,0 +1,5 @@
这个软件用于代码模块之间的消息传递,采用发布-订阅的方式传递消息
使用方法:
1. 在 has_task_msg_os_port.h 选择操作系统更改宏MSG_OPT_OS。或选择自定义自己实现接口如果自己定义需要自己实现has_task_msg_os_port.c的相应接口。
2. 在 has_task_msg.h 中添加自己负责的软件模块id号has_module_ID_e并且完成订阅表 SUBSCIBE_INFO 要把模块的id、模块要订阅的id等信息填好。
3. 代码中使用初始化、发布、订阅和消息回调即可。接口在has_task_msg.h中声明请参考示例 msg_example.c

View File

@@ -15,23 +15,42 @@ typedef enum {
MODULE_MAX
} has_module_ID_e;
#define CHECK_SUBSCIBE_INFO // 如果打开,代码初始化时会检查订阅表是否写错,开发时应该打开
/* 格式:模块 订阅数量 订阅模块 */
#define SUBSCIBE_INFO \
{ \
{GUI, 4, {ACM, SENSOR, WIFI, VOICE}}, \
{SENSOR, 1, {GUI}}, \
{ACM, 1, {WIFI}}, \
{SENSOR, 2, {GUI, WIFI}}, \
{WIFI, 0}, \
{VOICE, 3, {ACM, VOICE, GUI}}, \
}
/* 消息处理回调 */
/*
#define SUBSCIBE_INFO \
{ \
{GUI, 4, {ACM, WIFI, VOICE, SENSOR}}, \
{SENSOR, 5, {ACM, GUI, WIFI, VOICE, SENSOR}}, \
{ACM, 4, {GUI, WIFI, VOICE, SENSOR}}, \
{WIFI, 4, {ACM, GUI, VOICE, SENSOR}}, \
{VOICE, 4, {ACM, WIFI, GUI, SENSOR}}, \
}
*/
/**
* @brief 消息处理回调类型
* @param module_id:发布者的id号
* @param buf:消息数据指针
* @param len:消息数据长度
* @return 0:成功 -1失败
*/
typedef int (*has_msg_handle_cb)(unsigned char module_id, const unsigned char *buf, unsigned int len);
/* 基础功能 */
int has_msg_init(void);
int has_msg_os_init(void);
int has_msg_init_module(has_module_ID_e module_id); // HAS OS调用
int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length);
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb);
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout);
unsigned int has_msg_is_message_empty(has_module_ID_e module_id);
unsigned int has_msg_get_message_number(has_module_ID_e module_id);
@@ -39,9 +58,9 @@ int has_msg_delete_all_message(has_module_ID_e module_id);
/* 扩展功能 */
int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, has_module_ID_e pub_module_id);
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb);
unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out);
unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out);
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout);
unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out, int ms_timeout);
unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out, int ms_timeout);
int has_msg_printf_subscribe(has_module_ID_e module_id);
// int has_msg_subscribe(has_module_ID_e module_id, has_module_ID_e sub_id);
// int has_msg_unsubscribe(has_module_ID_e module_id, has_module_ID_e sub_id);

View File

@@ -1,15 +1,22 @@
/*
* has_task_msg_manager/has_task_msg_manager.c
*
* Author: zhangzhaopeng
* Version 1.0
* Created on : 2025-12-29
* 这个软件用于代码模块之间的消息传递,采用发布-订阅的方式传递消息
*
*/
#include <string.h>
#include "list.h"
#include "has_task_msg.h"
#include "has_task_msg_os_port.h"
// #define MSG_OPT_DYNAMIC_SUB // 允许动态订阅
// #define MSG_OPT_DEBUG // 调试
#define MSG_OPT_PUB_LIST // 使用静态发布表(多条消息单次分配内存)
/* 暂时不支持发布列表和动态订阅同时使用,后续需要再更新支持 */
#if defined(MSG_OPT_DYNAMIC_SUB) && defined(MSG_OPT_PUB_LIST)
#error "These two functions are not currently supported to be used together."
#if defined(MSG_OPT_DYNAMIC_SUB)
#error "These functions is not currently supported to be used together."
#endif
#ifdef MSG_OPT_DEBUG
@@ -18,10 +25,10 @@
#define msg_debug(fmt, ...)
#endif
#define MSG_ASSERT {while(1);}
#define MSG_ASSERT \
{msg_printf("!!!!!wrong config!!!!! file:%s line:%d\n", __FILE__, __LINE__);while(1);}
#pragma pack(1)
#ifdef MSG_OPT_PUB_LIST
/* 消息数据 */
typedef struct {
unsigned char module_id; // 发布的模块ID
@@ -34,28 +41,17 @@ typedef struct {
struct list_head list; // 消息链表
has_msg_buffer_t *msg_buff;
} has_msg_node_t;
#else
/* 消息节点和数据 */
typedef struct {
unsigned char module_id; // 发布的模块ID
struct list_head list; // 消息链表
unsigned int length;
unsigned char buffer[];
} has_msg_node_t;
#endif
#pragma pack()
#ifdef MSG_OPT_PUB_LIST
/* 静态发布列表 */
typedef struct {
unsigned char pub_module[MODULE_MAX - 1]; // 发布表
#ifdef MSG_OPT_USE_MUTEX
#ifdef MSG_OPT_MUTEX_SEM
msg_mutex_t buffer_mutex; // 消息互斥量
#endif
unsigned char pub_num;
} has_static_pub_list_t;
static has_static_pub_list_t g_pub_list[MODULE_MAX] = {0};
#endif
/* 静态订阅列表 */
typedef struct {
@@ -68,13 +64,14 @@ typedef struct {
const unsigned char sub_module_cnt; // 订阅数量
const unsigned char sub_module[MODULE_MAX - 1]; // 订阅表
#endif
#ifdef MSG_OPT_USE_MUTEX
#ifdef MSG_OPT_MUTEX_SEM
msg_mutex_t msg_mutex; // 消息互斥量
msg_sem_t sem;
#endif
struct list_head msg_list; // 消息链表
} has_static_sub_list_t;
static has_static_sub_list_t g_sub_list[] = SUBSCIBE_INFO;
static unsigned char g_sub_list_index[MODULE_MAX] = {0}; // 订阅表id索引
static unsigned char g_sub_list_index[MODULE_MAX]; // 订阅表id索引,同时表示模块是否可用
/* 检查ID是否合法id:检查的IDret:不合法时return的值 */
@@ -89,53 +86,85 @@ if (((id) >= MODULE_MAX) || ((id) == INVALID_ID)) { \
if (g_sub_list_index[(id)] == MODULE_MAX) { \
return (ret);}
/**
* @brief 初始化消息表,必须在所有接口调用之前调用init
*
* @return 0:成功 -1失败
*/
int has_msg_init()
{
int j;
has_static_pub_list_t *pub_list;
/* 设置无效,标记是否初始化 */
memset(g_sub_list_index, MODULE_MAX, sizeof(g_sub_list_index));
#ifdef MSG_OPT_PUB_LIST
memset(g_pub_list, 0, sizeof(g_pub_list));
#endif
for (int i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++) {
for (unsigned int i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++) {
#ifdef CHECK_SUBSCIBE_INFO
/* 检查各模块ID是否配置正确 */
if ((g_sub_list[i].module_id >= MODULE_MAX) || (g_sub_list[i].module_id == INVALID_ID)) {
msg_printf("module_id:%d is invalid, please check the macro:SUBSCIBE_INFO row:%d\n", g_sub_list[i].module_id, i);
msg_printf("module_id:%d is invalid, please check the macro:SUBSCIBE_INFO row:%d\n"
, g_sub_list[i].module_id, i);
MSG_ASSERT; // 配置都能写错,必须进断言
return -1;
}
/* 已经被初始化了,配置错误 */
if (g_sub_list_index[g_sub_list[i].module_id] != MODULE_MAX) {
msg_printf("[msg warning]: msg two same module id:%d\n", g_sub_list[i].module_id);
MSG_ASSERT; // 配置都能写错,必须进断言
}
/* 检查订阅表ID是否配置正确 */
for (j = 0; j < (MODULE_MAX - 1); j++) {
if (g_sub_list[i].sub_module[j] == INVALID_ID) {
break;
} else if (g_sub_list[i].sub_module[j] >= MODULE_MAX) {
msg_printf("sub_module id:%d is invalid, module:id %d\nplease check the macro:"
" SUBSCIBE_INFO row:%d, sub list:%d\n", g_sub_list[i].sub_module[j], g_sub_list[i].module_id, i, j);
msg_printf("sub_module id:%d is invalid, module:id %d\n"
"please check the macro: SUBSCIBE_INFO row:%d, sub list:%d\n"
, g_sub_list[i].sub_module[j] , g_sub_list[i].module_id, i, j);
MSG_ASSERT; // 配置都能写错,必须进断言
return -1;
}
}
/* 检查订阅数量是否配置正确 */
if (g_sub_list[i].sub_module_cnt != j) {
msg_printf("sub_module_cnt is config as %d, which actually is %d, please check the"
" macro: SUBSCIBE_INFO row:%d\n", g_sub_list[i].sub_module_cnt, j, i);
msg_printf("sub_module_cnt is config as %d, which actually is %d,"
"please check the macro: SUBSCIBE_INFO row:%d\n"
, g_sub_list[i].sub_module_cnt, j, i);
MSG_ASSERT; // 配置都能写错,必须进断言
return -1;
}
g_sub_list_index[g_sub_list[i].module_id] = i; // 索引表赋值
#ifdef MSG_OPT_PUB_LIST
/* 构建发布表 */
for (int k = 0; k < g_sub_list[i].sub_module_cnt; k++) {
if (g_pub_list[g_sub_list[i].sub_module[k]].pub_num == 0) {
msg_mutex_init(&g_pub_list[g_sub_list[i].sub_module[k]].buffer_mutex);
}
#endif
g_pub_list[g_sub_list[i].sub_module[k]].pub_module[g_pub_list[g_sub_list[i].sub_module[k]].pub_num] = g_sub_list[i].module_id;
g_pub_list[g_sub_list[i].sub_module[k]].pub_num ++;
/* 通知发布者,需要发给此模块 */
for (int k = 0; k < g_sub_list[i].sub_module_cnt; k++) { // 寻找发布者
pub_list = &g_pub_list[g_sub_list[i].sub_module[k]];
#ifdef MSG_OPT_MUTEX_SEM
if (pub_list->pub_num == 0) {
if (msg_mutex_init(&pub_list->buffer_mutex) != 0) {
msg_printf("[msg warning]:pub mutex init err! pub id:%d\n"
, g_sub_list[i].sub_module[k]);
/* 使用计数没有加锁释放msg的内存时有可能出错 */
continue;
}
}
#endif
/* 注册所找到发布者的发布表 */
pub_list->pub_module[pub_list->pub_num] = g_sub_list[i].module_id;
pub_list->pub_num ++;
}
INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_init(&g_sub_list[i].sem) != 0) { // 初始化该模块同步量
msg_printf("[msg warning]:sub sem init err! sub id:%d\n", i);
continue;
}
if (msg_mutex_init(&g_sub_list[i].msg_mutex) != 0) { // 初始化该模块互斥量
msg_printf("[msg warning]:sub mutex init err! sub id:%d\n", i);
continue;
}
#endif
msg_mutex_init(&g_sub_list[i].msg_mutex); // 初始化该模块互斥量
INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表
g_sub_list_index[g_sub_list[i].module_id] = i; // 索引表赋值,相当于enable
}
#ifdef MSG_OPT_DEBUG
// msg_printf("show index:\n");
@@ -150,7 +179,6 @@ int has_msg_init()
// }
// msg_printf("\n");
// }
#ifdef MSG_OPT_PUB_LIST
// msg_printf("\n");
// for (int i = 1; i < MODULE_MAX; i++) {
// msg_printf("module:%d publish to:\n", i);
@@ -160,99 +188,126 @@ int has_msg_init()
// msg_printf("\n");
// }
#endif
return 0;
}
/**
* @brief 根据模块分别初始化消息表
*
* @param module_id:模块id号
* @return 0:成功 -1失败
* 用于HAS OS中的初始化
*/
int has_msg_init_module(has_module_ID_e module_id)
{
unsigned int i;
int j;
has_static_pub_list_t *pub_list;
/* 检查ID */
#ifdef CHECK_SUBSCIBE_INFO
if ((module_id >= MODULE_MAX) || (module_id == INVALID_ID)) {
msg_printf("%s:module_id:%d is invalid\n", __func__, module_id);
MSG_ASSERT; // 配置都能写错,必须进断言
return -1;
}
#endif
/* 初始化该模块订阅表 */
for (i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++)
{
if (g_sub_list[i].module_id == (unsigned char)module_id) {
#ifdef CHECK_SUBSCIBE_INFO
/* 已经被初始化了,配置错误 */
if (g_sub_list_index[module_id] != MODULE_MAX) {
msg_printf("[msg warning]: msg two same module id:%d\n", module_id);
MSG_ASSERT; // 配置都能写错,必须进断言
}
/* 检查订阅表ID是否配置正确 */
for (j = 0; j < (MODULE_MAX - 1); j++) {
if (g_sub_list[i].sub_module[j] == INVALID_ID) {
break;
} else if (g_sub_list[i].sub_module[j] >= MODULE_MAX) {
msg_printf("sub_module id:%d is invalid, module:id %d\n"
"please check the macro: SUBSCIBE_INFO row:%d, sub list:%d\n"
, g_sub_list[i].sub_module[j], module_id, i, j);
MSG_ASSERT; // 配置都能写错,必须进断言
return -1;
}
}
/* 检查订阅数量是否配置正确 */
if (g_sub_list[i].sub_module_cnt != j) {
msg_printf("sub_module_cnt is config as %d, which actually is %d,"
"please check the macro: SUBSCIBE_INFO row:%d\n"
, g_sub_list[i].sub_module_cnt, j, i);
MSG_ASSERT; // 配置都能写错,必须进断言
return -1;
}
#endif
/* 通知发布者,需要发给此模块 */
for (int k = 0; k < g_sub_list[i].sub_module_cnt; k++) { // 寻找发布者
pub_list = &g_pub_list[g_sub_list[i].sub_module[k]];
#ifdef MSG_OPT_MUTEX_SEM
if (pub_list->pub_num == 0) {
if (msg_mutex_init(&pub_list->buffer_mutex) != 0) {
msg_printf("[msg warning]:pub mutex init err! pub id:%d\n"
, g_sub_list[i].sub_module[k]);
/* 使用计数没有加锁释放msg的内存时有可能出错 */
continue;
}
}
#endif
/* 注册所找到发布者的发布表 */
pub_list->pub_module[pub_list->pub_num] = module_id;
pub_list->pub_num ++;
}
INIT_LIST_HEAD(&g_sub_list[i].msg_list); // 初始化订阅链表
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_init(&g_sub_list[i].sem) != 0) { // 初始化该模块同步量
msg_printf("[msg warning]:sub sem init err! sub id:%d\n", i);
break;
}
if (msg_mutex_init(&g_sub_list[i].msg_mutex) != 0) { // 初始化该模块互斥量
msg_printf("[msg warning]:sub mutex init err! sub id:%d\n", i);
break;
}
#endif
g_sub_list_index[module_id] = i; // 索引表赋值,相当于enable
break;
}
}
#ifdef MSG_OPT_DEBUG
if (i == (sizeof(g_sub_list) / sizeof(g_sub_list[0]))) {
msg_debug("module:%d never subscribe any module\n", module_id);
}
#endif
return 0;
}
int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length)
/* 如果是OS初始化则用下列类似调用方式先清除
需要初始化的模块再调用 has_msg_init_module 传相应ID */
int has_msg_os_init()
{
has_msg_node_t *node;
if (buffer == NULL) {
return -1;
/* 设置无效,标记是否初始化 */
memset(g_sub_list_index, MODULE_MAX, sizeof(g_sub_list_index));
memset(g_pub_list, 0, sizeof(g_pub_list));
for (int i = INVALID_ID + 1; i < MODULE_MAX; i++)
{
has_msg_init_module(i);
}
// if (length == 0) {
// return -1;
// }
#ifdef MSG_OPT_PUB_LIST
has_msg_buffer_t *msg_buffer;
unsigned char sub_index;
MSG_CHECK_MODULE_ID(module_id, -1);
if (g_pub_list[module_id].pub_num == 0) {
// msg_printf("no one subscribe you:%d\n", module_id);
return 0;
}
/* 根据订阅表分配内存 */
node = (has_msg_node_t *)msg_malloc((sizeof(has_msg_node_t) * g_pub_list[module_id].pub_num) + sizeof(has_msg_buffer_t) + length);
if (node == NULL) {
msg_printf("malloc fail,pub id:%d\n", module_id);
return -1;
}
msg_debug("publish: id:%d node addr:0x%p\n", module_id, node);
msg_buffer = (has_msg_buffer_t *)(&node[g_pub_list[module_id].pub_num]);
msg_buffer->node_cnt = g_pub_list[module_id].pub_num;
msg_buffer->length = length;
msg_buffer->module_id = module_id;
memcpy(msg_buffer->buffer, buffer, length);
/* 查订阅表添加消息 */
for (int i = 0; i < g_pub_list[module_id].pub_num; i++) {
sub_index = g_sub_list_index[g_pub_list[module_id].pub_module[i]];
node[i].msg_buff = msg_buffer;
msg_mutex_lock(&g_sub_list[sub_index].msg_mutex);
list_add_tail(&node[i].list, &g_sub_list[sub_index].msg_list); // 加入消息链表
msg_mutex_unlock(&g_sub_list[sub_index].msg_mutex);
}
#else
int i;
msg_debug("publish: id:%d\n", module_id);
for (i = 0; i < (sizeof(g_sub_list) / sizeof(g_sub_list[0])); i++) { // 遍历模块
#ifdef MSG_OPT_DYNAMIC_SUB
msg_mutex_lock(&g_sub_list[i].msg_mutex);
#endif
for (int j = 0; j < g_sub_list[i].sub_module_cnt; j++) { // 查找订阅表
if ((unsigned char)module_id == g_sub_list[i].sub_module[j]) { // 找到订阅信息
node = msg_malloc(sizeof(has_msg_node_t) + length);
msg_debug("node addr:0x%p\n", node);
if (node == NULL) {
msg_printf("malloc fail,pub id:%d\n", module_id);
#ifdef MSG_OPT_DYNAMIC_SUB
msg_mutex_unlock(&g_sub_list[i].msg_mutex);
#endif
return -1;
}
node->length = length;
node->module_id = module_id;
memcpy(node->buffer, buffer, length);
#ifndef MSG_OPT_DYNAMIC_SUB
msg_mutex_lock(&g_sub_list[i].msg_mutex);
#endif
list_add_tail(&node->list, &g_sub_list[i].msg_list); // 加入消息链表
#ifndef MSG_OPT_DYNAMIC_SUB
msg_mutex_unlock(&g_sub_list[i].msg_mutex);
#endif
break; // 不可重复订阅
}
}
#ifdef MSG_OPT_DYNAMIC_SUB
msg_mutex_unlock(&g_sub_list[i].msg_mutex);
#endif
}
#endif
return 0;
}
/* TODO:更合理的方式是每个消息一个互斥量但是可能会导致每个消息malloc和free的时候频繁init和deinit */
#ifdef MSG_OPT_PUB_LIST
/* 检查计数判断该消息是否被所有订阅模块处理完,处理完就释放内存 */
static inline void msg_check_and_free_message(has_msg_node_t *node)
{
has_msg_node_t *temp = NULL;
unsigned char id = node->msg_buff->module_id;
#ifdef MSG_OPT_USE_MUTEX
#ifdef MSG_OPT_MUTEX_SEM
msg_mutex_t *target_mutex = &g_pub_list[id].buffer_mutex;
#endif
msg_mutex_lock(target_mutex);
@@ -262,13 +317,84 @@ static inline void msg_check_and_free_message(has_msg_node_t *node)
}
msg_mutex_unlock(target_mutex);
if (temp) {
msg_debug("free node addr:0x%p\n", temp);
msg_debug("free node addr:%p\n", temp);
msg_free(temp);
}
}
#endif
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb)
/**
* @brief 发布一个消息
*
* @param module_id:发布消息的模块id号
* @param buffer:消息数据指针
* @param length:消息数据长度
* @return 0:成功 -1失败
*/
int has_msg_publish(has_module_ID_e module_id, void *buffer, unsigned int length)
{
has_msg_node_t *node;
if (buffer == NULL) {
return -1;
}
// if (length == 0) {
// return -1;
// }
has_msg_buffer_t *msg_buffer;
has_static_pub_list_t *pub_list;
has_static_sub_list_t *sub_list;
MSG_CHECK_ID_AND_INDEX(module_id, -1);
// MSG_CHECK_MODULE_ID 如果使用这个宏检查,未初始化的模块也可以发布
pub_list = &g_pub_list[module_id];
if (pub_list->pub_num == 0) {
// msg_printf("no one subscribe you:%d\n", module_id);
return 0;
}
/* 根据订阅表分配内存 */
node = (has_msg_node_t *)msg_malloc((sizeof(has_msg_node_t) * pub_list->pub_num)
+ sizeof(has_msg_buffer_t) + length);
if (node == NULL) {
msg_printf("malloc fail,pub id:%d\n", module_id);
return -1;
}
msg_debug("publish: id:%d node addr:%p\n", module_id, node);
msg_buffer = (has_msg_buffer_t *)(&node[pub_list->pub_num]);
msg_buffer->node_cnt = pub_list->pub_num;
msg_buffer->length = length;
msg_buffer->module_id = module_id;
memcpy(msg_buffer->buffer, buffer, length);
/* 查订阅表添加消息 */
for (int i = 0; i < pub_list->pub_num; i++) {
sub_list = &g_sub_list[g_sub_list_index[pub_list->pub_module[i]]];
node[i].msg_buff = msg_buffer;
msg_mutex_lock(&sub_list->msg_mutex);
list_add_tail(&node[i].list, &sub_list->msg_list); // 加入消息链表
#ifdef MSG_OPT_MUTEX_SEM
/* 通知等待线程唤醒 */
if (msg_sem_notify(&sub_list->sem) == -1) {
list_del(&node[i].list); // 添加失败,删除链表
msg_mutex_unlock(&sub_list->msg_mutex);
msg_check_and_free_message(&node[i]);
msg_printf("[msg warning]:Failed to add msg on module:%d !!!\n"
, sub_list->module_id);
continue;
}
#endif
msg_mutex_unlock(&sub_list->msg_mutex);
}
return 0;
}
/**
* @brief 处理一个消息,先入先出
*
* @param module_id:处理消息的模块id号
* @param cb:消息处理回调
* @param ms_timeout:超时时间ms其中0消息直接返回-1一直阻塞
* @return 0:成功 -1失败
*/
int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout)
{
has_msg_node_t *node;
has_static_sub_list_t *sub_list;
@@ -276,6 +402,12 @@ int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb)
sub_list = &g_sub_list[g_sub_list_index[module_id]];
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
return -1; // 没有消息需要处理或发生错误
}
#endif
/* 处理一个消息,先入先出 */
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
@@ -287,25 +419,26 @@ int has_msg_handle(has_module_ID_e module_id, has_msg_handle_cb cb)
return -1;
}
#ifdef MSG_OPT_PUB_LIST
if (cb != NULL) {
cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length);
}
msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
msg_check_and_free_message(node);
#else
if (cb != NULL) {
cb(node->module_id, node->buffer, node->length);
}
msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node);
msg_free(node);
#endif
return 0;
}
int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, has_module_ID_e pub_module_id)
/**
* @brief 处理一个指定发布者的消息,先入先出
*
* @param module_id:处理消息的模块id号
* @param cb:消息处理回调
* @param pub_module_id:指定发布者的id号
* @return 0:成功 -1失败
*/
int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb,
has_module_ID_e pub_module_id)
{
has_msg_node_t *node, *temp;
has_static_sub_list_t *sub_list;
@@ -318,40 +451,38 @@ int has_msg_handle_by_module(has_module_ID_e module_id, has_msg_handle_cb cb, ha
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) {
#ifdef MSG_OPT_PUB_LIST
if (node->msg_buff->module_id == pub_module_id) {
#else
if (node->module_id == pub_module_id) {
if (node->msg_buff->module_id == (unsigned char)pub_module_id) {
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, 0) != WAIT_MSG_COME) {
msg_mutex_unlock(&sub_list->msg_mutex);
msg_printf("error: list number != sem number\n");
return -1; // eventfd和list不一致理论上一定是 WAIT_MSG_COME
}
#endif
list_del(&node->list);
break;
msg_mutex_unlock(&sub_list->msg_mutex);
if (cb != NULL) {
cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length);
}
msg_debug("by handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
msg_check_and_free_message(node);
return 0;
}
}
msg_mutex_unlock(&sub_list->msg_mutex);
} else { // 没有消息需要处理
msg_mutex_unlock(&sub_list->msg_mutex);
return -1;
}
#ifdef MSG_OPT_PUB_LIST
if (cb != NULL) {
cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length);
}
msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
msg_check_and_free_message(node);
#else
if (cb != NULL) {
cb(node->module_id, node->buffer, node->length);
}
msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node);
msg_free(node);
#endif
return 0;
// 没有消息需要处理
msg_mutex_unlock(&sub_list->msg_mutex);
return -1;
}
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
/**
* @brief 处理一个最近收到的消息
*
* @param module_id:处理消息的模块id号
* @param cb:消息处理回调
* @return 0:成功 -1失败
*/
int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb, int ms_timeout)
{
has_msg_node_t *node;
has_static_sub_list_t *sub_list;
@@ -359,6 +490,12 @@ int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
sub_list = &g_sub_list[g_sub_list_index[module_id]];
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
return -1; // 没有消息需要处理或发生错误
}
#endif
/* 处理最新的消息 */
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
@@ -370,19 +507,11 @@ int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
return -1;
}
#ifdef MSG_OPT_PUB_LIST
if (cb != NULL) {
cb(node->msg_buff->module_id, node->msg_buff->buffer, node->msg_buff->length);
}
msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
msg_check_and_free_message(node);
#else
if (cb != NULL) {
cb(node->module_id, node->buffer, node->length);
}
msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node);
msg_free(node);
#endif
return 0;
@@ -390,7 +519,8 @@ int has_msg_handle_latest(has_module_ID_e module_id, has_msg_handle_cb cb)
/* 将消息复制出接口处理,不建议使用 */
#if 1
unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out)
unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_module_id,
unsigned char *buf_out, int ms_timeout)
{
has_msg_node_t *node;
has_static_sub_list_t *sub_list;
@@ -403,6 +533,12 @@ unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_modul
sub_list = &g_sub_list[g_sub_list_index[module_id]];
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
return 0; // 没有消息需要处理或发生错误
}
#endif
/* 处理一个消息,先入先出 */
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
@@ -414,7 +550,6 @@ unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_modul
return 0;
}
#ifdef MSG_OPT_PUB_LIST
memcpy(buf_out, node->msg_buff->buffer, node->msg_buff->length);
if (pub_module_id != NULL) {
*pub_module_id = node->msg_buff->module_id;
@@ -422,20 +557,11 @@ unsigned int has_msg_receive(has_module_ID_e module_id, unsigned char *pub_modul
len = node->msg_buff->length;
msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
msg_check_and_free_message(node);
#else
memcpy(buf_out, node->buffer, node->length);
if (pub_module_id != NULL) {
*pub_module_id = node->module_id;
}
len = node->length;
msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node);
msg_free(node);
#endif
return len;
}
unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id, unsigned char *buf_out)
unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pub_module_id,
unsigned char *buf_out, int ms_timeout)
{
has_msg_node_t *node;
has_static_sub_list_t *sub_list;
@@ -448,6 +574,12 @@ unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pu
sub_list = &g_sub_list[g_sub_list_index[module_id]];
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, ms_timeout) != WAIT_MSG_COME) {
return 0; // 没有消息需要处理或发生错误
}
#endif
/* 处理最新的消息 */
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
@@ -459,7 +591,6 @@ unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pu
return 0;
}
#ifdef MSG_OPT_PUB_LIST
memcpy(buf_out, node->msg_buff->buffer, node->msg_buff->length);
if (pub_module_id != NULL) {
*pub_module_id = node->msg_buff->module_id;
@@ -467,25 +598,19 @@ unsigned int has_msg_receive_latest(has_module_ID_e module_id, unsigned char *pu
len = node->msg_buff->length;
msg_debug("handle:pub id:%d handle id:%d\n", node->msg_buff->module_id, module_id);
msg_check_and_free_message(node);
#else
memcpy(buf_out, node->buffer, node->length);
if (pub_module_id != NULL) {
*pub_module_id = node->module_id;
}
len = node->length;
msg_debug("handle:pub id:%d handle id:%d free addr:0x%p\n", node->module_id, module_id, node);
msg_free(node);
#endif
return len;
}
#endif
/**
* @brief 判断消息链表是否是空,没有消息要处理就是空
*
* @param module_id:要查空的模块id号
* @return 0:没有消息 -1有消息待处理
*/
unsigned int has_msg_is_message_empty(has_module_ID_e module_id)
{
has_msg_node_t *node, *temp;
has_static_sub_list_t *sub_list;
unsigned int num = 0;
MSG_CHECK_ID_AND_INDEX(module_id, -1);
sub_list = &g_sub_list[g_sub_list_index[module_id]];
@@ -500,6 +625,12 @@ unsigned int has_msg_is_message_empty(has_module_ID_e module_id)
}
}
/**
* @brief 获取待处理消息的个数
*
* @param module_id:处理消息的模块id号
* @return num : 消息数量
*/
unsigned int has_msg_get_message_number(has_module_ID_e module_id)
{
has_msg_node_t *node, *temp;
@@ -532,16 +663,19 @@ int has_msg_printf_subscribe(has_module_ID_e module_id)
return 0;
}
/**
* @brief 删除所有未处理的消息
*
* @param module_id:处理消息的模块id号
* @return 0:成功 -1失败
*/
int has_msg_delete_all_message(has_module_ID_e module_id)
{
has_msg_node_t *node, *temp;
has_static_sub_list_t *sub_list;
#ifdef MSG_OPT_PUB_LIST
struct list_head free_list;
has_msg_node_t *free_temp;
unsigned char id;
INIT_LIST_HEAD(&free_list);
#endif
int ret = 0;
MSG_CHECK_ID_AND_INDEX(module_id, -1);
sub_list = &g_sub_list[g_sub_list_index[module_id]];
@@ -549,36 +683,24 @@ int has_msg_delete_all_message(has_module_ID_e module_id)
msg_mutex_lock(&sub_list->msg_mutex);
if (!list_empty(&sub_list->msg_list)) {
list_for_each_entry_safe(node, temp, &sub_list->msg_list, list) {
list_del(&node->list);
#ifdef MSG_OPT_PUB_LIST
list_add_tail(&node->list, &free_list); // 加入临时链表
#else
msg_debug("id:%d free addr:0x%p\n", module_id, node);
msg_free(node);
#ifdef MSG_OPT_MUTEX_SEM
if (msg_sem_wait(&sub_list->sem, 0) != WAIT_MSG_COME) {
msg_printf("error: list number != sem number\n");
ret = -1;
continue; // eventfd和list不一致理论上一定是 WAIT_MSG_COME
}
#endif
list_del(&node->list);
list_add_tail(&node->list, &free_list); // 加入临时链表
}
}
INIT_LIST_HEAD(&sub_list->msg_list);
// INIT_LIST_HEAD(&sub_list->msg_list);
msg_mutex_unlock(&sub_list->msg_mutex);
#ifdef MSG_OPT_PUB_LIST
/* 释放内存 */
list_for_each_entry_safe(node, temp, &free_list, list) {
/* msg_check_and_free_message(node); */
free_temp = NULL;
id = node->msg_buff->module_id;
msg_mutex_lock(&g_pub_list[id].buffer_mutex);
node->msg_buff->node_cnt --;
if (node->msg_buff->node_cnt == 0) {
free_temp = ((has_msg_node_t *)node->msg_buff - g_pub_list[id].pub_num);
}
msg_mutex_unlock(&g_pub_list[id].buffer_mutex);
if (free_temp) {
msg_debug("id:%d free addr:0x%p\n", module_id, free_temp);
msg_free(free_temp);
}
msg_check_and_free_message(node);
}
#endif
return 0;
return ret;
}
#ifdef MSG_OPT_DYNAMIC_SUB
@@ -596,7 +718,7 @@ int has_msg_subscribe(has_module_ID_e module_id, has_module_ID_e sub_id)
if (sub_list->sub_module_cnt < (MODULE_MAX - 1)) {
/* 检查是否已经订阅 */
for (j = 0; j < sub_list->sub_module_cnt; j++) {
if (sub_id == sub_list->sub_module[j]) {
if ((unsigned char)sub_id == sub_list->sub_module[j]) {
msg_mutex_unlock(&sub_list->msg_mutex);
msg_printf("ID:%d has been sub by module:%d\n", sub_id, module_id);
return -1;
@@ -626,7 +748,7 @@ int has_msg_unsubscribe(has_module_ID_e module_id, has_module_ID_e sub_id)
msg_mutex_lock(&sub_list->msg_mutex);
for (int j = 0; j < sub_list->sub_module_cnt; j++) {
if (sub_id == sub_list->sub_module[j]) {
if ((unsigned char)sub_id == sub_list->sub_module[j]) {
sub_list->sub_module[j] = INVALID_ID;
sub_list->sub_module_cnt --;
}

View File

@@ -0,0 +1,219 @@
/*
* has_task_msg_manager/has_task_os_port.c
*
* Author: zhangzhaopeng
* Version 1.0
* Created on : 2026-01-05
* 系统适配层
*/
#include <stdint.h>
#include "has_task_msg_os_port.h"
// TODO: 信号量限制最大数量
#ifdef MSG_OPT_MUTEX_SEM
/**
* @brief 互斥量初始化
*
* @param mutex:互斥量
* @return 0:成功 -1失败
*/
int msg_mutex_init(msg_mutex_t *mutex)
{
#if (MSG_OPT_OS == MSG_OPT_OS_LINUX)
if (pthread_mutex_init(mutex, NULL) != 0) {
return -1;
}
return 0;
#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS)
*mutex = xSemaphoreCreateMutex();
if (*mutex == NULL) {
msg_printf("create mutex err!!\n\n");
return -1;
}
return 0;
#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD)
*mutex = rt_mutex_create("has_msg", RT_IPC_FLAG_FIFO);
if (*mutex == RT_NULL) {
msg_printf("create mutex err!!\n\n");
return -1;
}
return 0;
#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED)
/* to be done */
/* example:xr806 */
if (OS_MutexCreate(mutex) != OS_OK) {
msg_printf("create mutex err!!\n\n");
return -1;
}
return 0;
#endif
}
/**
* @brief 信号量初始化
*
* @param sem:信号量
* @return 0:成功 -1失败
*/
int msg_sem_init(msg_sem_t *sem)
{
#if (MSG_OPT_OS == MSG_OPT_OS_LINUX)
sem->poll_fd.fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
if (sem->poll_fd.fd == -1) {
msg_printf("create eventfd error!\n");
return -1;
}
sem->poll_fd.events = POLLIN; // 监控可读事件
sem->poll_fd.revents = 0;
return 0;
#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS)
*sem = xSemaphoreCreateCounting(UINT_MAX, 0);
if (*sem == NULL) {
msg_printf("create counting sem error!\n");
return -1;
}
return 0;
#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD)
*sem = rt_sem_create("has_sem", 0, RT_IPC_FLAG_FIFO);
if (*sem == RT_NULL) {
msg_printf("create counting sem error!\n");
return -1;
}
return 0;
#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED)
/* to be done */
/* example:xr806 */
if (OS_SemaphoreCreate(sem, 0, UINT_MAX) != OS_OK) {
msg_printf("create counting sem error!\n");
return -1;
}
return 0;
#endif
}
/**
* @brief 消息通知
*
* @param sem:信号量
* @return 0:成功 -1失败
*/
int msg_sem_notify(msg_sem_t *sem)
{
#if (MSG_OPT_OS == MSG_OPT_OS_LINUX)
uint64_t val = 1;
if (write(sem->poll_fd.fd, &val, sizeof(val)) != sizeof(val)) {
msg_printf("notify thread err!\n");
return -1;
}
return 0;
#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS)
if (xSemaphoreGive(*sem) != pdTRUE) {
msg_printf("notify thread err!\n");
return -1;
}
return 0;
#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD)
if (rt_sem_release(*sem) != RT_EOK) {
msg_printf("notify thread err!\n");
return -1;
}
return 0;
#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED)
/* to be done */
/* example:xr806 */
if (OS_SemaphoreRelease(sem) != OS_OK) {
msg_printf("notify thread err!\n");
return -1;
}
return 0;
#endif
}
/**
* @brief 等待消息
*
* @param sem:信号量
* @param ms_timeout:超时时间ms其中0消息直接返回-1一直阻塞
* @return MSG_WAIT_RET:
* WAIT_FAIL = -1, 等待失败
* WAIT_TIMEOUT = -1, 等待超时
* WAIT_MSG_COME = -1, 有新消息
*/
enum MSG_WAIT_RET msg_sem_wait(msg_sem_t *sem, int ms_timeout)
{
#if (MSG_OPT_OS == MSG_OPT_OS_LINUX)
uint64_t val;
int ret = poll(&sem->poll_fd, 1, ms_timeout);
if (ret == -1) {
perror("poll fail\n");
return WAIT_FAIL;
} else if (ret == 0) {
// msg_printf("poll timeout\n");
return WAIT_TIMEOUT;
}
if (sem->poll_fd.revents & POLLIN) {
if (read(sem->poll_fd.fd, &val, sizeof(val)) == sizeof(val)) { // 清零
// msg_printf("read %ld\n", val);
return WAIT_MSG_COME;
}
else {
msg_printf("read error\n");
}
}
// 检查错误事件
msg_printf("something err happened,revents:%x\n", sem->poll_fd.revents);
return WAIT_FAIL;
#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS)
TickType_t xTicksToWait;
if (ms_timeout < 0) {
xTicksToWait = portMAX_DELAY;
} else {
xTicksToWait = pdMS_TO_TICKS(ms_timeout);
}
if (xSemaphoreTake(*sem, xTicksToWait) != pdTRUE) {
// msg_printf("sem take fail\n");
return WAIT_FAIL;
}
return WAIT_MSG_COME;
#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD)
rt_err_t ret;
rt_int32_t time;
if (ms_timeout < 0) {
time = RT_WAITING_FOREVER;
} else {
time = rt_tick_from_millisecond(ms_timeout);
}
ret = rt_sem_take(*sem, time);
if (ret == RT_EOK) {
return WAIT_MSG_COME;
} else if (ret == (-RT_ETIMEOUT)) {
return WAIT_TIMEOUT;
} else {
msg_printf("sem take fail\n");
return WAIT_FAIL;
}
#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED)
/* to be done */
/* example:xr806 */
OS_Time_t waitMS;
if (ms_timeout < 0) {
waitMS = OS_WAIT_FOREVER;
} else {
waitMS = (OS_Time_t)ms_timeout;
}
if (OS_SemaphoreWait(sem, waitMS) != OS_OK) {
// msg_printf("sem take fail\n");
return WAIT_FAIL;
}
return WAIT_MSG_COME;
#endif
}
#endif

View File

@@ -7,15 +7,22 @@
#define MSG_OPT_OS_NONE_OR_OSAL 4
#define MSG_OPT_OS_USER_DEFINED 5
/* TODO:rtt和freertos未测过 */
/* TODO:rtt未验证 */
#define MSG_OPT_OS MSG_OPT_OS_LINUX
// #define MSG_OPT_OS MSG_OPT_OS_freeRTOS
// #define MSG_OPT_OS MSG_OPT_OS_RT_THREAD
// #define MSG_OPT_OS MSG_OPT_OS_NONE_OR_OSAL // 裸机或osal
// #define MSG_OPT_OS MSG_OPT_OS_USER_DEFINED // 自定义
#define MSG_OPT_USE_MUTEX // 支持互斥量
#define MSG_OPT_MUTEX_SEM // 支持互斥量和信号量
#ifdef MSG_OPT_MUTEX_SEM
enum MSG_WAIT_RET{
WAIT_FAIL = -1, // 等待失败
WAIT_TIMEOUT, // 等待超时
WAIT_MSG_COME, // 需要处理消息
};
#endif
/* ----------------------- for Linux ----------------------- */
#if (MSG_OPT_OS == MSG_OPT_OS_LINUX)
@@ -23,15 +30,22 @@
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include "list.h"
#ifdef MSG_OPT_USE_MUTEX
#ifdef MSG_OPT_MUTEX_SEM
#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#define msg_mutex_t pthread_mutex_t
#define msg_mutex_init(m) pthread_mutex_init(m, NULL)
#define msg_mutex_lock(m) pthread_mutex_lock(m)
#define msg_mutex_unlock(m) pthread_mutex_unlock(m)
typedef struct {
struct pollfd poll_fd;
} sync_t;
#define msg_sem_t sync_t
#else
#define msg_mutex_t
#define msg_mutex_init(m)
#define msg_mutex_lock(m)
#define msg_mutex_unlock(m)
#endif
@@ -44,79 +58,74 @@
#elif (MSG_OPT_OS == MSG_OPT_OS_freeRTOS)
#include <stdlib.h>
#include <stdio.h>
#include <limits.h>
#include "FreeRTOS.h"
#include "semphr.h"
#ifdef MSG_OPT_USE_MUTEX
#define msg_mutex_t SemaphoreHandle_t
static inline void msg_mutex_init(void *mutex) {
*mutex = xSemaphoreCreateMutex();
if (*mutex == NULL) {
printf("create mutex err!!\n\n");
}
}
static inline void msg_mutex_lock(void *mutex) {
if (xSemaphoreTake(*mutex, portMAX_DELAY) != pdTRUE) {
printf("lock mutex err!!\n\n");
}
}
static inline void msg_mutex_unlock(void *mutex) {
xSemaphoreGive(*mutex);
}
#else
#define msg_mutex_t
#define msg_mutex_init(m)
#define msg_mutex_lock(m)
#define msg_mutex_unlock(m)
#endif
#include "list.h"
#define msg_malloc pvPortMalloc
#define msg_free vPortFree
#define msg_printf printf
#ifdef MSG_OPT_MUTEX_SEM
#define msg_mutex_t SemaphoreHandle_t
static inline void msg_mutex_lock(msg_mutex_t *mutex) {
if (xSemaphoreTake(*mutex, portMAX_DELAY) != pdTRUE) {
msg_printf("lock mutex err!!\n\n");
}
}
static inline void msg_mutex_unlock(msg_mutex_t *mutex) {
xSemaphoreGive(*mutex);
}
#define msg_sem_t SemaphoreHandle_t
#else
#define msg_mutex_t
#define msg_mutex_lock(m)
#define msg_mutex_unlock(m)
#endif
/* ----------------------- for RT-thread ----------------------- */
#elif (MSG_OPT_OS == MSG_OPT_OS_RT_THREAD)
#include <stdlib.h>
#include <stdio.h>
#include <rtthread.h>
#ifdef MSG_OPT_USE_MUTEX
#define msg_mutex_t rt_mutex_t
static inline void msg_mutex_init(void *mutex) {
*mutex = rt_mutex_create("has_msg", RT_IPC_FLAG_FIFO);
if (*mutex == RT_NULL) {
printf("create mutex err!!\n\n");
}
}
static inline void msg_mutex_lock(void *mutex) {
if (rt_mutex_take(*mutex, RT_WAITING_FOREVER) != RT_EOK) {
printf("lock mutex err!!\n\n");
}
}
static inline void msg_mutex_unlock(void *mutex) {
rt_mutex_release(*mutex);
}
#else
#define msg_mutex_t
#define msg_mutex_init(m)
#define msg_mutex_lock(m)
#define msg_mutex_unlock(m)
#endif
#include "list.h"
#define msg_malloc rt_malloc
#define msg_free rt_free
#define msg_printf printf
#ifdef MSG_OPT_MUTEX_SEM
#define msg_mutex_t rt_mutex_t
static inline void msg_mutex_lock(msg_mutex_t *mutex) {
if (rt_mutex_take(*mutex, RT_WAITING_FOREVER) != RT_EOK) {
msg_printf("lock mutex err!!\n\n");
}
}
static inline void msg_mutex_unlock(msg_mutex_t *mutex) {
rt_mutex_release(*mutex);
}
#define msg_sem_t rt_sem_t
#else
#define msg_mutex_t
#define msg_mutex_lock(m)
#define msg_mutex_unlock(m)
#endif
/* ----------------------- for none os ----------------------- */
#elif (MSG_OPT_OS == MSG_OPT_OS_NONE_OR_OSAL)
#include <stdlib.h>
#include <stdio.h>
#undef MSG_OPT_USE_MUTEX
#include "list.h"
#undef MSG_OPT_MUTEX_SEM
#define msg_mutex_t
#define msg_mutex_init(m)
#define msg_mutex_lock(m)
#define msg_mutex_unlock(m)
@@ -126,37 +135,50 @@ static inline void msg_mutex_unlock(void *mutex) {
/* ----------------------- for user define ----------------------- */
#elif (MSG_OPT_OS == MSG_OPT_OS_USER_DEFINED)
/* 下面示例是基于xr806移植 */
#include <stdlib.h>
#include <stdio.h>
#include <limits.h>
#include "list.h"
#ifdef MSG_OPT_USE_MUTEX
#define msg_mutex_t int/* to be done */
static inline void msg_mutex_init(void *mutex) {
/* to be done */
#include "../has_wifi_slab.h"
#include "../kernel/os/os.h"
#define msg_malloc test_slab_malloc
#define msg_free test_slab_free
#define msg_printf printf
#ifdef MSG_OPT_MUTEX_SEM
#define msg_mutex_t OS_Mutex_t
static inline void msg_mutex_lock(msg_mutex_t *mutex) {
if (OS_MutexLock(mutex, OS_WAIT_FOREVER) != OS_OK) {
msg_printf("lock mutex err!!\n\n");
}
}
static inline void msg_mutex_lock(void *mutex) {
/* to be done */
static inline void msg_mutex_unlock(msg_mutex_t *mutex) {
OS_MutexUnlock(mutex);
}
static inline void msg_mutex_unlock(void *mutex) {
/* to be done */
}
#define msg_sem_t OS_Semaphore_t
#else
#define msg_mutex_t
#define msg_mutex_init(m)
#define msg_mutex_lock(m)
#define msg_mutex_unlock(m)
#endif
#define msg_malloc malloc
#define msg_free free
#define msg_printf printf
#else
#error "!Undefined system environment!"
#endif
#endif
#endif
#ifdef MSG_OPT_MUTEX_SEM
int msg_mutex_init(msg_mutex_t *mutex);
int msg_sem_init(msg_sem_t *sem);
int msg_sem_notify(msg_sem_t *sem);
enum MSG_WAIT_RET msg_sem_wait(msg_sem_t *sem, int ms_timeout);
#endif

View File

@@ -43,8 +43,8 @@ void *gui_function(void *arg)
has_msg_publish(GUI, gui_publish_buffer3, sizeof(gui_publish_buffer3)); // GUI发布第三条消息
while (1)
{
usleep(500 * 1000);
has_msg_handle(GUI, gui_handle_cb); // 处理一条GUI收到的消息先入先出
// usleep(500 * 1000);
has_msg_handle(GUI, gui_handle_cb, 500); // 处理一条GUI收到的消息先入先出
}
}
@@ -63,9 +63,9 @@ void *sensor_function(void *arg)
has_msg_publish(moude_ID, sensor_publish_buffer2, sizeof(sensor_publish_buffer2)); // SENSOR发布第二条消息
while (1)
{
usleep(500 * 1000);
has_msg_handle(moude_ID, sensor_handle_cb); // 处理一条SENSOR收到的消息先入先出
has_msg_handle_latest(moude_ID, sensor_handle_cb); // 处理一条SENSOR最新收到的消息
// usleep(500 * 1000);
has_msg_handle(moude_ID, sensor_handle_cb, 500); // 处理一条SENSOR收到的消息先入先出
has_msg_handle_latest(moude_ID, sensor_handle_cb, 0); // 处理一条SENSOR最新收到的消息
if (has_msg_is_message_empty(moude_ID) != 0) // 判断是否还有消息未处理
{
printf("模块:%d 还有%d条消息待处理\n", moude_ID, has_msg_get_message_number(moude_ID));