DWORD CFTLThreadTester::ProducerThreadFun(void* pParam)
{
CFTLThreadTester* pThis = static_cast<CFTLThreadTester*>(pParam);
for (LONG index = 0; index < 5; index++)
{
CMyTestData* pData = new CMyTestData(index,FALSE);
FTLTRACE(TEXT("ProducerThread Write pData->Index = %d, CurrentTotalCount = %d\n"),
pData->GetCurrentIndex(),CMyTestData::GetTestDataCount());
if (pThis->m_MyTestDataQueue.Append(pData,INFINITE) == ftwtContinue)
{
}
else
{
delete pData;
}
pThis->m_ThreadProducer.SleepAndCheckStop(200);
}
for (LONG index = 6; index < 10; index++)
{
CMyTestData* pData = new CMyTestData(index,FALSE);
FTLTRACE(TEXT("ProducerThread Write pData->Index = %d, CurrentTotalCount = %d\n"),
pData->GetCurrentIndex(),CMyTestData::GetTestDataCount());
{
CFAutoQueueOperator<CMyTestData*> queueOperator(&pThis->m_MyTestDataQueue,pData,TRUE,INFINITE);
if (queueOperator.HaveGotQueue()) //成功获取,并能够加入
{
//Donothing
}
else
{
delete pData;
}
}
pThis->m_ThreadProducer.SleepAndCheckStop(200);
}
//传入要求退出的元素
CMyTestData *pQuitData = new CMyTestData(-1,FALSE);
if (pThis->m_MyTestDataQueue.Append(pQuitData,INFINITE) != ftwtContinue)
{
delete pQuitData;
}
FTLTRACE(TEXT("Now Producer Thread End.\n"));
return 0;
}
DWORD CFTLThreadTester::ResumerThreadFun(void* pParam)
{
CFTLThreadTester* pThis = static_cast<CFTLThreadTester*>(pParam);
CMyTestData* pData = NULL;
while (ftwtContinue == pThis->m_MyTestDataQueue.Remove(pData,INFINITE))
{
LONG dwCurrentIndex = pData->GetCurrentIndex();
if (dwCurrentIndex == -1) //Quit
{
delete pData;
break;
}
pThis->m_ThreadResumer.SleepAndCheckStop(500);
delete pData;
FTLTRACE(TEXT("ResumerThread Read pData->Index = %d, CurrentTotalCount = %d\n"),
dwCurrentIndex,CMyTestData::GetTestDataCount());
}
{
CFAutoQueueOperator<CMyTestData*> queueOperator(&pThis->m_MyTestDataQueue,pData,FALSE,100);
}
//清除
while (pThis->m_MyTestDataQueue.RemoveAfterStop(pData))
{
delete pData;
}
FTLTRACE(TEXT("Now Resumer Thread End, CurrentTotalCount = %d\n"),CMyTestData::GetTestDataCount());
return 0;
}
void CFTLThreadTester::test_CFProducerResumerQueue()
{
m_ThreadProducer.Start(ProducerThreadFun,this);
m_ThreadResumer.Start(ResumerThreadFun,this);
m_ThreadProducer.Wait(INFINITE,TRUE);
m_ThreadResumer.Wait(INFINITE,TRUE);
m_ThreadProducer.Start(ProducerThreadFun,this);
Sleep(1000);
m_ThreadResumer.Start(ResumerThreadFun,this);
Sleep(10);
m_MyTestDataQueue.Stop(); //停止Queue
m_ThreadProducer.Wait(INFINITE,TRUE);
m_ThreadResumer.Wait(INFINITE,TRUE);
}
void CFTLThreadTester::test_CFProducerResumerQueue_ResolveRelease()
{
//测试保留Slot值 --
CPPUNIT_ASSERT(m_MyTestDataQueue.GetCapability() == MAX_QUEUE_COUNT);
CPPUNIT_ASSERT(m_MyTestDataQueue.GetElementCount() == 0); //Queue中没有元素
CPPUNIT_ASSERT(m_MyTestDataQueue.GetReserveSlotCount() == 1); //初始值是1
CPPUNIT_ASSERT(m_MyTestDataQueue.ReserveSlot(1,INFINITE) == ftwtContinue); //可以保留成功
DWORD dwUseSlot = m_MyTestDataQueue.GetCapability() - m_MyTestDataQueue.GetReserveSlotCount();
CPPUNIT_ASSERT(dwUseSlot == 0); //都保留了,可用Slot为0,将不能再加入Element
CMyTestData data(999);
CPPUNIT_ASSERT(m_MyTestDataQueue.Append(&data,100) == ftwtTimeOut); //
CPPUNIT_ASSERT(m_MyTestDataQueue.ReleaseSlot(MAX_QUEUE_COUNT)); //全部释放
CPPUNIT_ASSERT(m_MyTestDataQueue.GetReserveSlotCount() == 0);
CPPUNIT_ASSERT(m_MyTestDataQueue.Append(&data,1000) == ftwtContinue); //此时能成功加入
CMyTestData* pTempData = NULL;
CPPUNIT_ASSERT(m_MyTestDataQueue.Remove(pTempData,1000) == ftwtContinue); //能成功取出
CPPUNIT_ASSERT(pTempData == &data); //取出的就是之前放入的
CPPUNIT_ASSERT(m_MyTestDataQueue.ReserveSlot(1,1000) == ftwtContinue); //恢复保留Slot为1
}
没有合适的资源?快使用搜索试试~ 我知道了~
一个可以重用的线程安全生产者消费者队列类
共3个文件
hpp:1个
h:1个
cpp:1个
5星 · 超过95%的资源 需积分: 42 264 下载量 167 浏览量
2013-04-21
21:57:24
上传
评论 8
收藏 7KB ZIP 举报
温馨提示
生产者、消费者队列的代码和文档和例子,CSDN或别的网站上已经有很多了,我就不再多说了。不过一般的文档中给出的例子往往只是“例子”,要用于实际的项目开发中差的很远,共享一下我以前写的一个线程安全的生产者消费者队列类。具有如下特点: 1.可以设置队列中的最大长度; 2.线程安全; 3.多线程想向队列中放入数据时,可设置等待队列有空位的超时时间,当有空位时,等待线程会被唤醒并将数据加入队列。函数原型为:FTLThreadWaitType Append(const ELEMENT& element, DWORD dwTimeOut). 4.多线程从队列中取出数据时,可设置等待队列有数据的超时时间, 当有数据时等待线程会被唤醒。函数原型为:FTLThreadWaitType Remove(ELEMENT& element, DWORD dwTimeOut), 5.支持安全的退出,即如果在等待加入数据或取出数据时,能直接通过Stop唤醒。 6.支持动态改变队列大小 -- 但这个功能我测试的不是很多,可能有Bug,如发现有Bug请告诉我。 7.和我其他FTL中的类一样,支持模版。 本次就偷一下懒,只上传源码和UT代码,就不再写示例程序了。下载了源码以后需要自己对应更改一下(如果实在更改不来,可以给我回复信息,我有空时再写个Demo放上来)。 博客地址:http://blog.csdn.net/fishjam/article/details/8832145
资源推荐
资源详情
资源评论
收起资源包目录
FProducerResumerQueue.zip (3个子文件)
PRTest.cpp 4KB
FProducerResumerQueue.hpp 15KB
FProducerResumerQueue.h 6KB
共 3 条
- 1
fishjam
- 粉丝: 166
- 资源: 13
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
- 3
前往页