#include <threadpool.h>

#include <stdint.h>
/* Base of the worker-slot array. Assigned in create_thread_pool() and
 * indexed by worker id in set_worker_status(), get_worker_status() and
 * thread_handler(). */
static t_pool_t *g_pools;
void set_worker_status(int index, int status)
{
g_pools[index].status = status;
}
int get_worker_status(int index)
{
int status;
status = g_pools[index].status;
return status;
}
/*
 * Worker thread entry point.
 *
 * arg: the worker's index into g_pools, passed as an integer stored in the
 *      pointer value (see the pthread_create() call in create_threads()).
 *
 * The worker loops forever: it marks itself IDLE, sleeps on its own
 * condition variable until the task queue it was given is non-empty, marks
 * itself BUSY, and then runs tasks until the queue is empty again.
 * The function never returns.
 */
void * thread_handler(void *arg)
{
    /* Round-trip through intptr_t: a direct pointer -> int cast has
     * mismatched width on LP64 targets. */
    int id = (int)(intptr_t)arg;
    t_pool_t *myend = g_pools + id;
    taskque_t *tque = myend->tque;
    t_e_t curtask;

    while (1) {
        pthread_mutex_lock(&myend->lock);
        /* Publish IDLE while holding the lock. notify_thread() inspects the
         * status under this same lock, so whenever it observes IDLE this
         * worker is already blocked in pthread_cond_wait() and the signal
         * cannot be lost. */
        set_worker_status(id, IDLE);
        /* Wait on a predicate, not on the bare signal: this copes with
         * spurious wakeups and with tasks that were queued while this
         * worker was still BUSY (when nobody signalled it). */
        while (task_is_empty(tque)) {
            pthread_cond_wait(&myend->cond, &myend->lock);
        }
        set_worker_status(id, BUSY);
        pthread_mutex_unlock(&myend->lock);

        /* Run tasks until the queue has nothing left. */
        do {
            /* NOTE(review): pop_task()'s return convention is not visible
             * from this file. The queue is handed to every worker, so
             * another worker may take the last task between the emptiness
             * check and this pop. Presetting the callback to NULL avoids
             * calling an indeterminate pointer in that case -- confirm
             * that pop_task() leaves curtask untouched on an empty queue. */
            curtask.task = NULL;
            pop_task(tque, &curtask);
            if (curtask.task) {
                (curtask.task)(curtask.arg);
            }
        } while (!task_is_empty(tque));
    }
}
/*
 * Block the caller until none of the first `num` slots of `ppool` still
 * has status UNASSIGNED.
 *
 * Slots are checked in order; while the current slot is UNASSIGNED the
 * caller gives up the CPU with sched_yield() and looks again.
 * The status field is read without taking the slot's lock.
 */
void wait_threads_ready(t_pool_t *ppool, int num)
{
    int slot = 0;

    while (slot < num) {
        if (ppool[slot].status == UNASSIGNED) {
            sched_yield();
            continue;   // re-check the same slot
        }
        slot++;
    }
}
/*
 * Start `num` detached worker threads running thread_handler() and store
 * each thread id in ppool[i].tid.
 *
 * ppool: array of at least `num` worker slots.
 * num:   number of threads to create.
 *
 * Each thread receives its slot index as its argument, encoded in the
 * pointer value. On the first pthread failure the error text is written to
 * stderr and no further threads are created.
 *
 * NOTE(review): this function returns void, so a caller cannot tell that
 * fewer than `num` threads were started. create_thread_pool() then calls
 * wait_threads_ready(), which would spin on the slots that never left
 * UNASSIGNED. Consider reporting the failure to the caller.
 */
