import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
public class MqttServer {
public static ChannelGroup MQTTdeviceChannelGroup;
public static ConcurrentHashMap<Integer, MqttMessage> mqttMessageIdMap=new ConcurrentHashMap<>();
private EventLoopGroup bossGroup,workerGroup;
//保存订阅地址和chanelid,当推送数据时,会向此订阅地址的多个channel发送数据
public static Map<String, List<ChannelId>> topicChannelIds =new ConcurrentHashMap<>();
//保存所有订阅的topic
public static List<String> subTopics=new ArrayList<>();
public static void main(String[] args) throws Exception{
new MqttServer().start();
}
public void start() throws Exception {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
MQTTdeviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(1883))
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_RCVBUF,10485760)
.option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_TIMEOUT,15)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new IdleStateHandler(600,600,1200))
.addLast("decoder",new MqttDecoder())
.addLast("encoder",MqttEncoder.INSTANCE)
.addLast(new MqttHandler());
}
});
ChannelFuture future=b.bind().sync();
if (future.isSuccess()){
System.out.println("mqtt broker start success");
}else{
System.out.println("mqtt broker start failed");
}
} catch(Exception e) {
shutdown();
e.printStackTrace();
}
}
public void shutdown() throws InterruptedException {
if (workerGroup != null && bossGroup != null) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
System.out.println("shutdown success");
}
}
public class MqttHandler extends ChannelInboundHandlerAdapter {
private MqttMsgBack mqttMsgBack=new MqttMsgBack();
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception{
System.out.println("channelId: "+ctx.channel().id()+" 的客户端注册成功-已保存"+"\r");
MQTTdeviceChannelGroup.add(ctx.channel());
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception{
System.out.println("channelId: "+ctx.channel().id()+" 的客户端取消注册成功-已删除"+"\r");
rem(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception{
System.out.println("channelId: "+ctx.channel().id()+" 的客户端强制断开"+"\r");
}
private void rem(ChannelHandlerContext ctx){
ChannelId channelId=ctx.channel().id();
if (ctx.channel()!=null){
//如果有客户端强制断开,并没有取消注册,就删除channelId
MQTTdeviceChannelGroup.remove(ctx.channel());
for (int i=0;i<subTopics.size();i++){
//从topicChannelIds中移除对应的channelId
String topicName=subTopics.get(i).toString().trim();
if (!topicName.equals("")){
List<ChannelId> channelIds=topicChannelIds.get(topicName);
channelIds.remove(channelId);
//订阅的topic的channelIds--》{topic1=[2a6fa0fb], topic2=[2a6fa0fb], topic11=[], topic22=[]}
//不保留topic键值
if (channelIds.isEmpty()){
//如果移除后topic对应的值为空,就删除这个topic
topicChannelIds.remove(topicName);
}
}
}
System.out.println("rem-topicchannelIds: "+topicChannelIds.toString()+"\r");
}
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg!=null){
MqttMessage mqttMessage=(MqttMessage) msg;
MqttFixedHeader mqttFixedHeader=mqttMessage.fixedHeader();
Channel channel=ctx.channel();
if (msg instanceof MqttMessage){
if (mqttFixedHeader==null){
ctx.close();
return;
}
System.out.println("channelId: "+channel.id()+" 接收到的信息类型 "+mqttFixedHeader.messageType().toString());
switch(mqttFixedHeader.messageType()){
case CONNECT:
mqt