#include "MessageCentrer.h"
#include "Subscriber.h"
#include "Publisher.h"
#define MAX_PUBLISHES 10000
MessageCentrer* MessageCentrer::mSgMC = nullptr;
std::mutex MessageCentrer::mMCMutex;
MessageCentrer* MessageCentrer::GetMC()
{
if (mSgMC == nullptr)
{
std::unique_lock<std::mutex> lock(mMCMutex);
if (mSgMC == nullptr)
{
volatile auto temp = new (std::nothrow) MessageCentrer();
mSgMC = temp;
}
}
return mSgMC;
}
void MessageCentrer::Run()
{
mCoreProcss.reset(new std::thread(&MessageCentrer::CoreProcss, this));
}
void MessageCentrer::RegistPublish(std::string tpcKey, void* msgdata, unsigned int datasize)
{
std::unique_lock<std::mutex> lock(mPublishMutex);
if ((this->mSubscriber.find(tpcKey) != this->mSubscriber.end()) &&
(this->mPublisher[tpcKey].size() > MAX_PUBLISHES)) return;
void* tmpdata = new char[datasize];
memcpy(tmpdata, msgdata, datasize);
this->mPublisher[tpcKey].push_back(tmpdata);
}
void MessageCentrer::RegistSubscribe(std::string tpcKey, Subscriber* subscriber)
{
std::unique_lock<std::mutex> lock(mSubscribeMutex);
this->mSubscriber[tpcKey].remove(subscriber);
this->mSubscriber[tpcKey].push_back(subscriber);
}
void MessageCentrer::CancelSubscribe(std::string tpcKey, Subscriber* subscriber)
{
std::unique_lock<std::mutex> lock(mSubscribeMutex);
if (this->mSubscriber.find(tpcKey) != this->mSubscriber.end())
this->mSubscriber.find(tpcKey)->second.remove(subscriber);
}
MessageCentrer::MessageCentrer()
{
this->mPublisher.clear();
this->mSubscriber.clear();
}
MessageCentrer::~MessageCentrer()
{
}
void MessageCentrer::CoreProcss()
{
while (true)
{
auto it = this->mSubscriber.begin();
while (it != this->mSubscriber.end())
{
if (this->mPublisher.find(it->first) != this->mPublisher.end())
{
auto itt = it->second.begin();
while (itt != it->second.end())
{
auto mpitr = this->mPublisher.find(it->first)->second.begin();
auto mpitrend = this->mPublisher.find(it->first)->second.end();
while (mpitr != mpitrend)
{
(*itt)->HandeEnvent(it->first, *mpitr);
++mpitr;
}
++itt;
}
mPublishMutex.lock();
auto mpitr = this->mPublisher.find(it->first)->second.begin();
auto mpitrend = this->mPublisher.find(it->first)->second.end();
while (mpitr != mpitrend)
{
delete[](*mpitr);
++mpitr;
}
this->mPublisher.find(it->first)->second.clear();
this->mPublisher.erase(it->first);
mPublishMutex.unlock();
}
++it;
}
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
发布-订阅(Publish-Subscribe)模式 C++实现
共8个文件
h:6个
cpp:2个
需积分: 1 1 下载量 31 浏览量
2024-05-12
23:14:44
上传
评论
收藏 4KB RAR 举报
温馨提示
发布-订阅(Publish-Subscribe)模式是一种消息传递模式,用于构建分布式系统中的通信机制。 本实现主要包括3个部分:Publisher、Subscriber和MessageCentrer。 MessageCentrer主要做3个功能:Publish Table —— 发布消息的主题与数据的映射表;Subscriber Table —— 订阅消息的主题与订阅者表;Run() —— 队列维护的主过程处理接口。 功能支持:N:M模式发布订阅,一个发布者可以发布多个主题消息,一个订阅者可以接收多个发布者发布的主体消息。 参考地址:https://blog.csdn.net/A_Pointer/article/details/138617089
资源推荐
资源详情
资源评论
收起资源包目录
Publisher-Subscriber.rar (8个子文件)
Publisher-Subscriber
MessageCentrer.cpp 3KB
AppPublisher.h 330B
main.cpp 619B
Publisher.h 220B
Topics.h 148B
AppSubscriber.h 1KB
Subscriber.h 307B
MessageCentrer.h 1KB
共 8 条
- 1
资源评论
东风吹柳
- 粉丝: 924
- 资源: 11
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功