#include "MessageCentrer.h"
#include "Subscriber.h"
#include "Publisher.h"
#define MAX_PUBLISHES 10000
MessageCentrer* MessageCentrer::mSgMC = nullptr;
std::mutex MessageCentrer::mMCMutex;
MessageCentrer* MessageCentrer::GetMC()
{
if (mSgMC == nullptr)
{
std::unique_lock<std::mutex> lock(mMCMutex);
if (mSgMC == nullptr)
{
volatile auto temp = new (std::nothrow) MessageCentrer();
mSgMC = temp;
}
}
return mSgMC;
}
void MessageCentrer::Run()
{
mCoreProcss.reset(new std::thread(&MessageCentrer::CoreProcss, this));
}
void MessageCentrer::RegistPublish(std::string tpcKey, void* msgdata, unsigned int datasize)
{
std::unique_lock<std::mutex> lock(mPublishMutex);
if ((this->mSubscriber.find(tpcKey) != this->mSubscriber.end()) &&
(this->mPublisher[tpcKey].size() > MAX_PUBLISHES)) return;
void* tmpdata = new char[datasize];
memcpy(tmpdata, msgdata, datasize);
this->mPublisher[tpcKey].push_back(tmpdata);
}
void MessageCentrer::RegistSubscribe(std::string tpcKey, Subscriber* subscriber)
{
std::unique_lock<std::mutex> lock(mSubscribeMutex);
this->mSubscriber[tpcKey].remove(subscriber);
this->mSubscriber[tpcKey].push_back(subscriber);
}
void MessageCentrer::CancelSubscribe(std::string tpcKey, Subscriber* subscriber)
{
std::unique_lock<std::mutex> lock(mSubscribeMutex);
if (this->mSubscriber.find(tpcKey) != this->mSubscriber.end())
this->mSubscriber.find(tpcKey)->second.remove(subscriber);
}
MessageCentrer::MessageCentrer()
{
this->mPublisher.clear();
this->mSubscriber.clear();
}
MessageCentrer::~MessageCentrer()
{
}
void MessageCentrer::CoreProcss()
{
while (true)
{
auto it = this->mSubscriber.begin();
while (it != this->mSubscriber.end())
{
if (this->mPublisher.find(it->first) != this->mPublisher.end())
{
auto itt = it->second.begin();
while (itt != it->second.end())
{
auto mpitr = this->mPublisher.find(it->first)->second.begin();
auto mpitrend = this->mPublisher.find(it->first)->second.end();
while (mpitr != mpitrend)
{
(*itt)->HandeEnvent(it->first, *mpitr);
++mpitr;
}
++itt;
}
mPublishMutex.lock();
auto mpitr = this->mPublisher.find(it->first)->second.begin();
auto mpitrend = this->mPublisher.find(it->first)->second.end();
while (mpitr != mpitrend)
{
delete[](*mpitr);
++mpitr;
}
this->mPublisher.find(it->first)->second.clear();
this->mPublisher.erase(it->first);
mPublishMutex.unlock();
}
++it;
}
}
}
东风吹柳
- 粉丝: 1498
- 资源: 13
最新资源
- (免费下载)LogiSim 华中科技大学优化版 - 逻辑电路/电路设计/单片机
- Android大作业-仿抖音APP源代码+文档答辩ppt+演示视频(高分期末大作业)
- 植物大战僵尸Mixed
- 毕设-c语言UDP传输系统源码8.zip
- 毕设-c语言24点游戏源码6.zip
- 毕设-C语言超市管理系统1.zip
- 毕设-c语言Turbo C下写的俄罗斯方块7.zip
- 毕设-c语言别踩白块儿(双人版)源码10.zip
- 毕设-c语言吃逗游戏源码2.zip
- 毕设-c语言奔跑的火柴人游戏源码9.rar
- 毕设-c语言打字游戏代码4.zip
- 毕设-c语言打字母游戏源码3.zip
- 毕设-c语言大丰收游戏源码5.zip
- 17325458887980.zip
- 12.5MN冲孔压机机架结构及焊接工艺性浅析.pdf
- 12Cr1MoVG与TP347H(Ф38mm×5mm)钢管的钨极氩弧焊焊接工艺 - .pdf
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