fusion/common/threadpool/threadpool.c

/*
* =====================================================================================
* Copyright (c), 2013-2020, Jz.
* Filename: threadpool.c
* Author: linux kernel,
* Organization: jz
*
* =====================================================================================
*/
/*************************************************
File name : threadpool.c
Module : common
Author : amir
Created on : 2023-01-06
Description :
a simple thread pool that queues task routines and runs them on lazily created worker threads
Modify History:
1. Date: Author: Modification:
202407 fixed duplicate release of packet during free
*************************************************/
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include "list.h"
#include "htime.h"
#include "hthread.h"
#include "hbase.h"
#include "hlog.h"
#include "hmutex.h"
#include "threadpool.h"
#define TAG "TAG_THREADPOOL"
#define POOL_DEBUG 0
typedef enum {
TASKLIST_FREE = 0, //free/unused task nodes ready for reuse
TASKLIST_BUSYING,  //pending task queue, any worker thread may run these
TASKLIST_MAX,
} TASKLIST_TYPE;
/**
* @struct threadpool_task_s
* @brief the work item struct
*
* @var routine Pointer to the function that will perform the task.
* @var packet  Reference-counted packet passed to the routine.
*/
typedef struct {
struct list_head link;
thread_routine routine;
kpacket *packet;
} threadpool_task_s;
typedef struct {
uint8_t busying;
uint32_t max_exe_time;
uint32_t total_exe_time;
uint32_t total_exe_cnt;
uint32_t th_idx;
hthread_t th;
} thread_info_s;
struct threadpool_s{
kpacket ref;
hmutex_t mutex;
hsem_t sem;
thread_info_s *threads_info;
uint8_t exit;
uint8_t thread_count;
uint8_t cur_th_idx; /*当前的id*/
uint8_t max_thread_count;
uint8_t all_thread_busy_times;
struct list_head route_queue[TASKLIST_MAX];
};
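/*
* Design notes: the pool keeps two intrusive lists. TASKLIST_BUSYING holds
* pending work in FIFO order (enqueued with list_add_tail); TASKLIST_FREE
* recycles finished task nodes so steady-state dispatch avoids malloc.
* Worker threads are created lazily in threadpool_add(), one at a time,
* only while every existing worker is busy.
*/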
#define is_tasklist_empty(_QUEUE) list_empty(&pool->route_queue[_QUEUE])
HTHREAD_ROUTINE(threadpool_thread){
threadpool *pool = (threadpool *)userdata;
hmutex_lock(&(pool->mutex));
thread_info_s *this_info = &pool->threads_info[pool->cur_th_idx];
this_info->th_idx = pool->cur_th_idx;
pool->cur_th_idx++;
hmutex_unlock(&(pool->mutex));
hlogd("%s start:%p th_idx:%u", __func__, userdata, this_info->th_idx);
while(!pool->exit) {
threadpool_task_s *task = NULL;
hmutex_lock(&(pool->mutex));
if (!is_tasklist_empty(TASKLIST_BUSYING)){
task = list_entry(pool->route_queue[TASKLIST_BUSYING].next, threadpool_task_s, link);
list_del(&task->link);
}
if (!task){
if(!pool->exit){
#if POOL_DEBUG
hlogd("wait th_idx:%u normal:%u", this_info->th_idx, is_tasklist_empty(TASKLIST_BUSYING));
#endif
hmutex_unlock(&(pool->mutex));
hsem_wait(&(pool->sem));
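/* woken by threadpool_add() or threadpool_destroy(); the queue and
pool->exit are re-checked at the top of the loop */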
//hlogd("th_idx:%u", this_info->th_idx);
}
}
else
{
/* Get to work */
kpacket *packet = task->packet;
uint64_t elapsed = gettimeofday_ms();
#if POOL_DEBUG
hlogd("task->routine:%p", task->routine);
#endif
/* run the routine outside the lock so other workers can keep dequeuing */
this_info->busying = 1;
hmutex_unlock(&(pool->mutex));
task->routine(packet);
hmutex_lock(&(pool->mutex));
this_info->busying = 0;
elapsed = gettimeofday_ms() - elapsed;
if (elapsed > this_info->max_exe_time){
this_info->max_exe_time = elapsed;
}
this_info->total_exe_cnt++;
this_info->total_exe_time += elapsed;
/* drop the reference taken in threadpool_add_internal() */
if (packet){
kpacket_dec(packet);
}
/* recycle the task node instead of freeing it */
list_add(&task->link, &pool->route_queue[TASKLIST_FREE]);
hmutex_unlock(&(pool->mutex));
}
}
hmutex_lock(&(pool->mutex));
pool->thread_count--;
hmutex_unlock(&(pool->mutex));
hlogw("%s exit thread_id:%ld", __func__, hv_gettid());
return(NULL);
}
/**
* @function static uint32_t threadpool_free(threadpool *pool)
* @brief release all queued task nodes, sync primitives and the pool itself
* @param pool the pool to free; worker threads must already have exited
*/
static uint32_t threadpool_free(threadpool *pool)
{
if (!pool) {
return EINVAL;
}
/* Did we manage to allocate ? */
if (pool->threads_info) {
/* Because we allocate pool->threads_info after initializing the
mutex and condition variable, we're sure they're
initialized. Let's mutex the mutex just in case. */
hmutex_lock(&(pool->mutex));
struct list_head *pos, *n;
uint32_t i;
for(i=0; i<TASKLIST_MAX; i++){
list_for_each_safe(pos, n, &pool->route_queue[i]){
threadpool_task_s* task = list_entry(pos, threadpool_task_s, link);
list_del(pos);
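/* nodes already recycled onto TASKLIST_FREE dropped their packet
reference in the worker; skipping them avoids the duplicate packet
release noted in the header's 202407 history entry */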
if (task->packet && i != TASKLIST_FREE){
kpacket_dec(task->packet);
}
hv_free(task);
}
}
hmutex_unlock(&(pool->mutex));
hmutex_destroy(&(pool->mutex));
hsem_destroy(&(pool->sem));
hv_free(pool->threads_info);
pool->threads_info = NULL;
}
hv_free(pool);
return 0;
}
threadpool *threadpool_create(uint32_t max_thread_cnt)
{
threadpool *pool;
max_thread_cnt = (max_thread_cnt == 0 || max_thread_cnt > MAX_THREADS) ? MAX_THREADS : max_thread_cnt;
if ((pool = (threadpool *)hv_calloc(1, sizeof(threadpool))) == NULL) {
goto err;
}
/* Initialize */
INIT_LIST_HEAD(&pool->route_queue[TASKLIST_BUSYING]);
INIT_LIST_HEAD(&pool->route_queue[TASKLIST_FREE]);
pool->max_thread_count = max_thread_cnt;
/* Allocate thread and task queue */
pool->threads_info = (thread_info_s *)hv_calloc(max_thread_cnt, sizeof(thread_info_s));
/* Initialize mutex and conditional variable first */
if ((hmutex_init(&pool->mutex) != 0) ||
(hsem_init(&pool->sem, 0) != 0) ||
(pool->threads_info == NULL)) {
goto err;
}
return pool;
err:
if (pool) {
threadpool_free(pool);
}
return NULL;
}
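/*
* Minimal usage sketch (my_routine is a hypothetical caller-supplied
* thread_routine; see threadpool_test() below for a runnable variant):
*
*   threadpool *pool = threadpool_create(4);
*   if (pool){
*       threadpool_add(pool, my_routine, NULL); // queue one task, no packet
*       threadpool_destroy(pool);               // wakes and joins all workers
*   }
*/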
static uint32_t threadpool_free_thread_count_locked(threadpool *pool){
uint32_t i, count = 0;
for(i=0; i < pool->thread_count && i<pool->max_thread_count; i++){
thread_info_s *info = &pool->threads_info[i];
count += info->busying ? 0 : 1;
}
//hlogd("%s %u", __func__, count);
return count;
}
static uint32_t threadpool_add_internal(threadpool *pool, thread_routine routine, kpacket *packet){
uint32_t err = 0;
if (pool == NULL || routine == NULL)
{
hlogd("ERR pool = %p or routine=%p", pool, routine);
return EINVAL;
}
if (hmutex_lock(&(pool->mutex)) != 0)
{
hlogd("ERR hmutex_lock");
return EPERM;
}
do {
/* Are we shutting down ? */
if (pool->exit) {
err = EBADRQC;
break;
}
threadpool_task_s *task;
if (is_tasklist_empty(TASKLIST_FREE)){
task = (threadpool_task_s *)hv_malloc(sizeof(threadpool_task_s));
}
else
{
task = list_entry(pool->route_queue[TASKLIST_FREE].next, threadpool_task_s, link);
list_del(&task->link);
}
if (!task)
{
err = ENOMEM;
break;
}
task->routine = routine;
task->packet = packet;
if (packet){
kpacket_inc(packet); /* hold a reference until a worker has run the task */
}
//a blocking task would go to a block list; normal tasks go to the ready (BUSYING) list
//static cnt; hlogd("cnt:%u",cnt++);
list_add_tail(&task->link,&(pool->route_queue[TASKLIST_BUSYING]));
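/* spawn a new worker only when every existing one is busy and the pool
is below max_thread_count; otherwise just count the contention */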
if (threadpool_free_thread_count_locked(pool) == 0 && pool->thread_count < pool->max_thread_count)
{
pool->threads_info[pool->thread_count].th = hthread_create(threadpool_thread, pool);
pool->thread_count++;
}
else
{
pool->all_thread_busy_times++;
// hloge("%s hv_free_threads:%u, thread_count:%u, max_thread_count:%u",__func__, threadpool_free_thread_count_locked(pool), pool->thread_count, pool->max_thread_count);
}
if (hsem_post(&pool->sem) != 0) {
err = EPERM;
break;
}
} while(0);
if (hmutex_unlock(&pool->mutex) != 0)
{
err = EPERM;
}
return err;
}
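/*
* Reference-count protocol: threadpool_add() takes one reference on the
* packet when the task is queued; the worker drops it after the routine
* returns. The caller may therefore release its own reference as soon as
* threadpool_add() returns (see the #if 0 example at the end of the file).
*/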
uint32_t threadpool_add(threadpool *pool, thread_routine routine, kpacket *packet)
{
return threadpool_add_internal(pool, routine, packet);
}
uint32_t threadpool_del(threadpool *pool, thread_routine routine){
uint32_t err = 0;
if (routine == NULL || pool == NULL) {
hloge("%s err routine:%p(!NULL) pool:%p(!NULL)", __func__, routine, pool);
return EINVAL;
}
if (hmutex_lock(&(pool->mutex)) != 0) {
return EPERM;
}
struct list_head *pos, *n;
uint32_t i;
for(i=0; i<TASKLIST_MAX; i++){
list_for_each_safe(pos, n, &pool->route_queue[i]){
threadpool_task_s* task = list_entry(pos, threadpool_task_s, link);
if (task->routine == routine){
list_del(pos);
if (task->packet && i != TASKLIST_FREE){
kpacket_dec(task->packet);
}
hv_free(task);
break;
}
}
}
if (hmutex_unlock(&pool->mutex) != 0) {
err = EPERM;
}
return err;
}
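/*
* Note: only the first queued task per list whose routine matches is
* removed; a task already running on a worker cannot be cancelled here.
*/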
uint32_t threadpool_destroy(threadpool *pool)
{
uint32_t i, thread_cnt, err = 0;
if (pool == NULL) {
return EINVAL;
}
if (hmutex_lock(&(pool->mutex)) != 0) {
return EPERM;
}
do {
/* Already shutting down */
if (pool->exit) {
err = EBADRQC;
break;
}
thread_cnt = pool->thread_count;
pool->exit = 1;
hmutex_unlock(&(pool->mutex));
/* Wake up all worker threads */
for(i=0; i<thread_cnt; i++){
if ((hsem_post(&(pool->sem)) != 0)) {
err = EPERM;
thread_info_s *info = &pool->threads_info[i];
hloge("%s err hsem_post th_idx:%d %p", __func__, info->th_idx, info->th);
}
}
/* Join all worker threads */
//hloge("%s thread_cnt %u", __func__, thread_cnt);
for(i = 0; i < thread_cnt && err == 0; i++) {
uint64_t start_ms = gettimeofday_ms();
thread_info_s *info = &pool->threads_info[i];
if (hthread_join(info->th) != 0) {
err = ESRCH;
hloge("%s hthread_join th_idx:%d %p", __func__, info->th_idx, info->th);
}else {
hlogw("%s %u/%u hthread_join th_idx:%u %p elapsed:%llu ms", __func__, i+1, thread_cnt, info->th_idx, info->th, (unsigned long long)(gettimeofday_ms() - start_ms));
}
}
} while(0);
/* Only if everything went well do we deallocate the pool */
if (!err) {
threadpool_free(pool);
}
hlogw("%s",__func__);
return err;
}
void threadpool_dump(threadpool *pool){
if (!pool){
return;
}
hmutex_lock(&(pool->mutex));
uint32_t i;
hlogd("%s start", __func__);
hlogd("hv_free_thread:%u busy_times:%u max_thread_count:%u thread_count:%u",threadpool_free_thread_count_locked(pool), pool->all_thread_busy_times, pool->max_thread_count, pool->thread_count);
for(i=0; i< pool->thread_count && i<pool->max_thread_count; i++){
thread_info_s *info = &pool->threads_info[i];
hlogd("thread_info_s:%02d busy:%d exe_cnt:%u max_exe_time:%u total_exe_time:%u", info->th_idx, info->busying, info->total_exe_cnt, info->max_exe_time, info->total_exe_time);
}
hlogd("%s end", __func__);
hmutex_unlock(&(pool->mutex));
}
static void *test_routine(void *param)
{
hloge("%s th_id:%ld", __func__, hv_gettid());
hv_msleep(10);
return NULL;
}
void threadpool_test(){
hlogw("%s start", __func__);
threadpool *tp = threadpool_create(5);
for (uint32_t i = 10; i>0; i--){
threadpool_add(tp, test_routine, NULL);
hv_msleep(1);
}
hv_sleep(1);
threadpool_destroy(tp);
hlogw("%s end", __func__);
}
#if 0
/* example: hand a reference-counted packet to the pool */
kpacket *packet = kpacket_new(EVENT_HAL_DAYNIGHT, sizeof(uint32_t));
*packet->box->payload = IRCUT_NIGHT;
threadpool_add(pool, timer_callback, packet);
kpacket_dec(packet); /* the pool holds its own reference while queued */
#endif