/*
* @author: boyce
* @contact: boyce.ywr#gmail.com (# -> @)
* @version: 1.01
* @created: 2011-07-25
* @modified: 2011-08-04
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "thread_pool.h"
//#define __DEBUG__
#ifdef __DEBUG__
#define DEBUG(format,...) printf(format,##__VA_ARGS__)
#else
#define DEBUG(format,...)
#endif
static TpThreadInfo *tp_add_thread(TpThreadPool *pTp);
static TPBOOL tp_delete_thread(TpThreadPool *pTp);
static int tp_get_tp_status(TpThreadPool *pTp);
static void *tp_work_thread(void *pthread);
static void *tp_manage_thread(void *pthread);
static void def_proc_fun(TpWorkDesc *job){
//do nothing
}
/**
* user interface. creat thread pool.
* para:
* num: min thread number to be created in the pool
* return:
* thread pool struct instance be created successfully
*/
TpThreadPool *tp_create(unsigned min_num, unsigned max_num) {
TpThreadPool *pTp;
pTp = (TpThreadPool*) malloc(sizeof(TpThreadPool));
memset(pTp, 0, sizeof(TpThreadPool));
//init member var
pTp->min_th_num = min_num;
pTp->cur_th_num = min_num;
pTp->max_th_num = max_num;
pthread_mutex_init(&pTp->tp_lock, NULL);
//malloc mem for num thread info struct
if (NULL != pTp->thread_info)
free(pTp->thread_info);
pTp->thread_info = (TpThreadInfo*) malloc(sizeof(TpThreadInfo) * pTp->max_th_num);
memset(pTp->thread_info, 0, sizeof(TpThreadInfo) * pTp->max_th_num);
return pTp;
}
/**
* member function reality. thread pool init function.
* para:
* pTp: thread pool struct instance ponter
* return:
* true: successful; false: failed
*/
TPBOOL tp_init(TpThreadPool *pTp) {
int i;
int err;
TpThreadInfo *pThi;
initQueue(&pTp->idle_q, NULL);
pTp->stop_flag = FALSE;
//create work thread and init work thread info
for (i = 0; i < pTp->min_th_num; i++) {
pThi = pTp->thread_info +i;
pThi->tp_pool = pTp;
pThi->is_busy = FALSE;
pthread_cond_init(&pThi->thread_cond, NULL);
pthread_mutex_init(&pThi->thread_lock, NULL);
pThi->proc_fun = def_proc_fun;
pThi->th_job = NULL;
enQueue(&pTp->idle_q, pThi);
err = pthread_create(&pThi->thread_id, NULL, tp_work_thread, pThi);
if (0 != err) {
perror("tp_init: create work thread failed.");
clearQueue(&pTp->idle_q);
return FALSE;
}
}
//create manage thread
err = pthread_create(&pTp->manage_thread_id, NULL, tp_manage_thread, pTp);
if (0 != err) {
clearQueue(&pTp->idle_q);
printf("tp_init: creat manage thread failed\n");
return FALSE;
}
return TRUE;
}
/**
* member function reality. thread pool entirely close function.
* para:
* pTp: thread pool struct instance ponter
* return:
*/
void tp_close(TpThreadPool *pTp, TPBOOL wait) {
unsigned i;
pTp->stop_flag = TRUE;
if (wait) {
for (i = 0; i < pTp->cur_th_num; i++) {
pthread_cond_signal(&pTp->thread_info[i].thread_cond);
}
for (i = 0; i < pTp->cur_th_num; i++) {
pthread_join(pTp->thread_info[i].thread_id, NULL);
pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);
pthread_cond_destroy(&pTp->thread_info[i].thread_cond);
}
} else {
//close work thread
for (i = 0; i < pTp->cur_th_num; i++) {
kill((pid_t)pTp->thread_info[i].thread_id, SIGKILL);
pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);
pthread_cond_destroy(&pTp->thread_info[i].thread_cond);
}
}
//close manage thread
kill((pid_t)pTp->manage_thread_id, SIGKILL);
pthread_mutex_destroy(&pTp->tp_lock);
clearQueue(&pTp->idle_q);
//free thread struct
free(pTp->thread_info);
pTp->thread_info = NULL;
}
/**
* member function reality. main interface opened.
* after getting own worker and job, user may use the function to process the task.
* para:
* pTp: thread pool struct instance ponter
* worker: user task reality.
* job: user task para
* return:
*/
TPBOOL tp_process_job(TpThreadPool *pTp, process_job proc_fun, TpWorkDesc *job) {
TpThreadInfo *pThi ;
//fill pTp->thread_info's relative work key
//pthread_mutex_lock(&pTp->tp_lock);
pThi = (TpThreadInfo *) deQueue(&pTp->idle_q);
//pthread_mutex_unlock(&pTp->tp_lock);
if(pThi){
pThi->is_busy =TRUE;
pThi->proc_fun = proc_fun;
pThi->th_job = job;
pthread_cond_signal(&pThi->thread_cond);
DEBUG("Fetch a thread from pool.\n");
return TRUE;
}
//if all current thread are busy, new thread is created here
pthread_mutex_lock(&pTp->tp_lock);
pThi = tp_add_thread(pTp);
pthread_mutex_unlock(&pTp->tp_lock);
if(!pThi){
DEBUG("The thread pool is full, no more thread available.\n");
return FALSE;
}
DEBUG("No more idle thread, created a new one.\n");
pThi->proc_fun = proc_fun;
pThi->th_job = job;
//send cond to work thread
pthread_cond_signal(&pThi->thread_cond);
return TRUE;
}
/**
* member function reality. add new thread into the pool.
* para:
* pTp: thread pool struct instance ponter
* return:
* true: successful; false: failed
*/
TpThreadInfo *tp_add_thread(TpThreadPool *pTp) {
int err;
TpThreadInfo *new_thread;
if (pTp->max_th_num <= pTp->cur_th_num)
return NULL;
//malloc new thread info struct
new_thread = pTp->thread_info + pTp->cur_th_num;
new_thread->tp_pool = pTp;
//init new thread's cond & mutex
pthread_cond_init(&new_thread->thread_cond, NULL);
pthread_mutex_init(&new_thread->thread_lock, NULL);
//init status is busy, only new process job will call this function
new_thread->is_busy = TRUE;
err = pthread_create(&new_thread->thread_id, NULL, tp_work_thread, new_thread);
if (0 != err) {
free(new_thread);
return NULL;
}
//add current thread number in the pool.
pTp->cur_th_num++;
return new_thread;
}
/**
* member function reality. delete idle thread in the pool.
* only delete last idle thread in the pool.
* para:
* pTp: thread pool struct instance ponter
* return:
* true: successful; false: failed
*/
TPBOOL tp_delete_thread(TpThreadPool *pTp) {
unsigned idx;
TpThreadInfo *pThi;
TpThreadInfo tT;
//current thread num can't < min thread num
if (pTp->cur_th_num <= pTp->min_th_num)
return FALSE;
//pthread_mutex_lock(&pTp->tp_lock);
pThi = deQueue(&pTp->idle_q);
//pthread_mutex_unlock(&pTp->tp_lock);
if(!pThi)
return FALSE;
//after deleting idle thread, current thread num -1
pTp->cur_th_num--;
memcpy(&tT, pThi, sizeof(TpThreadInfo));
memcpy(pThi, pTp->thread_info + pTp->cur_th_num, sizeof(TpThreadInfo));
//kill the idle thread and free info struct
kill((pid_t)tT.thread_id, SIGKILL);
pthread_mutex_destroy(&tT.thread_lock);
pthread_cond_destroy(&tT.thread_cond);
return TRUE;
}
/**
* member function reality. get current thread pool status:idle, normal, busy, .etc.
* para:
* pTp: thread pool struct instance ponter
* return:
* 0: idle; 1: normal or busy(don't process)
*/
int tp_get_tp_status(TpThreadPool *pTp) {
float busy_num = 0.0;
int i;
//get busy thread number
busy_num = pTp->cur_th_num - pTp->idle_q.count;
DEBUG("Current thread pool status, current num: %u, busy num: %u, idle num: %u\n", pTp->cur_th_num, (unsigned)busy_num, pTp->idle_q.count);
//0.2? or other num?
if (busy_num / (pTp->cur_th_num) < BUSY_THRESHOLD)
return 0;//idle status
else
return 1;//busy or normal status
}
/**
* internal interface. real work thread.
* para:
* pthread: thread pool struct ponter
* return:
*/
static void *tp_work_thread(void *arg) {
pthread_t curid;//current thread id
TpThreadInfo *pTinfo = (TpThreadInfo *) arg;
//wait cond for processing real job.
while (!(pTinfo->tp_pool->stop_flag)) {
pthread_mutex_lock(&pTinfo->thread_lock);
pthread_cond_wait(&pTinfo->thread_cond, &pTinfo->thread_lock);
pthread_mutex_unlock(&pTinfo->thread_lock);
//process
pTinfo->proc_fun(pTinfo->th_job);
//thread state be s
thd_pool_v1.01.tar.gz_THDPOOL
版权申诉
93 浏览量
2022-09-19
20:35:57
上传
评论
收藏 4KB GZ 举报
朱moyimi
- 粉丝: 61
- 资源: 1万+
最新资源
- Surfer,线性函数
- MyBatis 的动态 SQL 是其核心特性之一.txt
- 时代的sdddsddsddsd
- 基于哈希链表的简单人员信息管理系统
- 其他类别JdonFramework开源框架 v5.1 Build20071025-jdonframework-5.1.rar
- 2001~2022年上市公司数字赋能指数.dta
- 2001~2022年上市公司数字赋能指数.xlsx
- 信息办公石大在线财务管理系统(含源码)-shidacaiwu.rar
- 信息办公电信计费系统完整代码-netctossconformity.rar
- matlab实现TD-SCDMA中初始同步捕捉DwPTS下行同步导频时隙的仿真.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