#include "MessageCentrer.h"
#include "Subscriber.h"
#include "Publisher.h"
#define MAX_PUBLISHES 10000
MessageCentrer* MessageCentrer::mSgMC = nullptr;
std::mutex MessageCentrer::mMCMutex;
MessageCentrer* MessageCentrer::GetMC()
{
if (mSgMC == nullptr)
{
std::unique_lock<std::mutex> lock(mMCMutex);
if (mSgMC == nullptr)
{
volatile auto temp = new (std::nothrow) MessageCentrer();
mSgMC = temp;
}
}
return mSgMC;
}
void MessageCentrer::Run()
{
mCoreProcss.reset(new std::thread(&MessageCentrer::CoreProcss, this));
}
void MessageCentrer::RegistPublish(std::string tpcKey, void* msgdata, unsigned int datasize)
{
std::unique_lock<std::mutex> lock(mPublishMutex);
if ((this->mSubscriber.find(tpcKey) != this->mSubscriber.end()) &&
(this->mPublisher[tpcKey].size() > MAX_PUBLISHES)) return;
void* tmpdata = new char[datasize];
memcpy(tmpdata, msgdata, datasize);
this->mPublisher[tpcKey].push_back(tmpdata);
}
void MessageCentrer::RegistSubscribe(std::string tpcKey, Subscriber* subscriber)
{
std::unique_lock<std::mutex> lock(mSubscribeMutex);
this->mSubscriber[tpcKey].remove(subscriber);
this->mSubscriber[tpcKey].push_back(subscriber);
}
void MessageCentrer::CancelSubscribe(std::string tpcKey, Subscriber* subscriber)
{
std::unique_lock<std::mutex> lock(mSubscribeMutex);
if (this->mSubscriber.find(tpcKey) != this->mSubscriber.end())
this->mSubscriber.find(tpcKey)->second.remove(subscriber);
}
MessageCentrer::MessageCentrer()
{
this->mPublisher.clear();
this->mSubscriber.clear();
}
MessageCentrer::~MessageCentrer()
{
}
void MessageCentrer::CoreProcss()
{
while (true)
{
auto it = this->mSubscriber.begin();
while (it != this->mSubscriber.end())
{
if (this->mPublisher.find(it->first) != this->mPublisher.end())
{
auto itt = it->second.begin();
while (itt != it->second.end())
{
auto mpitr = this->mPublisher.find(it->first)->second.begin();
auto mpitrend = this->mPublisher.find(it->first)->second.end();
while (mpitr != mpitrend)
{
(*itt)->HandeEnvent(it->first, *mpitr);
++mpitr;
}
++itt;
}
mPublishMutex.lock();
auto mpitr = this->mPublisher.find(it->first)->second.begin();
auto mpitrend = this->mPublisher.find(it->first)->second.end();
while (mpitr != mpitrend)
{
delete[](*mpitr);
++mpitr;
}
this->mPublisher.find(it->first)->second.clear();
this->mPublisher.erase(it->first);
mPublishMutex.unlock();
}
++it;
}
}
}

东风吹柳
- 粉丝: 1649
- 资源: 12
最新资源
- STM32F103系列离线下载器制作全套指南:源程序、电路图与PCB图全解析,STM32F103系列离线下载器制作全攻略:源程序、电路图与PCB图全套解析,STM32F103系列离线下载器制作,包括源
- 基于多时段随机配电网重构的优化策略:以Switch Opening and Exchange Method for Stochastic Distribution Network Reconfigu
- Qt开发上位机源码解析:固高八轴运动控制卡集成与多语言样式切换功能下的喷码机控制及光学点定位技术,Qt开发上位机:基于固高八轴运动控制卡与海康威视相机的喷码机控制源码,支持光学点定位、二维码读码与等级
- 深度验证Logistic回归预测模型:Nomogram图、列线图与列线表的综合应用研究,深度验证Logistic回归预测模型:Nomogram图、列线图与列线表的综合应用研究,Logistic回归预测
- BP神经网络在数据分类预测与故障信号诊断中的matlab代码实现:注释详尽,直接运行出分类结果与误差分布,BP神经网络数据分类预测与故障信号诊断matlab代码实战:快速分类结果及误差分布可视化,BP
- C#高级运控框架:固高运动控制GTS400/800的开源解析与实现建议,C#驱动运控框架,掌握固高运动控制核心技术:GTS400与800源码解析,C#运控框架 固高运动控制 1、支持GTS400 80
- 专业量产电动自行车、滑板车电机控制方案:成熟Foc控制技术与国产芯片集成,功能全面升级,高效稳定调试体验,大厂成熟Foc电机控制方案应用于电动两轮车领域:专业调试,多功能集成,高效稳定控制代码,基于国
- 家庭微网能量管理的优化研究:基于并行分布式差分算法的改进粒子群含需求响应方法及其在含PV-电池系统的应用分析 ,基于改进粒子群算法的含需求响应家庭微网能量管理-并行分布式差分算法优化策略研究与应用
- 750W高PF值充电机电源方案:UCC28070等芯片组合,原理图与资料全解析,750W高PF值充电机电源方案详解:UCC28070+ST6599+PIC芯片组合,原理图与资料全套,750W高PF值充
- 污水厂全流程工艺的3D高清渲染图集,展示丰富设备配置及应用实践 ,3D渲染下的污水厂工艺设备全景图:全面呈现50余套设备的高清效果,污水厂高清效果图,3D渲染后高清图 基本污水厂的工艺段该有的都有,
- Modbus RTU通讯在西门子S7 PLC中实现简易源码编程管理多个从站:RS485总线与SCL语言应用,Modbus RTU通讯S7-1200主站程序简易实现指南:TIA博图SCL语言轻松读写RS
- SRM12-8开关磁阻电机:2200W功率、高效能额定转速3450rpm,开关磁阻电机SRM12-8型功率2200w,高效能额定转速3450rpm的电机技术详解,开关磁阻电机SRM12-8,功率220
- PCB封装库:类型齐全,命名规范,分类管理,附3D模型,可信赖的电子元器件库资源,高质量PCB封装库:类型齐全,命名规范,分类管理,附3D模型,可靠易用,Allegro PCB封装库(分类、命名很规范
- ,No.26 基于FPGA的cordic算法实现,输出sin和cos波形(quartusii版本),包括程序操作录像,算法程序 CORDIC为Coordinate rotation digital c
- 基于COMSOL模型的原油脱水技术:双液滴聚结行为研究及其关键因素分析,COMSOL原油脱水模型:基于流体场与电-磁场耦合的乳化液双液滴聚结行为研究,影响因素与速率关系的深入探讨,[COMSOL原油脱
- 基于多算法融合的智能数据预测与分类:SVM、BP、RBF神经网络与灰色算法的联合应用及优化研究,基于多算法融合的智能数据预测与分类技术研究:包括SVM、BP、RBF神经网络及灰色算法的应用,以及PSO
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈


