/* * ===================================================================================== * Copyright (c), 2013-2020, Jz. * Filename: ringbuffer.h * Author: linux kernel, * Organization: jz * * ===================================================================================== */ /************************************************* File name : ringbuffer.h Module : common Author : amir Created on : 2023-01-6 Description : a ringbuffer to store data(copy) Modify History: 1. Date: Author: Modification: 202407 罗启宏 free时packet重复释放 *************************************************/ #include #include #include #include "list.h" #include "htime.h" #include "hthread.h" #include "hbase.h" #include "hlog.h" #include "hmutex.h" #include "threadpool.h" #define TAG "TAG_THREADPOOL" #define POOL_DEGUG 0 enum{ TASKLIST_FREE = 0,//unuse queue TASKLIST_BUSYING, //normal task queue, all threads can run TASKLIST_MAX, }TASKLIST_TYPE; /** * @struct threadpool_task * @brief the work struct * * @var function Pointer to the function that will perform the task. * @var argument Argument to be passed to the function. */ typedef struct { struct list_head link; thread_routine routine; kpacket *packet; } threadpool_task_s; typedef struct { uint8_t busying; uint32_t max_exe_time; uint32_t total_exe_time; uint32_t total_exe_cnt; uint32_t th_idx; hthread_t th; } thread_info_s; struct threadpool_s{ kpacket ref; hmutex_t mutex; hsem_t sem; thread_info_s *threads_info; uint8_t exit; uint8_t thread_count; uint8_t cur_th_idx; /*当前的id*/ uint8_t max_thread_count; uint8_t all_thread_busy_times; struct list_head route_queue[TASKLIST_MAX]; }; #define is_tasklist_empty(_QUEUE) list_empty(pool->route_queue[_QUEUE].next) HTHREAD_ROUTINE(threadpool_thread){ threadpool *pool = (threadpool *)userdata; hmutex_lock(&(pool->mutex)); thread_info_s *this_info = &pool->threads_info[pool->cur_th_idx]; this_info->th_idx = pool->cur_th_idx; pool->cur_th_idx++; hmutex_unlock(&(pool->mutex)); hlogd("%s start:%p th_idx:%u", __func__, userdata, this_info->th_idx); while(!pool->exit) { threadpool_task_s *task = NULL; hmutex_lock(&(pool->mutex)); if (!is_tasklist_empty(TASKLIST_BUSYING)){ task = list_entry(pool->route_queue[TASKLIST_BUSYING].next, threadpool_task_s, link); list_del(&task->link); } if (!task){ if(!pool->exit){ #if POOL_DEGUG hlogd("wait th_idx:%u normal:%u", this_info->th_idx, is_tasklist_empty(TASKLIST_BUSYING)); #endif hmutex_unlock(&(pool->mutex)); hsem_wait(&(pool->sem)); //hlogd("th_idx:%u", this_info->th_idx); } } else { /* Get to work */ kpacket *packet = task->packet; uint64_t elasped = gettimeofday_ms(); #if POOL_DEGUG hlogd("task->routine:%p", task->routine); #endif this_info->busying = 1; hmutex_unlock(&(pool->mutex)); task->routine(packet); hmutex_lock(&(pool->mutex)); this_info->busying = 0; elasped = gettimeofday_ms() - elasped; if (elasped > this_info->max_exe_time){ this_info->max_exe_time = elasped; } this_info->total_exe_cnt ++; this_info->total_exe_time += elasped; packet ? kpacket_dec(packet) : 0; list_add(&task->link, &pool->route_queue[TASKLIST_FREE]); hmutex_unlock(&(pool->mutex)); } } hmutex_lock(&(pool->mutex)); pool->thread_count--; hmutex_unlock(&(pool->mutex)); hlogw("%s exit thread_id:%ld\n",__func__, hv_gettid()); return(NULL); } /** * @function void *threadpool_thread(void *threadpool) * @brief the worker thread * @param threadpool the pool which own the thread */ static uint32_t threadpool_free(threadpool *pool) { if (!pool) { return -1; } /* Did we manage to allocate ? */ if (pool->threads_info) { /* Because we allocate pool->threads_info after initializing the mutex and condition variable, we're sure they're initialized. Let's mutex the mutex just in case. */ hmutex_lock(&(pool->mutex)); struct list_head *pos, *n; uint32_t i; for(i=0; iroute_queue[i]){ threadpool_task_s* task = list_entry(pos, threadpool_task_s, link); list_del(pos); if (task->packet && i != TASKLIST_FREE){ kpacket_dec(task->packet); } hv_free(task); } } hmutex_unlock(&(pool->mutex)); hmutex_destroy(&(pool->mutex)); hsem_destroy(&(pool->sem)); hv_free(pool->threads_info); pool->threads_info = NULL; } hv_free(pool); return 0; } threadpool *threadpool_create(uint32_t max_thread_cnt) { threadpool *pool; max_thread_cnt = max_thread_cnt <= 0 || max_thread_cnt > MAX_THREADS?MAX_THREADS:max_thread_cnt; if ((pool = (threadpool *)hv_calloc(1, sizeof(threadpool))) == NULL) { goto err; } /* Initialize */ INIT_LIST_HEAD(&pool->route_queue[TASKLIST_BUSYING]); INIT_LIST_HEAD(&pool->route_queue[TASKLIST_FREE]); pool->max_thread_count = max_thread_cnt; /* Allocate thread and task queue */ pool->threads_info = (thread_info_s *)calloc(1, sizeof(thread_info_s) * max_thread_cnt); /* Initialize mutex and conditional variable first */ if ((hmutex_init(&pool->mutex) != 0) || (hsem_init(&pool->sem, 0) != 0) || (pool->threads_info == NULL)) { goto err; } return pool; err: if (pool) { threadpool_free(pool); } return NULL; } static uint32_t threadpool_free_thread_count_locked(threadpool *pool){ uint32_t i, count = 0; for(i=0; i < pool->thread_count && imax_thread_count; i++){ thread_info_s *info = &pool->threads_info[i]; count += info->busying ? 0 : 1; } //hlogd("%s %u", __func__, count); return count; } static uint32_t threadpool_add_internal(threadpool *pool, thread_routine routine, kpacket *packet){ uint32_t err = 0; if (pool == NULL || routine == NULL) { hlogd("ERR pool = %p or routine=%p", pool, routine); return EINVAL; } if (hmutex_lock(&(pool->mutex)) != 0) { hlogd("ERR hmutex_lock"); return EPERM; } do { /* Are we shutting down ? */ if (pool->exit) { err = EBADRQC; break; } threadpool_task_s *task; if (is_tasklist_empty(TASKLIST_FREE)){ task = (threadpool_task_s *)hv_malloc(sizeof(threadpool_task_s)); } else { task = list_entry(pool->route_queue[TASKLIST_FREE].next, threadpool_task_s, link); list_del(&task->link); } if (!task) { err = ENOMEM; break; } task->routine = routine; task->packet = packet; (void)(packet ? kpacket_inc(packet) : 0); //if block task then add to block_list otherwise normal ready_list //static cnt; hlogd("cnt:%u",cnt++); list_add_tail(&task->link,&(pool->route_queue[TASKLIST_BUSYING])); if (threadpool_free_thread_count_locked(pool) == 0 && pool->thread_count < pool->max_thread_count) { pool->threads_info[pool->thread_count].th = hthread_create(threadpool_thread, pool); pool->thread_count++; } else { pool->all_thread_busy_times++; // hloge("%s hv_free_threads:%u, thread_count:%u, max_thread_count:%u",__func__, threadpool_free_thread_count_locked(pool), pool->thread_count, pool->max_thread_count); } if (hsem_post(&pool->sem) != 0) { err = EPERM; break; } } while(0); if (hmutex_unlock(&pool->mutex) != 0) { err = EPERM; } return err; } uint32_t threadpool_add(threadpool *pool, thread_routine routine, kpacket *packet) { return threadpool_add_internal(pool, routine, packet); } uint32_t threadpool_del(threadpool *pool, thread_routine routine){ uint32_t err = 0; if (routine == NULL || pool == NULL) { hloge("%s err routine:%p(!NULL) spool:%p(!NULL)",__func__, routine, pool ); return EINVAL; } if (hmutex_lock(&(pool->mutex)) != 0) { return EPERM; } struct list_head *pos, *n; uint32_t i; for(i=0; iroute_queue[i]){ threadpool_task_s* task = list_entry(pos, threadpool_task_s, link); if (task->routine == routine){ list_del(pos); if (task->packet && i != TASKLIST_FREE){ kpacket_dec(task->packet); } hv_free(task); break; } } } if (hmutex_unlock(&pool->mutex) != 0) { err = EPERM; } return err; } uint32_t threadpool_destroy(threadpool *pool) { uint32_t i, thread_cnt, err = 0; if (pool == NULL) { return EINVAL; } if (hmutex_lock(&(pool->mutex)) != 0) { return EPERM; } do { /* Already shutting down */ if (pool->exit) { err = EBADRQC; break; } thread_cnt = pool->thread_count; pool->exit = 1; hmutex_unlock(&(pool->mutex)); /* then Wake up all worker threads */ for(i=0; isem)) != 0)) { err = EPERM; thread_info_s *info = &pool->threads_info[i]; hloge("%s err hsem_post th_idx:%d %p", __func__, info->th_idx, info->th); } } /* Join all worker thread */ //hloge("%s thread_cnt %u", __func__, thread_cnt); for(i = 0; i < thread_cnt && err == 0; i++) { uint64_t start_ms = gettimeofday_ms(); thread_info_s *info = &pool->threads_info[i]; if (hthread_join(info->th) != 0) { err = ESRCH; hloge("%s hthread_join th_idx:%d %p", __func__, info->th_idx, info->th); }else { hlogw("%s %u/%u hthread_join th_idx:%d %p elapsed:%ul ms", __func__, i+1, thread_cnt, info->th_idx, info->th, gettimeofday_ms() - start_ms); } } } while(0); /* Only if everything went well do we deallocate the pool */ if (!err) { threadpool_free(pool); } hlogw("%s",__func__); return err; } void threadpool_dump(threadpool *pool){ hmutex_lock(&(pool->mutex)); uint32_t i; hlogd("%s start", __func__); hlogd("hv_free_thread:%u busy_times:%u max_thread_count:%u thread_count:%u",threadpool_free_thread_count_locked(pool), pool->all_thread_busy_times, pool->max_thread_count, pool->thread_count); for(i=0; i< pool->thread_count && imax_thread_count; i++){ thread_info_s *info = &pool->threads_info[i]; hlogd("thread_info_s:%02d busy:%d exe_cnt:%u max_exe_time:%u total_exe_time:%u", info->th_idx, info->busying, info->total_exe_cnt, info->max_exe_time, info->total_exe_time); } hlogd("%s end", __func__); hmutex_unlock(&(pool->mutex)); } static void *test_routine(void *param) { hloge("%s th_id:%ld", __func__, hv_gettid()); hv_msleep(10); return NULL; } void threadpool_test(){ hlogw("%s start", __func__); threadpool *tp = threadpool_create(5); for (uint32_t i = 10; i>0; i--){ threadpool_add(tp, test_routine, NULL); hv_msleep(1); } hv_sleep(1); threadpool_destroy(tp); hlogw("%s end", __func__); } #if 0 kpacket *packet = kpacket_new(EVENT_HAL_DAYNIGHT, sizeof(uint32_t)); *packet->box->payload = IRCUT_NIGHT; threadpool_add(pool, timer_callback, packet, 1000); kpacket_dec(packet); #endif