#include "Threadpool.h"
#include "mysql.h"
#include "connection_pool.h"
/***************pthread_create()**********
int pthread_create(pthread_t *tidp,//第一个参数为指向线程标识符的指针。
const pthread_attr_t *attr,//第二个参数用来设置线程属性。
(void*)(*start_rtn)(void*),//第三个参数是线程运行函数的起始地址。
void *arg//最后一个参数是运行函数的参数。
);
下面会用到pthread_create(&(this->pthreads[i]),NULL,run,(void *)this);
******************************************/
void* run(void* arg) //返回一个未知类型的值
{
ThreadPool *pool = (ThreadPool *)arg;//定义一个ThreadPool指针,有什么用呢??看不懂
struct Job job;
while(1)
{
pthread_mutex_lock(&(pool->mutex)); //线程得不到锁,会被阻塞,等待锁的释放,pool->mutex跟this->mutex是这两把锁是同一把锁吗?
if(pool->isPoolClose) //线程池已经关闭
{
pthread_mutex_unlock(&(pool->mutex));
pthread_exit(NULL);
}
while((pool->jobQueue.empty())) //队列为空,就排队等
{
printf("job queue is empty..i am waiting\n");
pthread_cond_wait(&(pool->conQueueEmpty),&(pool->mutex));//释放锁,被挂起;要等待pool->conQueueEmpty状态改变,才能有机会获取锁。
if(pool->isPoolClose) //线程池已经关闭
{
pthread_mutex_unlock(&(pool->mutex));//解锁
pthread_exit(NULL);
}
} //end while
job=pool->jobQueue.front(); //获取队列开头的任务
pool->jobQueue.pop_front(); //deque的开头删除函数,任务减少一个
printf("get job: %d ,size : %d\n",(int)job.arg,pool->jobQueue.size());
if(pool->jobQueue.size()==(pool->maxJobNum-1))
{
pthread_cond_broadcast(&(pool->conQueueFull));
//队列非满,就可以通知add_job函数添加新任务
perror("jobQueue.size is not full!");
}
pthread_mutex_unlock(&(pool->mutex)); //在执行具体任务时,必须把锁释放,因为有其他线程等待锁
(*(job.doJoFun))(job.arg); //线程真正要做的工作,回调函数的使用
}//end while
}
ThreadPool::ThreadPool(int threadNum,int maxJobNum)
{
this->threadNum = threadNum;
this->maxJobNum = maxJobNum;
if(pthread_mutex_init(&(this->mutex),NULL)) //初始化把锁,成功返回0
{
perror("pthread_mutex_init");
exit(1);
}
if(pthread_cond_init(&(this->conQueueEmpty),NULL)) //初始化信号量
{
perror("pthread_cond_init");
exit(1);
}
if(pthread_cond_init(&(this->conQueueFull),NULL)) //初始化信号量
{
perror("pthread_cond_init");
exit(1);
}
this->pthreads = (pthread_t *)malloc(sizeof(pthread_t) * this->threadNum);
if(NULL == this->pthreads)
{
perror("pthreads malloc");
exit(1);
}
this->isPoolClose = false;
unsigned int i;
for(i=0;i<this->threadNum;++i)
{
pthread_create(&(this->pthreads[i]),NULL,run,(void *)this);//创建线程;run为线程起始函数
}
}
int ThreadPool::addJob(Job job)
{
assert(job.doJoFun != NULL);
if(this->isPoolClose)
{
return -1;
}
pthread_mutex_lock(&(this->mutex));//上锁是为了什么?猜测是拥有锁的线程才能对队列进行操作吗?
while((this->jobQueue.size())==this->maxJobNum)//队列满了
{
printf("job queue is full\n");
pthread_cond_wait(&(this->conQueueFull),&(this->mutex));//释放锁,并等待this->conQueueFull的改变;
//队列满的时候要等
}
if(this->isPoolClose)
{
pthread_mutex_unlock(&(this->mutex));
return -1;
}
if(this->jobQueue.empty())
{
this->jobQueue.push_back(job);
pthread_cond_broadcast(&(this->conQueueEmpty));//pthread_cond_broadcast()函数会将所有等待该条件变量的线程从等待状态变为激活状态,不是仅仅解锁一个线程
//队列空的时候,有任务来时就通知线程池中的线程:队列非空
} //激发条件有两种形式,pthread_cond_signal()激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;而pthread_cond_broadcast()则激活所有等待线程。
else
{
this->jobQueue.push_back(job);//双端队列尾部加入一个job结构体
}
printf("add a job: %d, size: %d\n",(int)job.arg,this->jobQueue.size());
pthread_mutex_unlock(&(this->mutex));
return 0;
}
int ThreadPool::destroy()
{
printf("bigin destroy..\n");
pthread_mutex_lock(&(this->mutex));
this->isPoolClose = true; //关线程池
this->jobQueue.clear(); //清队列
pthread_mutex_unlock(&(this->mutex));
pthread_cond_broadcast(&(this->conQueueEmpty));
//唤醒线程池中正在阻塞的线程,他们因为没任务所以挂起
pthread_cond_broadcast(&(this->conQueueFull));
//唤醒添加任务的add job函数因为发信号说非满可以添加了
unsigned int i;
for(i=0;i<this->threadNum;++i)
{
pthread_join(this->pthreads[i], NULL);//等待所有线程执行完毕
//等待进程结束函数
printf("thread %X exit\n",(unsigned int)this->pthreads[i]);
}
/*以下是清线程的资源,用他们自带的函数方法*/
pthread_mutex_destroy(&(this->mutex));
pthread_cond_destroy(&(this->conQueueEmpty));
pthread_cond_destroy(&(this->conQueueFull));
free(this->pthreads);
printf("finish destroy\n");
return 0;
}
ThreadPool::~ThreadPool()
{
//destroy();
}
/*
void* prin1(void *)//void* prin1(void*)这个函数的输入输出都是未定义类型void*,意思就是你可以输入任意类型参数,返回任意类型的值
{
printf("this is test job1\n");
MYSQL mysql;
mysql_init(&mysql);
if(mysql_real_connect(&mysql,"localhost","root","cjc","test",3306,NULL,0));//获取了连接,为何再连接???
printf("connect success");
//连接进去之后
return NULL;//返回值是NULL
}
void* prin2(void *)
{
printf("this is test job2\n");
return NULL;
}
/**************************华丽的分界线****************/
/*
int main()
{
struct Job job1,job2;
ThreadPool onlyyou;
job1.doJoFun=prin1;//相当于job1.doJoFun=&prin1,可以理解为函数指针doJoFun指向了prin1函数吗??
job1.arg =NULL;//(*doJoFun)整体可以看作一个函数名,(void* arg)为函数未定义类型的参数arg,定义为NULL表示为空//例如int *p;int x=1;p=&x;*p=2;等价于x=2,等价于*p=x;
//不知道这样的理解是否正确?
job2.doJoFun=prin2;
job2.arg =NULL;
for(int i=0;i<10;i++)
{
if(-1 == onlyyou.addJob(job1))//onlyyou.addJob(job1)返回-1意味着任务添加失败
printf("add job1 error\n");
if(-1 == onlyyou.addJob(job2))
printf("add job1 error\n");
}
sleep(1);//延时1秒,等待所有线程执行完;
onlyyou.destroy();//线程池的释放销毁
return 0;
}*/
评论2
最新资源