#include "ThreadPool.h"
#include <cstdio>
#include <Windows.h>
namespace ThreadAPP {
static volatile int pool_seq = 0;
static MutexLock pool_seq_locker;
ThreadPool::ThreadPool(int num)
: closed(false)
, poolSeq(0)
, threadNum(num)
, taskLocker()
, taskCond()
, waitCompleteCond()
#ifdef USING_C11THREAD
#endif
{
pool_seq_locker.lock();
this->poolSeq = ++pool_seq;
pool_seq_locker.unlock();
printf("create thread pool %d\n", poolSeq);
init();
}
ThreadPool::~ThreadPool()
{
for (list<ThreadWrapper*>::iterator iter = threads.begin(); iter != threads.end(); iter++)
{
ThreadWrapper* tw = *iter;
delete tw;
}
threads.clear();
while (!taskQueue.empty())
{
printf("thread pool release task is not empty");
TaskWrapper* taskw = taskQueue.front();
taskQueue.pop();
delete taskw;
}
printf("delete thread pool %d\n", poolSeq);
}
void* ThreadPool::__run__thread(void *arg)
{
ThreadPool* pool = (ThreadPool*)arg;
pool->process();
printf("thread run end...");
return NULL;
}
void ThreadPool::init()
{
taskLocker.lock();
for (int i = 0; i < threadNum; i++)
{
#ifdef USING_C11THREAD
char name[16];
sprintf(name, "p-%d-t-%d", poolSeq, i + 1);
threads.push_back(new ThreadWrapper(__run__thread, this, name));
#else
pthread_t pid;
pthread_create(&pid, NULL, __run__thread, this);
pthread_detach(pid);
char name[16];
sprintf(name, "p-%d-t-%d", poolSeq, i + 1);
threads.push_back(new ThreadWrapper(pid, name));
#endif
}
taskLocker.unlock();
}
ThreadWrapper* ThreadPool::findThreadWrapper() const
{
for (list<ThreadWrapper*>::const_iterator iter = threads.begin(); iter != threads.end(); iter++)
{
ThreadWrapper* tw = *iter;
#ifdef USING_C11THREAD
if (this_thread::get_id() == tw->getPid())
#else
if (pthread_equal(pthread_self(), tw->getPid()))
#endif
{
return tw;
}
}
return NULL;
}
#ifndef USING_C11THREAD
void ThreadPool::process()
{
TaskWrapper* taskw;
while (true)
{
taskLocker.lock();
while (taskQueue.empty())
{
ThreadWrapper* tw = findThreadWrapper();
if (closed)
{
if (tw != NULL)
{
threads.remove(tw);
delete tw;
}
bool allClosed = threads.empty();
taskLocker.unlock();
if (allClosed)
{
delete this;
}
return;
}
tw->setIdle(true);
waitCompleteCond.notify();
taskCond.wait(taskLocker);
tw->setIdle(false);
}
taskw = taskQueue.front();
printf("process : %p\n", taskw);
taskQueue.pop();
#ifndef USING_C11THREAD
taskLocker.unlock();
#endif
taskw->getTask()->run();
delete taskw;
}
}
#else
void ThreadPool::process()
{
TaskWrapper* taskw;
while (true)
{
std::unique_lock<std::mutex> locker(taskLocker.getPthreadMutexByRef());
while (taskQueue.empty())
{
ThreadWrapper* tw = findThreadWrapper();
if (closed)
{
if (tw != NULL)
{
threads.remove(tw);
delete tw;
}
bool allClosed = threads.empty();
if (allClosed)
{
delete this;
}
return;
}
tw->setIdle(true);
waitCompleteCond.notify();
#ifdef USING_C11THREAD
taskCond.wait(locker);
#else
taskCond.wait(taskLocker);
#endif
tw->setIdle(false);
}
taskw = taskQueue.front();
taskQueue.pop();
#ifndef USING_C11THREAD
taskLocker.unlock();
#endif
taskw->getTask()->run();
delete taskw;
}
}
#endif
void ThreadPool::addTask(Task* task, bool needDeleteAfterRun)
{
TaskWrapper* taskw = new TaskWrapper(task, needDeleteAfterRun);
taskLocker.lock();
taskQueue.push(taskw);
#ifndef USING_C11THREAD
taskCond.notify();
#else
taskCond.notify();
#endif
taskLocker.unlock();
}
void ThreadPool::addTasks(vector<Task*>* tasks, bool needDeleteAfterRun)
{
taskLocker.lock();
for (unsigned int i = 0; i < tasks->size(); i++)
{
taskQueue.push(new TaskWrapper(tasks->at(i), needDeleteAfterRun));
}
#ifndef USING_C11THREAD
taskCond.notifyAll();
#else
condTask.notify_all();
#endif
taskLocker.unlock();
}
void ThreadPool::shutdownAfterProcess()
{
if (!closed)
{
closed = true;
taskCond.notifyAll();
}
}
#ifndef USING_C11THREAD
void ThreadPool::waitComplete()
{
while (true)
{
taskLocker.lock();
if (taskQueue.empty())
{
bool allIdle = true;
for (list<ThreadWrapper*>::iterator iter = threads.begin(); iter != threads.end(); iter++)
{
ThreadWrapper* tw = *iter;
if (!tw->isIdle())
{
allIdle = false;
break;
}
}
if (allIdle)
{
taskLocker.unlock();
return;
}
}
waitCompleteCond.wait(taskLocker);
taskLocker.unlock();
}
}
#else
void ThreadPool::waitComplete()
{
while (true)
{
std::unique_lock<std::mutex> locker(taskLocker.getPthreadMutexByRef());
if (taskQueue.empty())
{
bool allIdle = true;
for (list<ThreadWrapper*>::iterator iter = threads.begin(); iter != threads.end(); iter++)
{
ThreadWrapper* tw = *iter;
if (!tw->isIdle())
{
allIdle = false;
break;
}
}
if (allIdle)
{
return;
}
}
waitCompleteCond.wait(locker);
}
}
#endif
}
没有合适的资源?快使用搜索试试~ 我知道了~
posix和c11都实现的跨平台线程库
共36个文件
h:9个
tlog:6个
cpp:3个
3星 · 超过75%的资源 需积分: 16 10 下载量 158 浏览量
2016-11-28
16:45:41
上传
评论
收藏 13.62MB ZIP 举报
温馨提示
c++多线程,运用posix线程库,win下也可以用。真正实现跨平台,posix线程和c11线程可以切换(在define.h中USING_C11THREAD宏定义决定),实现相同的功能-线程池
资源推荐
资源详情
资源评论
收起资源包目录
thread.zip (36个子文件)
thread
thread
semaphore.h 4KB
Task.h 502B
temp.cpp 5KB
pthread.h 41KB
Condition.h 1KB
thread.vcxproj 7KB
thread.vcxproj.user 386B
sched.h 5KB
test.cpp 2KB
thread.vcxproj.filters 2KB
Debug
thread.log 230B
vc140.pdb 1.6MB
test.obj 209KB
ThreadPool.obj 664KB
thread.tlog
CL.write.1.tlog 2KB
link.command.1.tlog 2KB
CL.read.1.tlog 75KB
CL.command.1.tlog 2KB
thread.lastbuildstate 230B
link.write.1.tlog 854B
link.read.1.tlog 4KB
threadpool.obj.enc 614KB
vc140.idb 1.5MB
pthread
pthreadVCE2.lib 29KB
noncopyable.h 213B
ThreadPool.cpp 4KB
Mutex.h 2KB
ThreadPool.h 2KB
define.h 395B
.vs
thread
v14
.suo 64KB
Debug
thread.ilk 645KB
pthreadVCE2.dll 76KB
thread.pdb 3.34MB
thread.exe 159KB
thread.sln 1KB
thread.VC.db 30.86MB
共 36 条
- 1
资源评论
- nrxtgcb2016-11-29这个代码写的真是烂!不推荐下载。
- abemail2020-07-23参考了,还可以
yzqhao
- 粉丝: 0
- 资源: 14
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功