#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <base/queue.h>
#include "threadpool.h"
#include "mutex_wrap.h"
#include "cond_wrap.h"
#include "tpool.h"
#undef __TPOOL_DEBUG
static inline void finit_pool(PTHREADPOOL_PRIV_t tpool)
{
if( tpool->mutex )
{
pthread_mutex_destroy(tpool->mutex);
free(tpool->mutex);
}
if( tpool->cv )
{
pthread_cond_destroy(tpool->cv);
free(tpool->cv);
}
tpool->mutex = NULL;
tpool->cv = NULL;
}
static inline int init_pool(PTHREADPOOL_PRIV_t tpool)
{
tpool->mutex = malloc(sizeof(pthread_mutex_t));
tpool->cv = malloc(sizeof(pthread_cond_t));
if( tpool->mutex == NULL || tpool->cv == NULL )
goto err;
pthread_mutex_init(tpool->mutex,NULL);
if( pthread_cond_init(tpool->cv,NULL) != 0 )
goto err;
return 0;
err:
if( tpool->mutex )
free(tpool->mutex);
if( tpool->cv )
free(tpool->cv);
return -1;
}
static void *work_thread(void *args)
{
PTHREADPOOL_PRIV_t tpool = (PTHREADPOOL_PRIV_t)args;
PDATANODE_t node;
int ret;
struct timespec timeout;
int timedout;
if( mutex_lock_wrap(tpool->mutex) != 0 )
return NULL;
for(;;)
{
timedout = 0;
while( TAILQ_EMPTY(&tpool->queue) && !tpool->quit )
{
clock_gettime(CLOCK_REALTIME,&timeout);
timeout.tv_sec += 3;
ret = pthread_cond_timedwait(tpool->cv,tpool->mutex,&timeout);
if( ret == ETIMEDOUT )
{
timedout = 1;
break;
}
else if( ret != 0 )
{
tpool->threadnums--;
mutex_unlock_wrap(tpool->mutex);
return NULL;
}
}
if( (node = TAILQ_FIRST(&tpool->queue)) != NULL )
{
TAILQ_REMOVE(&tpool->queue,node,entries);
mutex_unlock_wrap(tpool->mutex);
if( tpool->proc )
tpool->proc(node->task,tpool->alloc,tpool->free);
tpool->tfree(node->task);
tpool->free(node);
if( mutex_lock_wrap(tpool->mutex) != 0 )
return NULL;
}
if( tpool->quit )
{
tpool->threadnums--;
if( tpool->threadnums == 0 )
cond_broadcast_wrap(tpool->cv);
break;
}
if( TAILQ_FIRST(&tpool->queue) == NULL && timedout )
{
tpool->threadnums--;
break;
}
}
mutex_unlock_wrap(tpool->mutex);
#if defined(__TEST) || defined(__TPOOL_DEBUG)
printf("thread %u quit\n",(u_int)pthread_self());
#endif
return NULL;
}
PTHREADPOOL_t threadpool_create(int max_threads,THREADPOOL_ALLOC tp_alloc,THREADPOOL_FREE tp_free)
{
PTHREADPOOL_PRIV_t tpool;
if( tp_alloc == NULL )
tp_alloc = malloc;
if( tp_free == NULL )
tp_free = free;
if( (tpool = tp_alloc(sizeof(THREADPOOL_PRIV_t))) == NULL )
return NULL;
bzero(tpool,sizeof(THREADPOOL_PRIV_t));
tpool->alloc = tp_alloc;
tpool->free = tp_free;
tpool->proc = NULL;
tpool->tfree = tp_free;
tpool->max_threadnums = max_threads;
tpool->valid = THREADPOOL_VALID;
if( init_pool(tpool) != 0 )
{
tpool->free(tpool);
return NULL;
}
TAILQ_INIT(&tpool->queue);
#if 0
for( tpool->threadnums = 0;
tpool->threadnums < tpool->max_threadnums;
tpool->threadnums++ )
{
if( pthread_create(&i,NULL,(void *)work_thread,(void *)tpool) != 0 )
break;
pthread_detach(i);
}
if( tpool->threadnums == 0 )
{
finit_pool(tpool);
tpool->free(tpool);
return NULL;
}
for( i = 0; i < tpool->threadnums; i++ )
SEM_WAIT(tpool->threadstart);
#endif
return (PTHREADPOOL_t)tpool;
}
// 本函数必须在调用threadpool_create成功后马上调用。
void threadpool_set_proc(PTHREADPOOL_t tpool,THREADPOOL_PROC tp_proc,THREADPOOL_TASK_FREE tp_free)
{
PTHREADPOOL_PRIV_t tp = (PTHREADPOOL_PRIV_t)tpool;
tp->proc = tp_proc;
if( tp_free )
tp->tfree = tp_free;
}
PTHREADPOOL_t threadpool_create2(int max_threads,THREADPOOL_ALLOC tp_alloc,THREADPOOL_FREE tp_free,
THREADPOOL_PROC tp_proc,THREADPOOL_TASK_FREE tfree)
{
PTHREADPOOL_PRIV_t tpool = threadpool_create(max_threads,tp_alloc,tp_free);
if( tpool )
threadpool_set_proc((PTHREADPOOL_t)tpool,tp_proc,tfree);
return tpool;
}
int threadpool_dec_threadnums(PTHREADPOOL_t tpool,int decnum)
{
PTHREADPOOL_PRIV_t tp = (PTHREADPOOL_PRIV_t)tpool;
int i;
if( mutex_lock_wrap(tp->mutex) != 0 )
return -1;
if( tp->max_threadnums <= decnum )
{
i = -1;
goto err;
}
tp->max_threadnums -= decnum;
i = tp->max_threadnums;
#if defined(__TEST) || defined(__TPOOL_DEBUG)
printf("result nums:%d\n",tp->max_threadnums);
#endif
err:
mutex_unlock_wrap(tp->mutex);
return i;
}
int threadpool_inc_threadnums(PTHREADPOOL_t tpool,int incnum)
{
PTHREADPOOL_PRIV_t tp = (PTHREADPOOL_PRIV_t)tpool;
int i;
if( mutex_lock_wrap(tp->mutex) != 0 )
return -1;
tp->max_threadnums += incnum;
i = tp->max_threadnums;
#if defined(__TEST) || defined(__TPOOL_DEBUG)
printf("result nums:%d\n",tp->max_threadnums);
#endif
mutex_unlock_wrap(tp->mutex);
return i;
}
int threadpool_add_task(PTHREADPOOL_t tpool,PTASK_t task,int tryadd)
{
PTHREADPOOL_PRIV_t tp = (PTHREADPOOL_PRIV_t)tpool;
PDATANODE_t node;
int status;
if( tp->valid != THREADPOOL_VALID )
return -1;
if( tryadd )
status = mutex_trylock_wrap(tp->mutex);
else
status = mutex_lock_wrap(tp->mutex);
if( status != 0 )
return -1;
if( (node = tp->alloc(sizeof(DATANODE_t))) == NULL )
{
mutex_unlock_wrap(tp->mutex);
return -1;
}
node->task = task;
TAILQ_INSERT_TAIL(&tp->queue,node,entries);
#if 0
if( tp->idle > 0 )
{
status = cond_signal_wrap(tp->cv);
if( status != 0 )
{
mutex_unlock_wrap(tp->mutex);
return 0;
}
}
else if( tp->threadnums < tp->max_threadnums )
#endif
if( tp->threadnums < tp->max_threadnums )
{
pthread_t id;
status = pthread_create(&id,NULL,work_thread,(void *)tp);
if( status != 0 )
{
mutex_unlock_wrap(tp->mutex);
return 0;
}
pthread_detach(id);
tp->threadnums++;
}
cond_signal_wrap(tp->cv);
mutex_unlock_wrap(tp->mutex);
return 0;
}
int threadpool_get_tasks_count(PTHREADPOOL_t tpool,int tryget)
{
PTHREADPOOL_PRIV_t tp = (PTHREADPOOL_PRIV_t)tpool;
PDATANODE_t var;
int count,ret;
if( tryget )
ret = mutex_trylock_wrap(tp->mutex);
else
ret = mutex_lock_wrap(tp->mutex);
if( ret != 0 )
return -1;
count = 0;
TAILQ_FOREACH(var,&tp->queue,entries)
{
count++;
}
mutex_unlock_wrap(tp->mutex);
return count;
}
int threadpool_close(PTHREADPOOL_t tpool)
{
PTHREADPOOL_PRIV_t tp = (PTHREADPOOL_PRIV_t)tpool;
PDATANODE_t var;
int status;
if( tp->valid != THREADPOOL_VALID )
return EINVAL;
if( (status = mutex_lock_wrap(tp->mutex)) != 0 )
return status;
#if defined(__TEST) || defined(__TPOOL_DEBUG)
printf("threadpool_close thread nums:%d\n",tp->threadnums);
#endif
if( tp->threadnums > 0 )
评论0
最新资源