411 lines
12 KiB
C
411 lines
12 KiB
C
|
/*
|
||
|
* =====================================================================================
|
||
|
* Copyright (c), 2013-2020, Jz.
|
||
|
* Filename: ringbuffer.h
|
||
|
* Author: linux kernel,
|
||
|
* Organization: jz
|
||
|
*
|
||
|
* =====================================================================================
|
||
|
*/
|
||
|
/*************************************************
|
||
|
File name : ringbuffer.h
|
||
|
Module : common
|
||
|
Author : amir
|
||
|
Created on : 2023-01-6
|
||
|
Description :
|
||
|
a ringbuffer to store data(copy)
|
||
|
|
||
|
Modify History:
|
||
|
1. Date: Author: Modification:
|
||
|
202407 罗启宏 free时packet重复释放
|
||
|
*************************************************/
|
||
|
#include <stdlib.h>
|
||
|
#include <stdint.h>
|
||
|
#include <errno.h>
|
||
|
#include "list.h"
|
||
|
#include "htime.h"
|
||
|
#include "hthread.h"
|
||
|
#include "hbase.h"
|
||
|
#include "hlog.h"
|
||
|
#include "hmutex.h"
|
||
|
|
||
|
#include "threadpool.h"
|
||
|
|
||
|
|
||
|
#define TAG "TAG_THREADPOOL"
|
||
|
|
||
|
#define POOL_DEGUG 0
|
||
|
|
||
|
enum{
|
||
|
TASKLIST_FREE = 0,//unuse queue
|
||
|
TASKLIST_BUSYING, //normal task queue, all threads can run
|
||
|
TASKLIST_MAX,
|
||
|
}TASKLIST_TYPE;
|
||
|
|
||
|
|
||
|
/**
|
||
|
* @struct threadpool_task
|
||
|
* @brief the work struct
|
||
|
*
|
||
|
* @var function Pointer to the function that will perform the task.
|
||
|
* @var argument Argument to be passed to the function.
|
||
|
*/
|
||
|
|
||
|
typedef struct {
|
||
|
struct list_head link;
|
||
|
thread_routine routine;
|
||
|
kpacket *packet;
|
||
|
} threadpool_task_s;
|
||
|
|
||
|
|
||
|
typedef struct {
|
||
|
uint8_t busying;
|
||
|
uint32_t max_exe_time;
|
||
|
uint32_t total_exe_time;
|
||
|
uint32_t total_exe_cnt;
|
||
|
uint32_t th_idx;
|
||
|
hthread_t th;
|
||
|
} thread_info_s;
|
||
|
|
||
|
struct threadpool_s{
|
||
|
kpacket ref;
|
||
|
hmutex_t mutex;
|
||
|
hsem_t sem;
|
||
|
thread_info_s *threads_info;
|
||
|
uint8_t exit;
|
||
|
uint8_t thread_count;
|
||
|
uint8_t cur_th_idx; /*当前的id*/
|
||
|
uint8_t max_thread_count;
|
||
|
uint8_t all_thread_busy_times;
|
||
|
struct list_head route_queue[TASKLIST_MAX];
|
||
|
};
|
||
|
|
||
|
#define is_tasklist_empty(_QUEUE) list_empty(pool->route_queue[_QUEUE].next)
|
||
|
|
||
|
|
||
|
HTHREAD_ROUTINE(threadpool_thread){
|
||
|
threadpool *pool = (threadpool *)userdata;
|
||
|
hmutex_lock(&(pool->mutex));
|
||
|
thread_info_s *this_info = &pool->threads_info[pool->cur_th_idx];
|
||
|
this_info->th_idx = pool->cur_th_idx;
|
||
|
pool->cur_th_idx++;
|
||
|
hmutex_unlock(&(pool->mutex));
|
||
|
hlogd("%s start:%p th_idx:%u", __func__, userdata, this_info->th_idx);
|
||
|
while(!pool->exit) {
|
||
|
threadpool_task_s *task = NULL;
|
||
|
hmutex_lock(&(pool->mutex));
|
||
|
if (!is_tasklist_empty(TASKLIST_BUSYING)){
|
||
|
task = list_entry(pool->route_queue[TASKLIST_BUSYING].next, threadpool_task_s, link);
|
||
|
list_del(&task->link);
|
||
|
}
|
||
|
if (!task){
|
||
|
if(!pool->exit){
|
||
|
#if POOL_DEGUG
|
||
|
hlogd("wait th_idx:%u normal:%u", this_info->th_idx, is_tasklist_empty(TASKLIST_BUSYING));
|
||
|
#endif
|
||
|
hmutex_unlock(&(pool->mutex));
|
||
|
hsem_wait(&(pool->sem));
|
||
|
//hlogd("th_idx:%u", this_info->th_idx);
|
||
|
}
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
/* Get to work */
|
||
|
kpacket *packet = task->packet;
|
||
|
uint64_t elasped = gettimeofday_ms();
|
||
|
#if POOL_DEGUG
|
||
|
hlogd("task->routine:%p", task->routine);
|
||
|
#endif
|
||
|
this_info->busying = 1;
|
||
|
hmutex_unlock(&(pool->mutex));
|
||
|
task->routine(packet);
|
||
|
hmutex_lock(&(pool->mutex));
|
||
|
this_info->busying = 0;
|
||
|
elasped = gettimeofday_ms() - elasped;
|
||
|
if (elasped > this_info->max_exe_time){
|
||
|
this_info->max_exe_time = elasped;
|
||
|
}
|
||
|
this_info->total_exe_cnt ++;
|
||
|
this_info->total_exe_time += elasped;
|
||
|
packet ? kpacket_dec(packet) : 0;
|
||
|
list_add(&task->link, &pool->route_queue[TASKLIST_FREE]);
|
||
|
hmutex_unlock(&(pool->mutex));
|
||
|
}
|
||
|
}
|
||
|
hmutex_lock(&(pool->mutex));
|
||
|
pool->thread_count--;
|
||
|
hmutex_unlock(&(pool->mutex));
|
||
|
hlogw("%s exit thread_id:%ld\n",__func__, hv_gettid());
|
||
|
return(NULL);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @function void *threadpool_thread(void *threadpool)
|
||
|
* @brief the worker thread
|
||
|
* @param threadpool the pool which own the thread
|
||
|
*/
|
||
|
|
||
|
static uint32_t threadpool_free(threadpool *pool)
|
||
|
{
|
||
|
if (!pool) {
|
||
|
return -1;
|
||
|
}
|
||
|
/* Did we manage to allocate ? */
|
||
|
if (pool->threads_info) {
|
||
|
/* Because we allocate pool->threads_info after initializing the
|
||
|
mutex and condition variable, we're sure they're
|
||
|
initialized. Let's mutex the mutex just in case. */
|
||
|
hmutex_lock(&(pool->mutex));
|
||
|
struct list_head *pos, *n;
|
||
|
uint32_t i;
|
||
|
for(i=0; i<TASKLIST_MAX; i++){
|
||
|
list_for_each_safe(pos, n, &pool->route_queue[i]){
|
||
|
threadpool_task_s* task = list_entry(pos, threadpool_task_s, link);
|
||
|
list_del(pos);
|
||
|
if (task->packet && i != TASKLIST_FREE){
|
||
|
kpacket_dec(task->packet);
|
||
|
}
|
||
|
hv_free(task);
|
||
|
}
|
||
|
}
|
||
|
hmutex_unlock(&(pool->mutex));
|
||
|
hmutex_destroy(&(pool->mutex));
|
||
|
hsem_destroy(&(pool->sem));
|
||
|
hv_free(pool->threads_info);
|
||
|
pool->threads_info = NULL;
|
||
|
}
|
||
|
hv_free(pool);
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
threadpool *threadpool_create(uint32_t max_thread_cnt)
|
||
|
{
|
||
|
threadpool *pool;
|
||
|
max_thread_cnt = max_thread_cnt <= 0 || max_thread_cnt > MAX_THREADS?MAX_THREADS:max_thread_cnt;
|
||
|
if ((pool = (threadpool *)hv_calloc(1, sizeof(threadpool))) == NULL) {
|
||
|
goto err;
|
||
|
}
|
||
|
|
||
|
/* Initialize */
|
||
|
INIT_LIST_HEAD(&pool->route_queue[TASKLIST_BUSYING]);
|
||
|
INIT_LIST_HEAD(&pool->route_queue[TASKLIST_FREE]);
|
||
|
pool->max_thread_count = max_thread_cnt;
|
||
|
|
||
|
/* Allocate thread and task queue */
|
||
|
pool->threads_info = (thread_info_s *)calloc(1, sizeof(thread_info_s) * max_thread_cnt);
|
||
|
/* Initialize mutex and conditional variable first */
|
||
|
if ((hmutex_init(&pool->mutex) != 0) ||
|
||
|
(hsem_init(&pool->sem, 0) != 0) ||
|
||
|
(pool->threads_info == NULL)) {
|
||
|
goto err;
|
||
|
}
|
||
|
return pool;
|
||
|
|
||
|
err:
|
||
|
if (pool) {
|
||
|
threadpool_free(pool);
|
||
|
}
|
||
|
return NULL;
|
||
|
}
|
||
|
static uint32_t threadpool_free_thread_count_locked(threadpool *pool){
|
||
|
uint32_t i, count = 0;
|
||
|
for(i=0; i < pool->thread_count && i<pool->max_thread_count; i++){
|
||
|
thread_info_s *info = &pool->threads_info[i];
|
||
|
count += info->busying ? 0 : 1;
|
||
|
}
|
||
|
//hlogd("%s %u", __func__, count);
|
||
|
return count;
|
||
|
}
|
||
|
static uint32_t threadpool_add_internal(threadpool *pool, thread_routine routine, kpacket *packet){
|
||
|
uint32_t err = 0;
|
||
|
if (pool == NULL || routine == NULL)
|
||
|
{
|
||
|
hlogd("ERR pool = %p or routine=%p", pool, routine);
|
||
|
return EINVAL;
|
||
|
}
|
||
|
if (hmutex_lock(&(pool->mutex)) != 0)
|
||
|
{
|
||
|
hlogd("ERR hmutex_lock");
|
||
|
return EPERM;
|
||
|
}
|
||
|
do {
|
||
|
/* Are we shutting down ? */
|
||
|
if (pool->exit) {
|
||
|
err = EBADRQC;
|
||
|
break;
|
||
|
}
|
||
|
threadpool_task_s *task;
|
||
|
if (is_tasklist_empty(TASKLIST_FREE)){
|
||
|
task = (threadpool_task_s *)hv_malloc(sizeof(threadpool_task_s));
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
task = list_entry(pool->route_queue[TASKLIST_FREE].next, threadpool_task_s, link);
|
||
|
list_del(&task->link);
|
||
|
}
|
||
|
|
||
|
if (!task)
|
||
|
{
|
||
|
err = ENOMEM;
|
||
|
break;
|
||
|
}
|
||
|
task->routine = routine;
|
||
|
task->packet = packet;
|
||
|
(void)(packet ? kpacket_inc(packet) : 0);
|
||
|
//if block task then add to block_list otherwise normal ready_list
|
||
|
//static cnt; hlogd("cnt:%u",cnt++);
|
||
|
list_add_tail(&task->link,&(pool->route_queue[TASKLIST_BUSYING]));
|
||
|
if (threadpool_free_thread_count_locked(pool) == 0 && pool->thread_count < pool->max_thread_count)
|
||
|
{
|
||
|
pool->threads_info[pool->thread_count].th = hthread_create(threadpool_thread, pool);
|
||
|
pool->thread_count++;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
pool->all_thread_busy_times++;
|
||
|
|
||
|
// hloge("%s hv_free_threads:%u, thread_count:%u, max_thread_count:%u",__func__, threadpool_free_thread_count_locked(pool), pool->thread_count, pool->max_thread_count);
|
||
|
|
||
|
}
|
||
|
if (hsem_post(&pool->sem) != 0) {
|
||
|
err = EPERM;
|
||
|
break;
|
||
|
}
|
||
|
} while(0);
|
||
|
|
||
|
if (hmutex_unlock(&pool->mutex) != 0)
|
||
|
{
|
||
|
err = EPERM;
|
||
|
}
|
||
|
return err;
|
||
|
}
|
||
|
|
||
|
uint32_t threadpool_add(threadpool *pool, thread_routine routine, kpacket *packet)
|
||
|
{
|
||
|
return threadpool_add_internal(pool, routine, packet);
|
||
|
}
|
||
|
|
||
|
|
||
|
uint32_t threadpool_del(threadpool *pool, thread_routine routine){
|
||
|
uint32_t err = 0;
|
||
|
if (routine == NULL || pool == NULL) {
|
||
|
hloge("%s err routine:%p(!NULL) spool:%p(!NULL)",__func__, routine, pool );
|
||
|
return EINVAL;
|
||
|
}
|
||
|
if (hmutex_lock(&(pool->mutex)) != 0) {
|
||
|
return EPERM;
|
||
|
}
|
||
|
struct list_head *pos, *n;
|
||
|
uint32_t i;
|
||
|
for(i=0; i<TASKLIST_MAX; i++){
|
||
|
list_for_each_safe(pos, n, &pool->route_queue[i]){
|
||
|
threadpool_task_s* task = list_entry(pos, threadpool_task_s, link);
|
||
|
if (task->routine == routine){
|
||
|
list_del(pos);
|
||
|
if (task->packet && i != TASKLIST_FREE){
|
||
|
kpacket_dec(task->packet);
|
||
|
}
|
||
|
hv_free(task);
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
if (hmutex_unlock(&pool->mutex) != 0) {
|
||
|
err = EPERM;
|
||
|
}
|
||
|
return err;
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
uint32_t threadpool_destroy(threadpool *pool)
|
||
|
{
|
||
|
uint32_t i, thread_cnt, err = 0;
|
||
|
if (pool == NULL) {
|
||
|
return EINVAL;
|
||
|
}
|
||
|
if (hmutex_lock(&(pool->mutex)) != 0) {
|
||
|
return EPERM;
|
||
|
}
|
||
|
do {
|
||
|
/* Already shutting down */
|
||
|
if (pool->exit) {
|
||
|
err = EBADRQC;
|
||
|
break;
|
||
|
}
|
||
|
thread_cnt = pool->thread_count;
|
||
|
pool->exit = 1;
|
||
|
hmutex_unlock(&(pool->mutex));
|
||
|
/* then Wake up all worker threads */
|
||
|
for(i=0; i<thread_cnt; i++){
|
||
|
if ((hsem_post(&(pool->sem)) != 0)) {
|
||
|
err = EPERM;
|
||
|
thread_info_s *info = &pool->threads_info[i];
|
||
|
hloge("%s err hsem_post th_idx:%d %p", __func__, info->th_idx, info->th);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
/* Join all worker thread */
|
||
|
//hloge("%s thread_cnt %u", __func__, thread_cnt);
|
||
|
for(i = 0; i < thread_cnt && err == 0; i++) {
|
||
|
uint64_t start_ms = gettimeofday_ms();
|
||
|
thread_info_s *info = &pool->threads_info[i];
|
||
|
if (hthread_join(info->th) != 0) {
|
||
|
err = ESRCH;
|
||
|
hloge("%s hthread_join th_idx:%d %p", __func__, info->th_idx, info->th);
|
||
|
}else {
|
||
|
hlogw("%s %u/%u hthread_join th_idx:%d %p elapsed:%ul ms", __func__, i+1, thread_cnt, info->th_idx, info->th, gettimeofday_ms() - start_ms);
|
||
|
}
|
||
|
}
|
||
|
} while(0);
|
||
|
|
||
|
/* Only if everything went well do we deallocate the pool */
|
||
|
if (!err) {
|
||
|
threadpool_free(pool);
|
||
|
}
|
||
|
hlogw("%s",__func__);
|
||
|
return err;
|
||
|
}
|
||
|
|
||
|
void threadpool_dump(threadpool *pool){
|
||
|
hmutex_lock(&(pool->mutex));
|
||
|
uint32_t i;
|
||
|
hlogd("%s start", __func__);
|
||
|
hlogd("hv_free_thread:%u busy_times:%u max_thread_count:%u thread_count:%u",threadpool_free_thread_count_locked(pool), pool->all_thread_busy_times, pool->max_thread_count, pool->thread_count);
|
||
|
for(i=0; i< pool->thread_count && i<pool->max_thread_count; i++){
|
||
|
thread_info_s *info = &pool->threads_info[i];
|
||
|
hlogd("thread_info_s:%02d busy:%d exe_cnt:%u max_exe_time:%u total_exe_time:%u", info->th_idx, info->busying, info->total_exe_cnt, info->max_exe_time, info->total_exe_time);
|
||
|
}
|
||
|
hlogd("%s end", __func__);
|
||
|
hmutex_unlock(&(pool->mutex));
|
||
|
|
||
|
}
|
||
|
|
||
|
static void *test_routine(void *param)
|
||
|
{
|
||
|
hloge("%s th_id:%ld", __func__, hv_gettid());
|
||
|
hv_msleep(10);
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
void threadpool_test(){
|
||
|
hlogw("%s start", __func__);
|
||
|
threadpool *tp = threadpool_create(5);
|
||
|
for (uint32_t i = 10; i>0; i--){
|
||
|
threadpool_add(tp, test_routine, NULL);
|
||
|
hv_msleep(1);
|
||
|
}
|
||
|
hv_sleep(1);
|
||
|
threadpool_destroy(tp);
|
||
|
hlogw("%s end", __func__);
|
||
|
}
|
||
|
|
||
|
#if 0
|
||
|
kpacket *packet = kpacket_new(EVENT_HAL_DAYNIGHT, sizeof(uint32_t));
|
||
|
*packet->box->payload = IRCUT_NIGHT;
|
||
|
threadpool_add(pool, timer_callback, packet, 1000);
|
||
|
kpacket_dec(packet);
|
||
|
#endif
|
||
|
|