package com.yeexun.xiot.simulator.srv.mqttSimulator;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Maps;
import com.yeexun.xiot.base.dto.R;
import com.yeexun.xiot.simulator.srv.message.MessageHandler;
import com.yeexun.xiot.simulator.srv.web.domain.dto.StartDto;
import com.yeexun.xiot.simulator.srv.web.domain.entity.MQTTAuth;
import com.yeexun.xiot.simulator.srv.web.domain.entity.XiotSimMessage;
import com.yeexun.xiot.simulator.srv.web.mapper.XiotSimMessageMapper;
import com.yeexun.xiot.simulator.srv.web.service.SimulatorUserService;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.util.concurrent.Future;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.expands.script.engine.DynamicScriptEngine;
import org.hswebframework.expands.script.engine.DynamicScriptEngineFactory;
import org.jetlinks.mqtt.client.MqttClient;
import org.jetlinks.mqtt.client.MqttClientCallback;
import org.jetlinks.mqtt.client.MqttClientConfig;
import org.jetlinks.mqtt.client.MqttConnectResult;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* <p>
* 模拟器实现类
* </p>
*
* @author wangtao
* @since 2021-12-02
*/
@Service("MqttSimulator")
@Scope("prototype")
@Slf4j
public class MqttSimulator implements SimulatorUserService {
private BlockingQueue<Runnable> createMqttJob;
private ScheduledExecutorService executorService;
@Autowired
private StartDto startDto;
public MqttSimulator(StartDto startDto) {
this.startDto = startDto;
}
private int threadSize = Runtime.getRuntime().availableProcessors() * 2;
Map<String, ClientSession> clientMap = new ConcurrentHashMap<>();
BiConsumer<Integer, ClientSession> eventDataSuppliers;
BiConsumer<Integer, ClientSession> propertyDataSuppliers;
Map<String, MessageHandler> messageHandlerMap = new HashMap<>();
private Map<String, AtomicInteger> portCounter = new ConcurrentHashMap<>();
private Consumer<ClientSession> onConnect;
// ArrayList<ScheduledExecutorService> scheduledExecutorServices=new ArrayList<>();
transient EventLoopGroup eventLoopGroup;
transient Class channelClass;
private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();
private static Map<String, ScheduledExecutorService> serviceMap = new ConcurrentHashMap<>();
private static Map<String, Map<String, MqttClient>> taskMap = new ConcurrentHashMap<>();
private static Map<String, Map<String, ClientSession>> clientSessionMap = new ConcurrentHashMap<>();
public boolean addTask(ScheduledTask scheduledTask, ScheduledExecutorService executorService) {
if (futureMap.get(scheduledTask.getTaskId()) != null) {
return false;
}
ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(scheduledTask.getRunnable(), scheduledTask.getInitialDelay(), scheduledTask.getPeriod(), TimeUnit.MILLISECONDS);
futureMap.put(scheduledTask.getTaskId(), scheduledFuture);
return true;
}
public boolean remove(XiotSimMessage simulator) {
if (futureMap.get(simulator.getId()) == null) {
return false;
}
ScheduledFuture scheduledFuture = futureMap.get(simulator.getId());
scheduledFuture.cancel(false);
futureMap.remove(simulator.getId());
ScheduledExecutorService scheduledExecutorService = serviceMap.get(simulator.getId());
scheduledExecutorService.shutdown();
serviceMap.remove(simulator.getId());
Map<String, ClientSession> stringClientSessionMap = clientSessionMap.get(simulator.getId());
for (int i = 0; i < (simulator.getStart() + simulator.getTotal()); i++) {
ClientSession clientSession = stringClientSessionMap.get(simulator.getClientId() + i);
// System.out.println(clientSession.getClient().getClientConfig().getClientId());
clientSession.getClient().disconnect();
}
taskMap.remove(simulator.getId());
return true;
}
public void shutdown() {
executorService.shutdown();
}
public void runRate(Runnable runnable, long time) {
executorService.scheduleAtFixedRate(runnable, 2000, time, TimeUnit.MILLISECONDS);
}
public void init(StartDto startDto) {
executorService = Executors.newScheduledThreadPool(startDto.getBatchSize());
serviceMap.put(startDto.getId(), executorService);
createMqttJob = new LinkedBlockingQueue<>(startDto.getBatchSize());
if (KQueue.isAvailable()) {
eventLoopGroup = new KQueueEventLoopGroup(threadSize);
channelClass = KQueueSocketChannel.class;
} else if (Epoll.isAvailable()) {
eventLoopGroup = new EpollEventLoopGroup(threadSize);
channelClass = EpollSocketChannel.class;
} else {
eventLoopGroup = new NioEventLoopGroup(threadSize);
channelClass = NioSocketChannel.class;
}
}
@Override
public R stop(XiotSimMessage simulator) {
boolean remove = this.remove(simulator);
if (!remove) {
return new R<>(R.FAIL, "停止失败", null);
}
return new R<>(R.SUCCESS, "停止成功", null);
}
@Override
public R start() throws Exception {
String[] binds = startDto.getBinds().split(",");
System.out.println("************" + binds);
init(startDto);
//String s = new String(Files.readAllBytes(Paths.get(scriptFile)));
String s = startDto.getContent();
DynamicScriptEngine js = DynamicScriptEngineFactory.getEngine("js");
js.compile("handler", s);
HashMap<String, Object> context = Maps.newHashMap();
context.put("simulator", this);
context.put("logger", LoggerFactory.getLogger("message.handler"));
js.execute("handler", context).getIfSuccess();
int end = startDto.getStart() + startDto.getTotal();
AtomicInteger started = new AtomicInteger();//用于诸如原子递增计数器之类的应用程序
int queueSizePeekSize = Math.
没有合适的资源?快使用搜索试试~ 我知道了~
Mqtt设备模拟器(RPC)
共115个文件
class:44个
java:42个
xml:9个
需积分: 13 2 下载量 58 浏览量
2023-03-16
10:26:11
上传
评论
收藏 92.83MB ZIP 举报
温馨提示
采用自定义js脚本来实现设备模拟。其中包括属性上报模拟还有事件上报模拟。用定时器来进行定时上报,向mqtt broker中发送模拟消息,以达到设备模拟的效果,可通过kafka来监听mqtt的消息来进行储存,来在系统中进行分析转换。有新建模拟器、修改模拟器、删除模拟器、查询模拟器,修改模拟器的脚本。
资源推荐
资源详情
资源评论
收起资源包目录
Mqtt设备模拟器(RPC) (115个子文件)
MqttSimulator.class 25KB
SimulatorDto.class 15KB
XiotSimMessage.class 13KB
StartDto.class 13KB
XiotSimMessageServiceImpl.class 13KB
StartDto.class 13KB
XiotSimMessageVo.class 12KB
MqttController.class 9KB
XiotSimFunction.class 8KB
FunctionDto.class 8KB
XiotSimFunctionVo.class 7KB
MyBatisPlusTenantSqlParser.class 5KB
ScheduledTask.class 4KB
SimulatorBaseController.class 4KB
MqttSimulator$ClientSession.class 3KB
MqttSimulator$1.class 3KB
ContextInterceptor.class 3KB
XiotSimMessageController.class 3KB
AllVo.class 3KB
XiotSimMessageMapper.class 3KB
MyBatisPlusTenantHandler.class 3KB
BasePageDto.class 2KB
SimulatorErrorCodeEnum.class 2KB
MQTTAuth.class 2KB
BizContext.class 2KB
MybatisPlusConfig.class 2KB
SimulatorManageDefeat.class 2KB
RunStatusEnum.class 2KB
NetworkAddress.class 2KB
ContextWebMvcConfigurer.class 2KB
MultiClientConfig.class 2KB
PortEntity.class 2KB
SimulatorManageApi.class 2KB
XiotSimFunctionMapper.class 2KB
XiotSimulatorApplication.class 1KB
SimulatorUseDefeat.class 1KB
SimulatorUseApi.class 1KB
SimulatorPageDto.class 993B
XiotSimFunctionServiceImpl.class 928B
XiotSimMessageService.class 668B
XiotSimFunctionController.class 581B
MessageHandler.class 517B
SimulatorUserService.class 419B
XiotSimFunctionService.class 397B
xiot-simulator-srv.iml 26KB
xiot-simulator-api.iml 18KB
xiot-simulator.iml 14KB
xiot-simulator-srv-1.0-SNAPSHOT.jar 102.11MB
xiot-simulator-api-1.0-SNAPSHOT.jar 36KB
MqttSimulator.java 19KB
XiotSimMessageServiceImpl.java 9KB
MqttController.java 7KB
SimulatorDto.java 3KB
XiotSimMessageController.java 3KB
XiotSimMessageMapper.java 3KB
MyBatisPlusTenantSqlParser.java 3KB
XiotSimMessage.java 2KB
StartDto.java 2KB
StartDto.java 2KB
XiotSimFunction.java 2KB
ContextInterceptor.java 2KB
FunctionDto.java 2KB
SimulatorBaseController.java 2KB
ScheduledTask.java 2KB
XiotSimMessageVo.java 2KB
SimulatorManageApi.java 2KB
NetworkAddress.java 2KB
BizContext.java 2KB
MybatisPlusConfig.java 1KB
MyBatisPlusTenantHandler.java 1KB
MultiClientConfig.java 1KB
XiotSimFunctionMapper.java 1KB
XiotSimFunctionVo.java 1KB
SimulatorManageDefeat.java 1KB
SimulatorErrorCodeEnum.java 1002B
SimulatorUseApi.java 990B
ContextWebMvcConfigurer.java 958B
XiotSimulatorApplication.java 934B
RunStatusEnum.java 929B
XiotSimFunctionController.java 764B
MQTTAuth.java 705B
XiotSimFunctionServiceImpl.java 691B
XiotSimMessageService.java 686B
SimulatorUseDefeat.java 627B
BasePageDto.java 587B
XiotSimFunctionService.java 412B
AllVo.java 343B
MessageHandler.java 324B
SimulatorUserService.java 296B
SimulatorPageDto.java 273B
PortEntity.java 174B
inputFiles.lst 4KB
inputFiles.lst 2KB
createdFiles.lst 2KB
createdFiles.lst 1KB
xiot-simulator-srv-1.0-SNAPSHOT.jar.original 64KB
pom.properties 73B
pom.properties 73B
pom.xml 6KB
logback.xml 4KB
共 115 条
- 1
- 2
资源评论
皖江一支花
- 粉丝: 1
- 资源: 2
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- SVID_20240523_141155_1.mp4
- Code for the complete guide to tkinter tutorial
- 关于百货中心供应链管理系统.zip
- SimpleFolderIcon-master 修改Unity的Project下的文件夹图标
- A python Tkinter widget to display tile based maps
- A pure Python library for adding tables to a Tkinter application
- Vector资源文件.zip
- MobaXterm-Installer
- MicroMsg.xlsx
- 88-520告白(520气球).zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功