package org.springblade.reservoir.data.acess.gnss.mqtt;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.cache.utils.CacheUtil;
import org.springblade.core.tool.constant.BladeConstant;
import org.springblade.core.tool.utils.DateUtil;
import org.springblade.core.tool.utils.Func;
import org.springblade.reservior.biz.entity.GnssData;
import org.springblade.reservoir.data.acess.gnss.entity.GnssRawData;
import org.springblade.reservoir.data.acess.gnss.mapper.GnssDataMapper;
import org.springblade.reservoir.data.acess.gnss.mapper.GnssRawDataMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;
import java.util.Base64;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* @author: hejingping
* @Description: MQTT callback class: retries the connection after it is lost and persists received GNSS messages.
*/
@Component
public class MqttCallback implements MqttCallbackExtended {
// Value of the "mqtt.host" property (presumably the broker address; not read in the visible part of this class).
@Value("${mqtt.host}")
String host;
// Value of the "mqtt.username" property.
@Value("${mqtt.username}")
String username;
// Value of the "mqtt.password" property.
@Value("${mqtt.password}")
String password;
// Value of the "mqtt.clientId" property.
@Value("${mqtt.clientId}")
String clientId;
// "paranum" codes carried by each measurement entry of a GNSS message.
// insertData switches on them to decide which GnssData setter receives the entry's "value".
// Entry value is stored with GnssData.setQ.
private static final String PARANUM_Q = "11";
// Entry value is stored with GnssData.setPztbwl.
private static final String PARANUM_PZTBWL = "12";
// Entry value is stored with GnssData.setX.
private static final String PARANUM_X = "13";
// Entry value is stored with GnssData.setY.
private static final String PARANUM_Y = "14";
// Entry value is stored with GnssData.setZ.
private static final String PARANUM_Z = "15";
private static final Logger log = LoggerFactory.getLogger(MqttCallback.class);
// Client handed to the constructor. The callbacks visible here use the static
// MqttClient.getClient() accessor instead of this field.
private MqttClient mqttClient;
/**
 * Creates a callback that keeps a reference to the given client.
 *
 * @param mqttClient client stored on this instance
 */
public MqttCallback(MqttClient mqttClient) {
this.mqttClient = mqttClient;
}
// Mapper used by insertGnssRawData to insert the raw message row.
@Resource
private GnssRawDataMapper gnssRawDataMapper;
// Mapper used by insertData to insert the parsed measurement row.
@Resource
private GnssDataMapper gnssDataMapper;
// Static handle to the initialised instance, assigned in init(). The insert methods reach
// the mappers through this handle rather than through "this".
// NOTE(review): presumably this lets callback objects created outside the container use the
// injected mappers - confirm against the code that registers the callback.
public static MqttCallback mqttCallback;
/**
 * Publishes this instance through the static {@link #mqttCallback} handle.
 *
 * <p>Runs as a {@code @PostConstruct} hook, i.e. once the {@code @Resource} mappers have been
 * injected, so the mappers are reachable through the handle from that point on. No separate
 * copy of the mapper fields is needed: the handle and {@code this} are the same object.
 */
@PostConstruct
public void init() {
    mqttCallback = this;
}
/**
 * Handles a lost broker connection by retrying until the shared client is connected again.
 *
 * <p>Each round first checks {@code MqttClient.getClient().isConnected()} and returns when the
 * client is connected; otherwise it requests a reconnect and waits 5 seconds before the next
 * round. The method blocks the calling thread for as long as the connection stays down and
 * stops early only if that thread is interrupted.
 *
 * <p>Topics are not re-subscribed here; that has to happen in {@code connectComplete} or at
 * the marked place below.
 *
 * @param throwable the reason the connection was lost
 */
@Override
public void connectionLost(Throwable throwable) {
    // The throwable is passed as the trailing argument so the logger records its stack trace
    // in addition to the message text.
    log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}",
        throwable == null ? null : throwable.getMessage(), throwable);
    // Counts reconnect requests; starts at 0 so the first request is reported as attempt 1.
    long reconnectTimes = 0;
    while (true) {
        try {
            if (MqttClient.getClient().isConnected()) {
                // Connection is back. Re-subscription could be done here or in connectComplete.
                log.warn("mqtt reconnect success end 重新连接 重新订阅成功");
                return;
            }
            reconnectTimes += 1;
            log.warn("mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
            MqttClient.getClient().reconnect();
        } catch (MqttException e) {
            log.error("mqtt断连异常", e);
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e1) {
            // Restore the interrupt flag and stop retrying so the thread can shut down.
            Thread.currentThread().interrupt();
            log.warn("mqtt reconnect loop interrupted after {} attempt(s), giving up", reconnectTimes);
            return;
        }
    }
}
/**
 * Invoked for each message received on a subscribed topic; logs it and stores it through
 * {@code insertGnssRawData}.
 *
 * <p>The message is logged before it is processed so that the content is available when
 * processing fails. Runtime failures while storing the message are logged and not rethrown,
 * which means a message that cannot be processed is dropped.
 * NOTE(review): per the Paho {@code MqttCallback} contract an exception escaping this method
 * shuts the client down - confirm for the Paho version in use, and confirm that dropping
 * (rather than redelivering) a failed message is acceptable.
 *
 * @param topic       topic the message was published on
 * @param mqttMessage the received message
 * @throws Exception if a checked exception is raised while storing the message
 */
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
    log.info("客户端接收到消息之后调用");
    log.info("接收消息主题 : {},接收消息内容 : {}", topic, mqttMessage);
    try {
        insertGnssRawData(mqttMessage);
    } catch (RuntimeException e) {
        log.error("failed to process mqtt message on topic {}", topic, e);
    }
}
/**
 * Stores one received MQTT message as a raw GNSS row and passes it on for measurement parsing.
 *
 * <p>The message text is parsed as a JSON envelope. Its {@code "payload"} field is
 * Base64-decoded, everything before the first <code>{</code> of the decoded text is discarded,
 * and the remainder is treated as the GNSS JSON message. The first key of that message is
 * stored as the station code. After the raw row is inserted, the envelope's
 * {@code "messageId"}, the row's creation time and the GNSS message are handed to
 * {@code insertGnssData}.
 *
 * @param mqttMessage the received message
 * @throws IllegalArgumentException if the envelope cannot be read as a JSON object, has no
 *                                  {@code "payload"}, or the decoded payload holds no
 *                                  non-empty JSON object
 * @throws JsonProcessingException  declared for callers; propagated from {@code insertGnssData}
 */
