package Server;
import Server.LogSystem.AheadLog;
//import Server.ZeroMqService.ActionExecutor;
import Server.Tools.OpsConcurrentHashMap;
import Server.ZeroMqService.ActionExecutorError;
import Server.ZeroMqService.ActionExecutorLog;
import Server.ZeroMqService.ActionExecutorSub;
import Server.ZeroMqService.ActionExecutorWatering;
import Server.entiy.SubMessage;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.*;
import org.apache.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class ServerMethods {
private static AtomicInteger atomicInteger=new AtomicInteger(0);
public static ServerMethods methods;
private ExecutorService executor= Executors.newCachedThreadPool();
private org.apache.log4j.Logger logger= Logger.getLogger(ServerMethods.class);
public void doConnect(Channel channel, MqttMessage message){
MqttConnectMessage connectMessage=(MqttConnectMessage)message;
MqttFixedHeader mqttFixedHeader=connectMessage.fixedHeader();
MqttConnectVariableHeader mqttConnectVariableHeader
=connectMessage.variableHeader();
//反馈报文的可变头
MqttConnAckVariableHeader
mqttConnAckVariableHeader=new MqttConnAckVariableHeader(
MqttConnectReturnCode.CONNECTION_ACCEPTED,
mqttConnectVariableHeader.isCleanSession()
);
MqttFixedHeader requestmqttfixedheader=new MqttFixedHeader(
MqttMessageType.CONNACK,mqttFixedHeader.isDup(),
MqttQoS.AT_MOST_ONCE,mqttFixedHeader.isRetain(),
0x02
);
MqttConnAckMessage
connAckMessage=new MqttConnAckMessage(requestmqttfixedheader,
mqttConnAckVariableHeader);
channel.writeAndFlush(connAckMessage);
}
public void doPublish(Channel channel,MqttMessage message){
MqttPublishMessage publishMessage=(MqttPublishMessage)message;
MqttFixedHeader mqttFixedHeader=publishMessage.fixedHeader();
MqttPublishVariableHeader publishVariableHeader=
publishMessage.variableHeader();
byte[] getbytes=new byte[publishMessage.payload().readableBytes()];
publishMessage.payload().readBytes(getbytes);
String data=new String(getbytes);
while (!AheadLog.actionHeadLog("dopublish")){//TODO:需要调时序数据库
logger.info("aheadLog insert failed\n");
}
logger.info("aheadLog insert success\n");
switch (publishVariableHeader.topicName()){
case "watering":
Thread watering=new Thread(new ActionExecutorWatering());
watering.setName(watering.getId()+"");
OpsConcurrentHashMap.putvalue(ActionExecutorWatering.datamap,watering.getName(),data);
// ActionExecutorWatering.datamap.put(watering.getName(),data);
executor.submit(watering);
break;
case "log":
Thread log=new Thread(new ActionExecutorLog());
log.setName(log.getId()+"");
OpsConcurrentHashMap.putvalue(ActionExecutorLog.datamap,log.getName(),data);
// ActionExecutorLog.datamap.put(log.getName(),data);
executor.submit(log);
break;
case "error":
Thread error=new Thread(new ActionExecutorError());
error.setName(error.getId()+"");
OpsConcurrentHashMap.putvalue(ActionExecutorError.datamap,error.getName(),data);
// ActionExecutorError.datamap.put(error.getName(),data);
executor.submit(error);
break;
}
MqttQoS qos=(MqttQoS)mqttFixedHeader.qosLevel();
MqttMessageIdVariableHeader messageIdVariableHeader
=MqttMessageIdVariableHeader.from(publishVariableHeader.packetId());
switch (qos){
case AT_MOST_ONCE:
break;
case AT_LEAST_ONCE:
MqttFixedHeader requestfixedheaderALO=new MqttFixedHeader(
MqttMessageType.PUBACK,mqttFixedHeader.isDup(),
MqttQoS.AT_MOST_ONCE,mqttFixedHeader.isRetain(),
0x02
);
MqttPubAckMessage requestpubackmessage=new MqttPubAckMessage(
requestfixedheaderALO,messageIdVariableHeader
);
logger.info("back to client:"+requestpubackmessage.toString());
channel.writeAndFlush(requestpubackmessage);
break;
case EXACTLY_ONCE:
MqttFixedHeader requestfixedheaderEO=new MqttFixedHeader(
MqttMessageType.PUBREC,false,
MqttQoS.AT_MOST_ONCE,false,
0x02
);
MqttMessage requestpubrecmessage=new MqttMessage(requestfixedheaderEO,messageIdVariableHeader);
logger.info("back to client:"+requestpubrecmessage.toString());
break;
default:
break;
}
}
public MqttMessage doDisconnect(Channel channel,MqttMessage message){
return null;
}
public void doSubscribe(Channel channel,MqttMessage message){
MqttSubscribeMessage mqttSubscribeMessage=(MqttSubscribeMessage)message;
// MqttSubscribePayload mqttSubscribePayload=mqttSubscribeMessage.payload();
//订阅消息队列
InetSocketAddress inetSocketAddress= (InetSocketAddress) channel.remoteAddress();
Thread thread=new Thread(new ActionExecutorSub());
SubMessage subMessage=new SubMessage();
subMessage.setIpaddress(inetSocketAddress.getAddress().toString());
subMessage.setPort(String.valueOf(inetSocketAddress.getPort()));
subMessage.setCtx(channel);
subMessage.setMessageid(mqttSubscribeMessage.variableHeader().messageId());
thread.setName(thread.getId()+"");
ActionExecutorSub.datamap.put(thread.getName(),subMessage);
executor.submit(thread);
}
public void doPingreq(Channel channel,MqttMessage message){
MqttFixedHeader fixedHeader=new MqttFixedHeader(
MqttMessageType.PINGRESP,false,
MqttQoS.AT_MOST_ONCE,false,
0x02
);
MqttMessage response=new MqttMessage(fixedHeader);
channel.writeAndFlush(response);
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
基于java开发的农作物灌溉系统源码(毕业设计项目).zip 基于java开发的农作物灌溉系统源码(毕业设计项目).zip 基于java开发的农作物灌溉系统源码(毕业设计项目).zip 【备注】 主要针对计算机相关专业的正在做毕设的学生和需要项目实战的Java学习者。 也可作为课程设计、期末大作业。包含:项目源码、项目说明等,该项目可以直接作为毕设、课程设计使用。 也可以用来学习参考借鉴!
资源推荐
资源详情
资源评论
收起资源包目录
基于java开发的农作物灌溉系统源码(毕业设计项目).zip (53个子文件)
项目说明.md 39B
CropIrrigationSystem.iml 80B
pom.xml 3KB
src
main
resources
log4j.properties 1KB
java
Client
Client.java 2KB
ClientHandler.java 2KB
Server
TheServer.java 2KB
LogSystem
AheadLog.java 184B
ZeroMqService
ActionExecutorLog.java 990B
ActionExecutorSub.java 2KB
ActionExecutorWatering.java 697B
ActionExecutorError.java 1KB
DBUtil
InfluxDBUtil.java 3KB
Tools
OpsConcurrentHashMap.java 315B
ServerHandler.java 2KB
ServerMethods.java 7KB
entiy
SubMessage.java 762B
.idea
jarRepositories.xml 1KB
codeStyles
codeStyleConfig.xml 149B
Project.xml 263B
uiDesigner.xml 9KB
vcs.xml 167B
misc.xml 513B
compiler.xml 541B
.gitignore 176B
maven-build.xml 11KB
target
classes
TestEmqxpub.class 2KB
Client
ClientHandler.class 3KB
Client$1.class 1KB
Client.class 3KB
TestEmqx.class 3KB
OnMessageCallBack.class 2KB
log4j.properties 1KB
PushCallback.class 1KB
Testemqxtwo.class 2KB
Server
ServerMethods$1.class 780B
LogSystem
AheadLog.class 413B
ZeroMqService
ActionExecutorSub.class 3KB
ActionExecutorError.class 2KB
ActionExecutorWatering.class 2KB
ActionExecutorLog.class 2KB
DBUtil
InfluxDBUtil.class 4KB
TheServer$1.class 2KB
ServerMethods.class 8KB
TheServer.class 3KB
ServerHandler.class 2KB
entiy
SubMessage.class 1KB
ServerHandler$1.class 897B
maven-build.properties 752B
information
参考链接.html 693B
picture
农作物自动灌溉系统架构图.jpg 94KB
MQTT 282B
build.xml 981B
共 53 条
- 1
资源评论
- qq_45249176 2024-03-08 终于找到了超赞的宝藏资源,果断冲冲冲,支持!
- m0_51513431 2024-01-02 资源简直太好了,完美解决了当下遇到的难题,这样的资源很难不支持~
极客程序设计
- 粉丝: 7102
- 资源: 3583
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功