/********************************************************
* An example source module to accompany...
*
* "Using POSIX Threads: Programming with Pthreads"
* by Brad nichols, Dick Buttlar, Jackie Farrell
* O'Reilly & Associates, Inc.
*
********************************************************
* tpool.c --
*
* Example thread pooling library
*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include "tpool.h"
void *tpool_thread(void *);
void tpool_init(tpool_t *tpoolp,
int num_worker_threads,
int max_queue_size,
int do_not_block_when_full)
{
int i, rtn;
tpool_t tpool;
/* allocate a pool data structure */
if ((tpool = (tpool_t )malloc(sizeof(struct tpool))) == NULL)
perror("malloc"), exit(1);
/* initialize th fields */
tpool->num_threads = num_worker_threads;
tpool->max_queue_size = max_queue_size;
tpool->do_not_block_when_full = do_not_block_when_full;
if ((tpool->threads =
(pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads))
== NULL)
perror("malloc"), exit(1);
tpool->cur_queue_size = 0;
tpool->queue_head = NULL;
tpool->queue_tail = NULL;
tpool->queue_closed = 0;
tpool->shutdown = 0;
if ((rtn = pthread_mutex_init(&(tpool->queue_lock), NULL)) != 0)
fprintf(stderr,"pthread_mutex_init %s",strerror(rtn)), exit(1);
if ((rtn = pthread_cond_init(&(tpool->queue_not_empty), NULL)) != 0)
fprintf(stderr,"pthread_cond_init %s",strerror(rtn)), exit(1);
if ((rtn = pthread_cond_init(&(tpool->queue_not_full), NULL)) != 0)
fprintf(stderr,"pthread_cond_init %s",strerror(rtn)), exit(1);
if ((rtn = pthread_cond_init(&(tpool->queue_empty), NULL)) != 0)
fprintf(stderr,"pthread_cond_init %s",strerror(rtn)), exit(1);
/* create threads */
for (i = 0; i != num_worker_threads; i++) {
if ((rtn = pthread_create( &(tpool->threads[i]),
NULL,
tpool_thread,
(void *)tpool)) != 0)
fprintf(stderr,"pthread_create %d",rtn), exit(1);
}
*tpoolp = tpool;
}
int tpool_add_work(
tpool_t tpool,
void (*routine)(void *),
void *arg)
{
int rtn;
tpool_work_t *workp;
if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);
/* no space and this caller doesn't want to wait */
if ((tpool->cur_queue_size == tpool->max_queue_size) &&
tpool->do_not_block_when_full) {
if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
return -1;
}
while( (tpool->cur_queue_size == tpool->max_queue_size) &&
(!(tpool->shutdown || tpool->queue_closed)) ) {
if ((rtn = pthread_cond_wait(&(tpool->queue_not_full),
&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_cond_wait %d",rtn), exit(1);
}
/* the pool is in the process of being destroyed */
if (tpool->shutdown || tpool->queue_closed) {
if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
return -1;
}
/* allocate work structure */
if ((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL)
perror("malloc"), exit(1);
workp->routine = routine;
workp->arg = arg;
workp->next = NULL;
printf("adder: adding an item %d\n", workp->routine);
if (tpool->cur_queue_size == 0) {
tpool->queue_tail = tpool->queue_head = workp;
printf("adder: queue == 0, waking all workers\n");
if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)
fprintf(stderr,"pthread_cond_signal %d",rtn), exit(1);;
} else {
tpool->queue_tail->next = workp;
tpool->queue_tail = workp;
}
tpool->cur_queue_size++;
if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
return 1;
}
int tpool_destroy(tpool_t tpool,
int finish)
{
int i,rtn;
tpool_work_t *cur_nodep;
if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);
/* Is a shutdown already in progress? */
if (tpool->queue_closed || tpool->shutdown) {
if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
return 0;
}
tpool->queue_closed = 1;
/* If the finish flag is set, wait for workers to
drain queue */
if (finish == 1) {
while (tpool->cur_queue_size != 0) {
if ((rtn = pthread_cond_wait(&(tpool->queue_empty),
&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_cond_wait %d",rtn), exit(1);
}
}
tpool->shutdown = 1;
if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
/* Wake up any workers so they recheck shutdown flag */
if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)
fprintf(stderr,"pthread_cond_broadcast %d",rtn), exit(1);
if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)
fprintf(stderr,"pthread_cond_broadcast %d",rtn), exit(1);
/* Wait for workers to exit */
for(i=0; i < tpool->num_threads; i++) {
if ((rtn = pthread_join(tpool->threads[i],NULL)) != 0)
fprintf(stderr,"pthread_join %d",rtn), exit(1);
}
/* Now free pool structures */
free(tpool->threads);
while(tpool->queue_head != NULL) {
cur_nodep = tpool->queue_head->next;
tpool->queue_head = tpool->queue_head->next;
free(cur_nodep);
}
free(tpool);
}
void *tpool_thread(void *arg)
{
tpool_t tpool = (tpool_t)arg;
int rtn;
tpool_work_t *my_workp;
for(;;) {
/* Check queue for work */
if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);
while ((tpool->cur_queue_size == 0) && (!tpool->shutdown)) {
printf("worker %d: I'm sleeping again\n", pthread_self());
if ((rtn = pthread_cond_wait(&(tpool->queue_not_empty),
&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_cond_wait %d",rtn), exit(1);
}
sleep(5);
printf("worker %d: I'm awake\n", pthread_self());
/* Has a shutdown started while i was sleeping? */
if (tpool->shutdown == 1) {
if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
pthread_exit(NULL);
}
/* Get to work, dequeue the next item */
my_workp = tpool->queue_head;
tpool->cur_queue_size--;
if (tpool->cur_queue_size == 0)
tpool->queue_head = tpool->queue_tail = NULL;
else
tpool->queue_head = my_workp->next;
printf("worker %d: dequeing item %d\n", pthread_self(), my_workp->next);
/* Handle waiting add_work threads */
if ((!tpool->do_not_block_when_full) &&
(tpool->cur_queue_size == (tpool->max_queue_size - 1)))
if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)
fprintf(stderr,"pthread_cond_broadcast %d",rtn), exit(1);
/* Handle waiting destroyer threads */
if (tpool->cur_queue_size == 0)
if ((rtn = pthread_cond_signal(&(tpool->queue_empty))) != 0)
fprintf(stderr,"pthread_cond_signal %d",rtn), exit(1);
if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
/* Do this work item */
(*(my_workp->routine))(my_workp->arg);
free(my_workp);
}
return(NULL);
}
一个c语言实现的线程池
4星 · 超过85%的资源 需积分: 17 120 浏览量
2009-12-18
17:48:50
上传
评论
收藏 4KB RAR 举报
「已注销」
- 粉丝: 11
- 资源: 5
最新资源
- 基于Matlab人脸肤色定理的教师人数统计+源代码+全部数据+文档说明+详细注释+使用说明+截图(高分课程设计)
- 基于Matlab霍夫曼变换的表盘读数识别+源代码+全部数据+文档说明+详细注释+使用说明+截图(高分课程设计)
- 基于Matlab火灾烟雾检测源码带GUI界面+源代码+全部数据+文档说明+详细注释+使用说明+截图(高分课程设计)
- 基于Matlab的恶劣天气交通标志识别系统+源代码+全部数据+文档说明+详细注释+使用说明+截图(高分课程设计)
- 基于MATLAB的霍夫曼变换的表盘示数识别+源代码+全部数据+文档说明+详细注释+使用说明+截图(高分课程设计)
- 基于Matlab的车道线识别系统 +源代码+全部数据+文档说明+详细注释+使用说明+截图(高分课程设计)
- 基于MATLAB的教室人数统计系统带Gui界面+源代码+全部数据+文档说明+详细注释+使用说明+截图(高分课程设计)
- 基于MATLAB的教室人数统计系统带Gui界面+源代码+全部数据+文档说明+详细注释+使用说明+截图(高分课程设计)
- 基于MATLAB 的霍夫曼变换答题卡识别源码+全部数据+文档说明+详细注释+使用说明+截图(高分课程设计)
- 基于Matlab+bp神经网络的神经网络汉字识别系统+源代码+全部数据+文档说明+详细注释+使用说明+截图(高分课程设计)
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