private void insertGnssRawData(MqttMessage mqttMessage) throws JsonProcessingException {
    JSONObject jsonObject = JSONObject.parseObject(mqttMessage.toString());
    if (jsonObject == null) {
        throw new IllegalArgumentException("MQTT message is empty, expected a JSON object");
    }
    String encodedPayload = jsonObject.getString("payload");
    if (encodedPayload == null) {
        throw new IllegalArgumentException("MQTT message has no \"payload\" field");
    }
    String payload = decode(encodedPayload);
    // The decoded payload may carry a prefix before the JSON body; skip to the first '{'.
    int jsonStart = payload.indexOf('{');
    if (jsonStart < 0) {
        throw new IllegalArgumentException("decoded MQTT payload contains no JSON object");
    }
    String mess = payload.substring(jsonStart);
    JSONObject messObject = JSONObject.parseObject(mess);
    if (messObject == null || messObject.isEmpty()) {
        throw new IllegalArgumentException("decoded MQTT payload is an empty JSON object");
    }
    // The first key of the GNSS message is used as the station code.
    String stcd = messObject.keySet().iterator().next();

    GnssRawData gnssRawData = new GnssRawData();
    gnssRawData.setId(IdWorker.getId());
    gnssRawData.setTenantId(BladeConstant.ADMIN_TENANT_ID);
    gnssRawData.setMessage(mess);
    gnssRawData.setMqttMessage(jsonObject.toString());
    gnssRawData.setStcd(stcd);
    gnssRawData.setCreateTime(DateUtil.now());
    gnssRawData.setIsDeleted(BladeConstant.DB_NOT_DELETED);
    // The mapper is reached through the static handle assigned in init().
    mqttCallback.gnssRawDataMapper.insert(gnssRawData);

    insertGnssData(jsonObject.getString("messageId"), gnssRawData.getCreateTime(), mess);
}
/**
 * Inserts the parsed measurement row unless the message id equals the one already cached.
 *
 * <p>The cached id is read from cache {@code "id"} under key prefix {@code "type-"} and key
 * {@code "messageId"}. When a cached id exists and matches {@code cacheValue}, nothing is
 * done. When a cached id exists and differs, the {@code "id"} cache is cleared before the
 * insert. When nothing is cached, the insert runs directly.
 *
 * @param cacheValue message id of the current message
 * @param now        creation time to store on the row
 * @param mess       GNSS JSON message text
 * @throws JsonProcessingException declared for callers
 */
