package com.iteaj.iot.client.mqtt;
import cn.hutool.core.collection.CollectionUtil;
import com.iteaj.iot.*;
import com.iteaj.iot.client.*;
import com.iteaj.iot.client.mqtt.message.MqttClientMessage;
import com.iteaj.iot.client.protocol.ClientSocketProtocol;
import com.iteaj.iot.client.protocol.ServerInitiativeProtocol;
import com.iteaj.iot.config.ConnectProperties;
import com.iteaj.iot.event.ClientStatus;
import com.iteaj.iot.event.StatusEvent;
import com.iteaj.iot.utils.ByteUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.mqtt.*;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.function.Consumer;
import static com.iteaj.iot.CoreConst.CLIENT_DECODER_HANDLER;
/**
* 基于mqtt协议的客户端
* @see MqttConnectProperties#getAllIdleTime() 作为keepAlive字段 单位毫秒
* 详情资料{@code https://blog.csdn.net/weixin_40973138/article/details/90036953}
* @see MqttEncoder
* @see MqttDecoder
*/
public class MqttClient extends TcpSocketClient implements
ChannelInboundHandler, ChannelOutboundHandler, MultiStageConnect {
/**
* 用来标识mqtt客户端是否已经得到服务端的connAck
*/
private ChannelPromise connAckFinished;
private MessageIdManager messageIdManager;
public MqttClient(MqttClientComponent clientComponent) {
this(clientComponent, clientComponent.getConfig());
}
public MqttClient(MqttClientComponent clientComponent, MqttConnectProperties config) {
super(clientComponent, config);
this.messageIdManager = new SimpleMessageIdManager(this, clientComponent);
}
public MqttClient(MqttClientComponent clientComponent, MqttConnectProperties config, MessageIdManager messageIdManager) {
super(clientComponent, config);
this.messageIdManager = messageIdManager;
}
@Override
protected ChannelInboundHandler createProtocolDecoder() {
// https://gitee.com/iteaj/iot/issues/I5NLVV
return new MqttDecoder(getConfig().getMaxBytesInMessage());
}
@Override
protected ChannelOutboundHandlerAdapter createProtocolEncoder() {
return MqttEncoder.INSTANCE;
}
/**
* 新增MqttMessage到MqttClientMessage的处理器 包括心跳处理器和解码处理器
* @param channel
*/
@Override
protected void doInitChannel(Channel channel) {
super.doInitChannel(channel);
channel.pipeline().addAfter(CLIENT_DECODER_HANDLER, "MqttMessageToClientMessageHandler", this);
}
/**
* 连接成功后发起 connect 请求
* @param future
*/
@Override
protected void successCallback(ChannelFuture future) {
boolean willFlag = getConfig().getWillMessage() != null && getConfig().getWillTopic() != null;
MqttConnectMessage connectMessage = MqttMessageBuilders.connect()
.willFlag(willFlag)
.willQoS(getConfig().getWillQos())
.clientId(getConfig().getClientId())
.username(getConfig().getUsername())
.willTopic(getConfig().getWillTopic())
.keepAlive(getConfig().getKeepAlive())
.willRetain(getConfig().isWillRetain())
.protocolVersion(getConfig().getVersion())
.cleanSession(getConfig().isCleanSession())
.password(getConfig().getPasswordByCharset())
.willMessage(getConfig().getWillMessageByCharset())
.build();
this.getChannel().writeAndFlush(connectMessage).addListener(future1 -> {
if(future1.isSuccess()) {
if(logger.isTraceEnabled()) {
logger.trace("mqtt({}) {} - 远程主机:{} - 状态:成功 - 报文:{}"
, getName(), MqttMessageType.CONNECT, remoteKey(), connectMessage);
}
} else {
logger.warn("mqtt({}) {} - 远程主机:{} - 状态:失败 - 报文:{}"
, getName(), MqttMessageType.CONNECT, remoteKey(), connectMessage, future1.cause());
}
});
}
@Override
public ChannelFuture close() {
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.DISCONNECT
,false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage disconnectMqttMessage = new MqttMessage(header);
// 发送断开连接报文
return this.getChannel().writeAndFlush(disconnectMqttMessage).addListener(future -> {
if(future.isSuccess()) {
getChannel().close().addListener(disFuture -> {
if(disFuture.isSuccess()) {
disconnectSuccessCall();
if(logger.isTraceEnabled()) {
logger.trace("mqtt({}) 关闭客户端({}) - 远程主机:{} - 状态:成功"
, getName(), MqttMessageType.DISCONNECT, remoteKey());
}
} else {
logger.warn("mqtt({}) 关闭客户端({}) - 远程主机:{} - 状态:失败"
, getName(), MqttMessageType.DISCONNECT, remoteKey(), disFuture.cause());
}
});
if(logger.isTraceEnabled()) {
logger.trace("mqtt({}) {} - 远程主机:{} - 状态:成功"
, getName(), MqttMessageType.DISCONNECT, remoteKey());
}
} else if(future.cause() instanceof ClosedChannelException) { // 服务端已经关闭此连接, 也属于完成
disconnectSuccessCall();
if(logger.isTraceEnabled()) {
logger.trace("mqtt({}) 关闭客户端({}) - 远程主机:{} - 状态:成功"
, getName(), MqttMessageType.DISCONNECT, remoteKey());
}
} else {
logger.warn("mqtt({}) {} - 远程主机:{} - 状态:失败"
, getName(), MqttMessageType.DISCONNECT, remoteKey(), future.cause());
}
});
}
@Override
public ChannelFuture writeAndFlush(ClientSocketProtocol clientProtocol) {
ClientMessage clientMessage = clientProtocol.requestMessage();
if(!(clientMessage instanceof MqttClientMessage)) {
throw new ProtocolException("Mqtt报文必须使用类型[MqttClientMessage]");
}
return super.writeAndFlush(clientProtocol);
}
private Object debugMessage(ClientMessage message, MqttPublishMessage mqttMessage) {
final byte[] binMsg = message.getMessage();
final MqttFixedHeader fixedHeader = mqttMessage.fixedHeader();
final MqttPublishVariableHeader variableHeader = mqttMessage.variableHeader();
return new StringBuilder("MqttPublishMessage")
.append('[')
.append(fixedHeader != null ? fixedHeader.toString() : "").append(',')
.append(variableHeader != null ? variableHeader.toString() : "")
.append(", payload=[").append(binMsg != null ? ByteUtil.bytesToHexByFormat(binMsg) : "")
.append("]]")
.toString();
}
/**
* 构建要发布的报文
* @param message
* @return
* @throws IOException
*/
public MqttPublishMessage buildPublishMqttMessage(MqttClientMessage message) {
final int nextId = getMessageIdManager().nextId();
return this.buildPublishMqttMessage(message, nextId);
}
protected MqttPublishMessage buildPublishMqttMessage(MqttClientMessage message, Integer packetId) {
if(message.getMessage() == null) {
message.writeBuild();
}
return MqttMessageBuilders.publish()
.messageId(pack
没有合适的资源?快使用搜索试试~ 我知道了~
iot-ucy是使用java语言且基于netty, spring boot, redis等开源项目开发来的物联网网络中间件
共631个文件
java:607个
xml:14个
sql:2个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 187 浏览量
2024-04-04
13:51:10
上传
评论
收藏 600KB ZIP 举报
温馨提示
iot-ucy是基于netty, spring boot等框架实现的物联网中间件, 已支持tcp、udp、mqtt、mqtt网关、websocket、modbus、dtu适配(AT协议)、dtu+modbus(tcp和rtu) 适配,plc (西门子, 欧姆龙),串口等常用物联网协议,并且支持快速接入redis、emqx、TDengine等数据库或消息队列
资源推荐
资源详情
资源评论
收起资源包目录
iot-ucy是使用java语言且基于netty, spring boot, redis等开源项目开发来的物联网网络中间件 (631个子文件)
com.iteaj.iot.DeviceManagerFactory 49B
spring.factories 549B
spring.factories 217B
MqttClient.java 35KB
ModbusDtuTestHandle.java 29KB
ByteUtil.java 20KB
ModbusRtuClientProtocol.java 15KB
ModbusTestHandle.java 15KB
ClientInitiativeProtocol.java 13KB
SocketClient.java 13KB
ServerInitiativeProtocol.java 12KB
DefaultWebSocketServerProtocol.java 12KB
IotTestAutoConfiguration.java 12KB
ModbusTcpForDtuCommonProtocol.java 11KB
IotTestProperties.java 11KB
ModbusRtuForDtuCommonProtocol.java 11KB
FrameworkManager.java 11KB
EventManagerHandler.java 11KB
PlcClientProtocol.java 11KB
RedisConsumerOperaManager.java 10KB
SqlStatementUtil.java 10KB
ModbusTcpClientCommonProtocol.java 10KB
SocketServerComponent.java 10KB
MqttClientTestHandle.java 9KB
ModbusRtuBody.java 9KB
SocketClientComponent.java 9KB
FixedLengthRequestHandle.java 8KB
SiemensS7Client.java 8KB
DataAcceptHandle.java 8KB
PlcReadWrite.java 8KB
WebSocketServerDecoder.java 7KB
ServerHealthEntity.java 7KB
DefaultDtuMessageAware.java 7KB
OkHttpManager.java 7KB
SiemensS7TestHandle.java 7KB
OmronAddressResolver.java 7KB
MqttClientComponent.java 7KB
DefaultWebSocketClientProtocol.java 7KB
ClientServiceHandler.java 7KB
OmronTcpTestHandle.java 7KB
SiemensAddressResolver.java 7KB
ClientHttpProtocol.java 7KB
TcpDeviceManager.java 7KB
IotApplication.java 7KB
ModbusTcpBody.java 7KB
TaosSqlMeta.java 7KB
FixedLengthClientRequestHandle.java 7KB
WebSocketServerComponentAbstract.java 7KB
PlcErrorConst.java 7KB
MessageCreator.java 6KB
DefaultComponentFactory.java 6KB
ModbusTcpMessageBuilder.java 6KB
SqlContext.java 6KB
UdpEventManagerHandler.java 6KB
TestGroupClientListener.java 6KB
ProtocolTimeoutManager.java 6KB
SerialClient.java 6KB
ModbusRtuMessageBuilder.java 6KB
SocketMessageDecoder.java 6KB
MqttConnectProperties.java 6KB
ProtocolBusinessHandler.java 6KB
UdpServerComponent.java 5KB
MultiStageConnect.java 5KB
BreakerDataHandle.java 5KB
SerialComponent.java 5KB
DefaultUdpServerProtocol.java 5KB
ModbusTcpHeader.java 5KB
DefaultRdbmsSqlManager.java 5KB
ProtocolHandleFactory.java 5KB
IdWorker.java 5KB
WebSocketClientDecoder.java 5KB
SocketMessage.java 5KB
SimpleMessageIdManager.java 5KB
DefaultTaosSqlManager.java 5KB
SiemensS7Protocol.java 5KB
TcpServerComponent.java 5KB
SerialPortCreator.java 5KB
OmronTcpProtocol.java 5KB
WebSocketClientConnectProperties.java 5KB
ModbusRtuMessageBuilder.java 5KB
SerialEventProtocolHandle.java 5KB
OmronUtils.java 5KB
UdpServerProtocolEncoder.java 5KB
IotServerConfiguration.java 5KB
OmronMessageBody.java 5KB
SiemensMessageBody.java 4KB
ModbusRtuForDtuClientTestProtocol.java 4KB
ServerInitiativeSyncProtocol.java 4KB
ModbusTcpForDtuClientTestProtocol.java 4KB
SerialTestHandle.java 4KB
ModbusUtil.java 4KB
DtuTestHandle.java 4KB
DataFormatConvert.java 4KB
OmronTcpClient.java 4KB
IotThreadManager.java 4KB
ClientSocketProtocol.java 4KB
SimulatorDtuClient.java 4KB
ClientConnectProperties.java 4KB
LengthFieldBasedFrameDecoderServerComponent.java 4KB
IotRedisAutoConfiguration.java 4KB
共 631 条
- 1
- 2
- 3
- 4
- 5
- 6
- 7
资源评论
Java程序员-张凯
- 粉丝: 1w+
- 资源: 6718
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功