void create_threads(t_pool_t *ppool, int num)
{
    pthread_t tid;
    pthread_attr_t attr;
    int i, ret;

    ret = pthread_attr_init(&attr);
    if (ret) {
        fprintf(stderr, "%s\n",
                strerror(ret));
        return;
    }
    pthread_attr_setdetachstate(&attr,
                                PTHREAD_CREATE_DETACHED);
    for (i = 0; i < num; i++) {
        /* Go through intptr_t so the int -> pointer conversion is
         * well-defined in width; thread_handler() reverses it. */
        ret = pthread_create(&tid, &attr,
                             thread_handler, (void *)(intptr_t)i);
        if (ret) {
            fprintf(stderr, "%s\n",
                    strerror(ret));
            break;  // fall through so attr is still destroyed
        }
        ppool[i].tid = tid;
    }
    pthread_attr_destroy(&attr);
}
/*
 * Allocate and start a pool of `num` worker threads.
 *
 * num: number of workers; must be greater than zero.
 *
 * Returns the array of `num` worker slots, or NULL when `num` is not
 * positive, when the slot array cannot be allocated, or when
 * create_taskque() fails. All slots point at the same task queue.
 * The call returns only after wait_threads_ready() has seen every slot
 * leave the UNASSIGNED state.
 */
t_pool_t * create_thread_pool(int num)
{
    int i;
    t_pool_t *ppool;
    taskque_t *pque;

    /* Validate the argument itself (the previous check read an
     * uninitialised local). */
    if (num <= 0) {
        fprintf(stderr, "invalied argument!\n");
        return NULL;
    }

    // allocate memory for thread pool
    ppool = malloc((size_t)num * sizeof *ppool);
    if (!ppool) {
        perror("malloc");
        return NULL;
    }

    // create task que
    pque = create_taskque();
    if (!pque) {
        free(ppool);
        return NULL;
    }

    // init thread_pool
    for (i = 0; i < num; i++) {
        pthread_mutex_init(&ppool[i].lock, NULL);
        pthread_cond_init(&ppool[i].cond, NULL);
        ppool[i].tque = pque;
        ppool[i].poolnum = num;
        ppool[i].myself = i;
        ppool[i].status = UNASSIGNED;
    }

    /* Publish the array only once it is fully set up, so a failure above
     * never leaves g_pools pointing at freed memory. It must be set before
     * the threads start because thread_handler() indexes g_pools. */
    g_pools = ppool;

    // create threads
    create_threads(ppool, num);

    // wait all threads ready
    wait_threads_ready(ppool, num);
    return ppool;
}
/*
 * Wake at most one worker.
 *
 * Slots are scanned in index order, up to the count stored in
 * threadpool[0].poolnum. Each slot is examined under its own lock; the
 * first one whose status is IDLE gets a pthread_cond_signal() and the scan
 * stops. If no slot is IDLE, nothing is signalled.
 */
void notify_thread(t_pool_t *threadpool)
{
    int total = threadpool[0].poolnum;
    int woke_one = 0;
    int k;
    t_pool_t *worker;

    for (k = 0; k < total && !woke_one; k++) {
        worker = &threadpool[k];

        pthread_mutex_lock(&worker->lock);
        if (worker->status == IDLE) {
            pthread_cond_signal(&worker->cond);
            woke_one = 1;
        }
        pthread_mutex_unlock(&worker->lock);
    }
}
/*
 * Queue one task and try to wake a worker for it.
 *
 * threadpool: worker-slot array; the queue is taken from slot 0.
 * task:       callback to run.
 * arg:        argument handed to `task`.
 *
 * The task/argument pair is pushed with push_task(). If that returns -1
 * the function returns silently without notifying anyone; otherwise
 * notify_thread() is called.
 */
void add_task_to_thread_pool(t_pool_t *threadpool,
                             que_task task, void *arg)
{
    t_e_t entry;
    taskque_t *queue = threadpool[0].tque;

    entry.task = task;
    entry.arg = arg;

    // add task to task queue, then notify thread on success
    if (push_task(&entry, queue) != -1) {
        notify_thread(threadpool);
    }
}
/* Web page footer residue, not part of the program: "Comments: 0" */