private void insertGnssData(String cacheValue, Date now, String mess) throws JsonProcessingException {
    Object cachedId = CacheUtil.get("id", "type-", "messageId");
    boolean hasCachedId = Func.isNotEmpty(cachedId);
    if (hasCachedId && cachedId.toString().equals(cacheValue)) {
        // Same message id as the cached one: nothing to insert.
        return;
    }
    if (hasCachedId) {
        CacheUtil.clear("id");
    }
    insertData(now, cacheValue, mess);
}
/**
 * Parses a GNSS JSON message into one {@code GnssData} row, inserts it and caches the message id.
 *
 * <p>The message is read as three nested objects, with the first key taken at each level
 * (device code, then a time key, then measuring-point code), ending in a list of entries.
 * Each entry's {@code "paranum"} selects which field receives the entry's {@code "value"};
 * entries with an unknown {@code "paranum"} are ignored and entries without one are skipped
 * with a warning. After the insert, {@code cacheValue} is stored in cache {@code "id"} under
 * key prefix {@code "type-"} and key {@code "messageId"}.
 *
 * @param now        creation time to store on the row
 * @param cacheValue message id to cache after the insert
 * @param mess       GNSS JSON message text
 */
private void insertData(Date now, String cacheValue, String mess) {
    Gson gson = new Gson();
    // Parsing with the raw Map.class is an unchecked conversion: the nested generic shape
    // declared here is an assumption about the message layout, not something Gson verifies.
    // NOTE(review): this also assumes every "paranum"/"value" is a JSON string - confirm.
    @SuppressWarnings("unchecked")
    Map<String, Map<String, Map<String, List<Map<String, String>>>>> messMap = gson.fromJson(mess, Map.class);
    String dvcd = messMap.keySet().iterator().next();
    Map<String, Map<String, List<Map<String, String>>>> byTime = messMap.get(dvcd);
    String time = byTime.keySet().iterator().next();
    Map<String, List<Map<String, String>>> byPoint = byTime.get(time);
    String mpcd = byPoint.keySet().iterator().next();
    List<Map<String, String>> valuesList = byPoint.get(mpcd);

    GnssData gnssData = new GnssData();
    for (Map<String, String> entry : valuesList) {
        String paranum = entry.get("paranum");
        if (paranum == null) {
            // A switch on a null String throws NullPointerException; skip the incomplete entry
            // instead of losing the whole row.
            log.warn("skip GNSS entry without paranum: {}", entry);
            continue;
        }
        String value = entry.get("value");
        switch (paranum) {
            case PARANUM_Q:
                gnssData.setQ(value);
                break;
            case PARANUM_PZTBWL:
                gnssData.setPztbwl(value);
                break;
            case PARANUM_X:
                gnssData.setX(value);
                break;
            case PARANUM_Y:
                gnssData.setY(value);
                break;
            case PARANUM_Z:
                gnssData.setZ(value);
                break;
            default:
                break;
        }
    }
    gnssData.setId(IdWorker.getId());
    gnssData.setTenantId(BladeConstant.ADMIN_TENANT_ID);
    gnssData.setDvcd(dvcd);
    gnssData.setMpcd(mpcd);
    // NOTE(review): mstm is set to the current time, not derived from the time key read from
    // the message - confirm this is intended.
    gnssData.setMstm(LocalDateTime.now());
    gnssData.setCreateTime(now);
    gnssData.setIsDeleted(BladeConstant.DB_NOT_DELETED);
    // The mapper is reached through the static handle assigned in init().
    mqttCallback.gnssDataMapper.insert(gnssData);
    CacheUtil.put("id", "type-", "messageId", cacheValue);
}
public static String decode(String base64Str) {
// 解码后的字符串
String str = "";
// 解码
byte[] base64Data = Base64.getDecoder().decode(base64Str);
try {
// byte[]-->String
str = new String(base64Data, "utf-8");
} catch (UnsupportedEncodingException
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
springboot-mqtt.zip (42个子文件)
reservoir-data-access-gnss
pom.xml 4KB
src
main
resources
application-test.yml 233B
application-dev.yml 451B
application-prod.yml 231B
java
org
springblade
reservoir
data
acess
gnss
mqtt
MqttClient.java 4KB
MqttCallback.java 11KB
mapper
GnssRawDataMapper.java 1KB
GnssDataMapper.java 1KB
GnssRawDataMapper.xml 1KB
GnssDataMapper.xml 1KB
ReservoirDataAccessGnssApplication.java 1KB
controller
MqttController.java 2KB
SubscribeMultipleTopics.java 3KB
service
IGnssDataService.java 1KB
IGnssRawDataService.java 1KB
impl
GnssRawDataServiceImpl.java 2KB
GnssDataServiceImpl.java 2KB
entity
GnssData.java 3KB
GnssRawData.java 2KB
MqttMsg.java 543B
config
MqttConfiguration.java 2KB
target
classes
application-test.yml 233B
application-dev.yml 451B
application-prod.yml 231B
org
springblade
reservoir
data
acess
gnss
mqtt
MqttCallback.class 12KB
MqttClient.class 5KB
mapper
GnssDataMapper.class 352B
GnssRawDataMapper.class 373B
GnssRawDataMapper.xml 1KB
GnssDataMapper.xml 1KB
ReservoirDataAccessGnssApplication.class 889B
controller
SubscribeMultipleTopics.class 5KB
MqttController.class 2KB
service
IGnssDataService.class 343B
IGnssRawDataService.class 364B
impl
GnssRawDataServiceImpl.class 1KB
GnssDataServiceImpl.class 1022B
entity
MqttMsg.class 2KB
GnssRawData.class 3KB
config
MqttConfiguration.class 2KB
generated-sources
annotations
Dockerfile 338B
reservoir-data-access-gnss.iml 28KB
共 42 条
- 1
资源评论
hejingping
- 粉丝: 0
- 资源: 2
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- JavaScript《基于自动分析数据并给出营业建议的餐厅管理系统(接入AI) 》+源代码+项目说明及资料
- 355670834783295707ad04e-427f-4cde-9589-e578224a8459.zip
- 动态sql解析引擎,类似mybatis动态sql的功能
- EDA365-Skill-V2.5安装包,支持Allegro17.x版本
- C# 常用单词汇总,常用单词汇总
- 【ERP标准流程-标准流程-库内业务管理】(DOC 14页).doc
- Python《数据库期末作业-餐厅点单系统 》+源代码+设计资料
- 学生成绩管理系统(C++课程设计
- 双指针法判断链表有环-go语言实现
- MyBatis动态SQL是一种强大的特性,它允许我们在SQL语句中根据条件动态地添加或删除某些部分,从而实现更加灵活和高效的数据
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功