#include "CThread.h"
#include <string>
#include <iostream>
using namespace std;
// Tells a job whether it should stop working.
// Returns true when the job has no pool handle (threadpool is NULL) or when
// the pool's GetThreadPoolState() reports true; returns false otherwise.
bool CJob::isTimeToExit()
{
    // Short-circuit keeps the NULL check ahead of the dereference.
    return (threadpool == NULL) || threadpool->GetThreadPoolState();
}
void CJob::SetData(void * data)
{
m_ptrData = data;
}
// Definitions of CThreadPool's static data members. Being static, they are
// shared by every CThreadPool instance in the process.

// Ids of worker threads currently marked busy (filled by MoveToBusy).
vector<pthread_t> CThreadPool::m_vecBusyThread;
// Ids of worker threads currently marked idle (filled by Create and MoveToIdle).
vector<pthread_t> CThreadPool::m_vecIdleThread;
// Mutex paired with m_pthreadCond; worker threads hold it while they inspect
// and pop the task list.
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
// Mutex taken by SetThreadPoolState, MoveToIdle and MoveToBusy around the
// stop flag and the busy/idle vectors.
pthread_mutex_t CThreadPool::m_pthreadStateMutex = PTHREAD_MUTEX_INITIALIZER;
// Condition variable workers wait on when the task list is empty; signalled
// when a task is added and broadcast by StopAll.
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
// Stop flag: once true, worker threads leave their loop and return.
bool CThreadPool::isToStop = false;
// Builds a pool and immediately starts its workers.
// threadNum: number of worker threads Create() will try to start.
CThreadPool::CThreadPool(int threadNum)
{
    m_iThreadNum = threadNum;
    // Create() reads m_iThreadNum, so it must run after the assignment.
    this->Create();
}
bool CThreadPool::GetThreadPoolState()
{
return isToStop;
}
// Updates the stop flag.
// state: new value for isToStop (true asks the workers to exit).
// The write happens while m_pthreadStateMutex is held.
void CThreadPool::SetThreadPoolState(bool state)
{
    pthread_mutex_lock(&CThreadPool::m_pthreadStateMutex);
    CThreadPool::isToStop = state;
    pthread_mutex_unlock(&CThreadPool::m_pthreadStateMutex);
}
// Moves a thread id from the busy vector to the idle vector.
// tid: id of the worker that has finished a task.
// Returns 0 when the id was moved, or when the pool is stopping (in which
// case the vectors are left untouched so StopAll can walk them safely).
// Returns -1 when tid is not present in the busy vector; nothing is changed
// in that case. Erasing the end() iterator would be undefined behaviour, and
// blindly appending could leave the id recorded twice.
int CThreadPool::MoveToIdle(pthread_t tid)
{
    int result = 0;
    pthread_mutex_lock(&m_pthreadStateMutex);
    if(!isToStop)
    {
        // Linear search; pthread_equal is the portable way to compare ids.
        vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
        while(busyIter != m_vecBusyThread.end() && !pthread_equal(*busyIter, tid))
        {
            ++busyIter;
        }
        if(busyIter != m_vecBusyThread.end())
        {
            m_vecBusyThread.erase(busyIter);
            m_vecIdleThread.push_back(tid);
        }
        else
        {
            result = -1;
        }
    }
    pthread_mutex_unlock(&m_pthreadStateMutex);
    return result;
}
// Moves a thread id from the idle vector to the busy vector.
// tid: id of the worker that is about to run a task.
// Returns 0 when the id was moved, or when the pool is stopping (in which
// case the vectors are left untouched so StopAll can walk them safely).
// Returns -1 when tid is not present in the idle vector; nothing is changed
// in that case. Erasing the end() iterator would be undefined behaviour, and
// blindly appending could leave the id recorded twice.
int CThreadPool::MoveToBusy(pthread_t tid)
{
    int result = 0;
    pthread_mutex_lock(&m_pthreadStateMutex);
    if(!isToStop)
    {
        // Linear search; pthread_equal is the portable way to compare ids.
        vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
        while(idleIter != m_vecIdleThread.end() && !pthread_equal(*idleIter, tid))
        {
            ++idleIter;
        }
        if(idleIter != m_vecIdleThread.end())
        {
            m_vecIdleThread.erase(idleIter);
            m_vecBusyThread.push_back(tid);
        }
        else
        {
            result = -1;
        }
    }
    pthread_mutex_unlock(&m_pthreadStateMutex);
    return result;
}
void* CThreadPool::ThreadFunc(void * threadData)
{
pthread_t tid = pthread_self();
list<CJob*>* taskList = (list<CJob*>*)threadData;
while(1)
{
if(CThreadPool::isToStop)
{
cout << "CThread::thread work exit" << endl;
return 0;
}
pthread_mutex_lock(&m_pthreadMutex);
if(CThreadPool::isToStop)
{
pthread_mutex_unlock(&m_pthreadMutex);
cout << "CThread::thread work exit" << endl;
return 0;
}
if(taskList->size() == 0)
{
pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
}
//pthread_testcancel();
if(CThreadPool::isToStop)
{
pthread_mutex_unlock(&m_pthreadMutex);
cout << "CThread::thread work exit" << endl;
return 0;
}
//cout << "tid:" << tid << " run" << endl;
//get task
list<CJob*>::iterator iter = taskList->begin();
if(iter != taskList->end())
{
MoveToBusy(tid);
CJob* task = *iter;
taskList->erase(iter);
pthread_mutex_unlock(&m_pthreadMutex);
//cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;
//cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;
cout << "task to be run:" << taskList->size() << endl;
task->Run();
if(task->GetDelFlag())
{
cout << "del Task:" << task->GetJobId()<< endl;
delete task;
}
MoveToIdle(tid);
}else{
pthread_mutex_unlock(&m_pthreadMutex);
}
//cout << "CThread::thread work exit" << endl;
//cout << "tid:" << tid << " idle" << endl;
}
//cout << "CThread::thread work exit" << endl;
return (void*)0;
}
// Appends a job to the back of the task list and wakes one waiting worker.
// job: the job to queue; its pool handle is set to this pool.
// Returns 0 on success, -1 when job is NULL (a NULL entry would be
// dereferenced by the worker that pops it).
// The list is modified under m_pthreadMutex because that is the mutex the
// worker threads hold while they read and pop the list; pushing under a
// different mutex would let a push and a pop run concurrently.
int CThreadPool::AddTask(CJob *job)
{
    if(job == NULL)
    {
        return -1;
    }
    pthread_mutex_lock(&m_pthreadMutex);
    job->SetThreadPoolHandle(this);
    this->m_vecTaskList.push_back(job);
    pthread_cond_signal(&m_pthreadCond);
    pthread_mutex_unlock(&m_pthreadMutex);
    return 0;
}
// Inserts a job at the front of the task list, so it is the next one popped,
// and wakes one waiting worker.
// job: the job to queue; its pool handle is set to this pool.
// Returns 0 on success, -1 when job is NULL (a NULL entry would be
// dereferenced by the worker that pops it).
// The list is modified under m_pthreadMutex because that is the mutex the
// worker threads hold while they read and pop the list; pushing under a
// different mutex would let a push and a pop run concurrently.
int CThreadPool::AddTaskHead(CJob *job)
{
    if(job == NULL)
    {
        return -1;
    }
    pthread_mutex_lock(&m_pthreadMutex);
    job->SetThreadPoolHandle(this);
    this->m_vecTaskList.push_front(job);
    pthread_cond_signal(&m_pthreadCond);
    pthread_mutex_unlock(&m_pthreadMutex);
    return 0;
}
int CThreadPool::Create()
{
for(int i = 0; i < m_iThreadNum;i++)
{
pthread_t tid = 0;
pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);
m_vecIdleThread.push_back(tid);
}
return 0;
}
int CThreadPool::StopAll()
{
if(!CThreadPool::isToStop)
{
SetThreadPoolState(true);
}
pthread_mutex_lock(&m_pthreadMutex);
pthread_cond_broadcast(&m_pthreadCond);
pthread_mutex_unlock(&m_pthreadMutex);
vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
while(iter != m_vecIdleThread.end())
{
//pthread_cancel(*iter);
pthread_join(*iter,NULL);
iter++;
}
cout << " idle" << endl;
iter = m_vecBusyThread.begin();
while(iter != m_vecBusyThread.end())
{
//pthread_cancel(*iter);
pthread_join(*iter,NULL);
iter++;
}
cout << " Busy" << endl;
//移除还未执行的任务
list<CJob*>::iterator iter_task = m_vecTaskList.begin();
while(iter_task != m_vecTaskList.end())
{
CJob* task = *iter_task;
iter_task = m_vecTaskList.erase(iter_task);
if(task->GetDelFlag())
{
cout << " TaskList size:" << m_vecTaskList.size() << endl;
cout << "StopAll del Task:" << task->GetJobId()<< endl;
delete task;
}
//iter_task++;
}
cout << " Task" << endl;
return 0;
}