package io.github.quickmsg.core.protocol;
import io.github.quickmsg.common.acl.AclAction;
import io.github.quickmsg.common.acl.AclManager;
import io.github.quickmsg.common.auth.AuthManager;
import io.github.quickmsg.common.channel.ChannelRegistry;
import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.config.AuthConfig;
import io.github.quickmsg.common.config.ConnectModel;
import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.enums.ChannelStatus;
import io.github.quickmsg.common.enums.Event;
import io.github.quickmsg.common.message.*;
import io.github.quickmsg.common.metric.CounterType;
import io.github.quickmsg.common.metric.MetricManager;
import io.github.quickmsg.common.metric.MetricManagerHolder;
import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.github.quickmsg.core.mqtt.MqttReceiveContext;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.ContextView;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author luxurong
*/
@Slf4j
public class ConnectProtocol implements Protocol<MqttConnectMessage> {
private static List<MqttMessageType> MESSAGE_TYPE_LIST = new ArrayList<>();
private static final int MILLI_SECOND_PERIOD = 1_000;
static {
MESSAGE_TYPE_LIST.add(MqttMessageType.CONNECT);
}
private static void accept(MqttChannel mqttChannel1) {
// metric.getMetricCounter(CounterEnum.CONNECT_COUNTER).decrement();
}
@Override
public Mono<Void> parseProtocol(SmqttMessage<MqttConnectMessage> smqttMessage, MqttChannel mqttChannel, ContextView contextView) {
try {
MqttConnectMessage message = smqttMessage.getMessage();
MetricManagerHolder.metricManager.getMetricRegistry().getMetricCounter(CounterType.CONNECT_EVENT).increment();
MqttReceiveContext mqttReceiveContext = (MqttReceiveContext) contextView.get(ReceiveContext.class);
EventRegistry eventRegistry = mqttReceiveContext.getEventRegistry();
MqttConnectVariableHeader mqttConnectVariableHeader = message.variableHeader();
MqttConnectPayload mqttConnectPayload = message.payload();
String clientIdentifier = mqttConnectPayload.clientIdentifier();
ChannelRegistry channelRegistry = mqttReceiveContext.getChannelRegistry();
TopicRegistry topicRegistry = mqttReceiveContext.getTopicRegistry();
MetricManager metricManager = mqttReceiveContext.getMetricManager();
byte mqttVersion = (byte) mqttConnectVariableHeader.version();
AuthManager authManager = mqttReceiveContext.getAuthManager();
/*check clientIdentifier exist*/
MqttChannel existMqttChannel = channelRegistry.get(clientIdentifier);
if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) {
if (existMqttChannel != null && existMqttChannel.getStatus() == ChannelStatus.ONLINE) {
return mqttChannel.write(
MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttVersion),
false).then(mqttChannel.close());
}
} else {
if (existMqttChannel != null && existMqttChannel.getStatus() == ChannelStatus.ONLINE) {
if (System.currentTimeMillis() - existMqttChannel.getConnectTime() > (mqttReceiveContext.getConfiguration().getNotKickSecond() * 1000)) {
existMqttChannel.close().subscribe();
} else {
return mqttChannel.write(
MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttVersion),
false).then(mqttChannel.close());
}
}
}
/*protocol version support*/
if (MqttVersion.MQTT_3_1.protocolLevel() != mqttVersion &&
MqttVersion.MQTT_3_1_1.protocolLevel() != mqttVersion
&& MqttVersion.MQTT_5.protocolLevel() != mqttVersion) {
return mqttChannel.write(
MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, mqttVersion),
false).then(mqttChannel.close());
}
/*password check*/
if (authManager.auth(mqttConnectPayload.userName(),mqttConnectPayload.passwordInBytes(), clientIdentifier)) {
/*cancel defer close not authenticate channel */
mqttChannel.disposableClose();
mqttChannel.setClientIdentifier(mqttConnectPayload.clientIdentifier());
if (mqttConnectVariableHeader.isWillFlag()) {
mqttChannel.setWill(MqttChannel.Will.builder()
.isRetain(mqttConnectVariableHeader.isWillRetain())
.willTopic(mqttConnectPayload.willTopic())
.willMessage(mqttConnectPayload.willMessageInBytes())
.mqttQoS(MqttQoS.valueOf(mqttConnectVariableHeader.willQos()))
.build());
}
mqttChannel.setAuthTime(System.currentTimeMillis());
mqttChannel.setConnectTime(System.currentTimeMillis());
mqttChannel.setKeepalive(mqttConnectVariableHeader.keepAliveTimeSeconds());
mqttChannel.setSessionPersistent(!mqttConnectVariableHeader.isCleanSession());
mqttChannel.setStatus(ChannelStatus.ONLINE);
mqttChannel.setUsername(mqttConnectPayload.userName());
/*registry unread event close channel */
mqttChannel.getConnection().onReadIdle((long) mqttConnectVariableHeader.keepAliveTimeSeconds() * MILLI_SECOND_PERIOD << 1,
() -> close(metricManager, mqttChannel, mqttReceiveContext, eventRegistry));
CloseMqttMessage closeMqttMessage = new CloseMqttMessage();
closeMqttMessage.setClientIdentifier(clientIdentifier);
ClusterMessage clusterMessage = new ClusterMessage(closeMqttMessage);
mqttReceiveContext.getClusterRegistry().spreadPublishMessage(clusterMessage)
.subscribe();
/*registry will message send */
mqttChannel.registryClose(channel -> Optional.ofNullable(mqttChannel.getWill())
.ifPresent(will ->
topicRegistry.getSubscribesByTopic(will.getWillTopic(), will.getMqttQoS())
.forEach(subscribeTopic -> {
MqttChannel subscribeChannel = subscribeTopic.getMqttChannel();
subscribeChannel.write(
MqttMessageBuilder
.buildPub(false,
subscribeTopic.getQoS(),
subscribeTopic.getQoS() == MqttQoS.AT_MOST_ONCE
? 0 : subscribeChannel.generateMessageId(),
will.getWillTopic(),
Unpooled.wrappedBuffer(wil
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
SMQTT基于reactor-netty(spring-webflux底层依赖) 开发,底层采用Reactor3反应堆模型,支持单机部署,支持容器化部署;一款高性、高吞吐量、高扩展性的物联网mqtt集群broker!支持千万级+链接,同时支持自定义扩展功能,同时支持多种协议交互,是一款非常优秀的消息中间件,功能非常强大,
资源推荐
资源详情
资源评论
收起资源包目录
一款高性、高吞吐量、高扩展性的物联网mqtt集群broker (455个子文件)
io.github.quickmsg.common.cluster.ClusterRegistry 48B
basic_policy.csv 147B
.env.development 44B
.env 321B
.gitignore 349B
.gitignore 261B
index.html 1KB
io.github.quickmsg.common.http.HttpActor 2KB
favicon.ico 41KB
ConnectProtocol.java 12KB
MqttChannel.java 12KB
DatabasechangelogRecord.java 11KB
BootstrapConfig.java 10KB
MqttMessageBuilder.java 10KB
DbMessageRegistry.java 9KB
Bootstrap.java 8KB
MetricManager.java 8KB
AbstractReceiveContext.java 8KB
CommonProtocol.java 7KB
RedisMessageRegistry.java 6KB
PublishProtocol.java 6KB
MqttSourceBean.java 6KB
ScubeClusterRegistry.java 5KB
Databasechangelog.java 5KB
SmqttSessionRecord.java 5KB
JCasBinAclManager.java 5KB
MessageProxy.java 5KB
SmqttRetainRecord.java 5KB
TreeNode.java 5KB
DatabasechangeloglockRecord.java 4KB
Databasechangeloglock.java 4KB
SubscribeProtocol.java 4KB
SmqttSession.java 4KB
SmqttRetain.java 4KB
Event.java 4KB
ClusterReceiver.java 4KB
AbstractStarter.java 4KB
HttpRouterAcceptor.java 4KB
RabbitmqSourceBean.java 3KB
DefaultTopicRegistry.java 3KB
MessageUtils.java 3KB
JacksonUtil.java 3KB
WebSocketMqttReceiver.java 3KB
RuleQueryActor.java 3KB
InfluxDbMetricBean.java 3KB
AutoMqttConfiguration.java 3KB
DefaultProtocolAdaptor.java 3KB
FixedTopicFilter.java 3KB
ClusterClientStrategy.java 3KB
AbstractSslHandler.java 3KB
SessionMessage.java 3KB
LoginResourceActor.java 3KB
ReceiveContext.java 2KB
MqttReceiver.java 2KB
DefaultTransport.java 2KB
TopicRuleNode.java 2KB
BootstrapKey.java 2KB
RocketmqSourceBean.java 2KB
RetainMessage.java 2KB
CloseConnectionActor.java 2KB
UnSubscribeProtocol.java 2KB
SqlAuthManager.java 2KB
SourceQueryActor.java 2KB
RuleChain.java 2KB
AclAddPolicyActor.java 2KB
AclQueryPolicyActor.java 2KB
ClusterNode1.java 2KB
HttpAuthManager.java 2KB
SpringBootstrapConfig.java 2KB
TreeTopicFilter.java 2KB
KafkaSourceBean.java 2KB
Configuration.java 2KB
TopicRegistry.java 2KB
HttpSourceBean.java 2KB
HttpTransport.java 2KB
DbSourceBean.java 2KB
ConnectionActor.java 2KB
TopicTest.java 2KB
PrometheusMetricBean.java 2KB
SubscribeTopic.java 2KB
MqttConfiguration.java 2KB
DefaultChannelRegistry.java 2KB
PublishAckProtocol.java 2KB
HttpActor.java 2KB
SentinelClientStrategy.java 2KB
DefaultObjectMapper.java 2KB
AbsAck.java 2KB
HttpConfiguration.java 2KB
FormatUtils.java 2KB
HttpReceiver.java 2KB
DefaultMessageRegistry.java 2KB
ClusterRegistry.java 2KB
MqttReceiveContext.java 2KB
RuleExecute.java 2KB
TransmitRuleNode.java 2KB
AclDeletePolicyActor.java 2KB
Smqtt.java 2KB
HikariCPConnectionProvider.java 1KB
HikariCPConnectionProvider.java 1KB
PublishActor.java 1KB
共 455 条
- 1
- 2
- 3
- 4
- 5
资源评论
Java程序员-张凯
- 粉丝: 1w+
- 资源: 6746
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 毕业设计基于Python卷积神经网络CNN的图像分类系统源码+模型+说明文档+全部数据资料.zip
- matlab 基于SVM的手写字体识别源代码+详细教程
- 课程管理平台 JAVA+Vue.js+SpringBoot+MySQL
- 毕业设计 基于Python卷积神经网络CNN的图像分类系统源码+模型+说明文档+全部数据资料.zip
- matlab 基于SVM的图像分割-真彩色图像分割源代码+详细教程
- go-admin框架vue权限字符-全网最透彻讲解
- matlab 基于SVM的信息粒化时序回归预测-上证指数开盘指数变化趋势和变化空间预测源代码+详细教程
- OBS多平台推流支持插件
- matlab 基于SVM的回归预测分析-上证指数开盘指数预测源代码+详细教程
- comfyui的BrushNet电商公司和摄影公司都在用的AI工作流
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功