package mqtt.receive;
import lombok.extern.slf4j.Slf4j;
import mqtt.mqttssss.MqttProperties;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @param null:
* @return null
* @author axian
* @description TODO
* @date 15:27
*/
@Component
@Slf4j
public class MqttAcceptCallback implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);
@Autowired
private MqttAcceptClient mqttAcceptClient;
@Autowired
private MqttProperties mqttProperties;
/**
* 客户端断开后触发
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
logger.info("连接断开,可以重连");
if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
logger.info("【emqx重新连接】....................................................");
mqttAcceptClient.reconnection();
}
}
/**
* 客户端收到消息触发
*
* @param topic 主题
* @param mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
//接收消息
logger.info("【接收的载体消息】:" + mqttMessage.getPayload());
logger.info("【接收消息主题】:" + topic);
logger.info("【接收消息Qos】:" + mqttMessage.getQos());
// //将字节数组解码为字符串
// String jsonString = new String(mqttMessage.getPayload());
// //使用JSON解析器解析字符串
// ObjectMapper objectMapper = new ObjectMapper();
// JsonNode jsonNode = objectMapper.readTree(jsonString);
//// CsiSmartMeter csiSmartMeter = (CsiSmartMeter) ObjectByteExchangeUtils.byteArrayToObject(mqttMessage.getPayload());
// logger.info("【接收消息内容】:" + jsonNode);
// //获取到key,时间戳
// JsonNode data = jsonNode.get("data");
// long time = data.get(0).get("tp").asLong();
// JsonNode point = data.get(0).get("point");
// logger.info("【当前时间戳】:" + time);
// if(oConvertUtils.isNotEmpty(map.get(time))) {
// JSONObject object = map.get(time);
// logger.info("从map中获取一半的对象是" + object);
// JSONObject fullObject = getFullObject(object, point);
// logger.info("拼接好的完整的对象是" + fullObject + ",此时map的长度" + map.size());
// //构造待存储的csiSmartMeter对象
// CsiSmartMeter csiSmartMeter = generateSmartMeter(fullObject, time);
// logger.info("待存储的智能电表对象是" + csiSmartMeter);
// csiSmartMeterService.save(csiSmartMeter);
// map.remove(time);
// logger.info("执行清除操作后,此时map的长度是" + map.size());
// }else {
// JSONObject halfObject = getHalfObject(point);
// map.put(time,halfObject);
// logger.info("拼接一半的对象是" + halfObject);
// }
}
/**
* 发布消息成功
*
* @param token token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
String[] topics = token.getTopics();
for (String topic : topics) {
logger.info("向主题【" + topic + "】发送消息成功!");
}
try {
MqttMessage message = token.getMessage();
byte[] payload = message.getPayload();
String s = new String(payload, "UTF-8");
logger.info("【接收回调消息内容】:" + s);
} catch (Exception e) {
logger.error("MqttAcceptCallback deliveryComplete error,message:{}", e.getMessage());
e.printStackTrace();
}
}
/**
* 连接emq服务器后触发
*
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
log.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================");
// 以/#结尾表示订阅所有以test开头的主题
// 订阅所有机构主题
mqttAcceptClient.subscribe(mqttProperties.getDefaultTopic(), 1);
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
mymqtt.zip (89个子文件)
mymqtt.iml 81B
pom.xml 4KB
src
test
java
main
resources
application-dev.yml 346B
application-prod.yml 346B
application-prerelease.yml 346B
application.yml 90B
java
mqtt
controller
MqttController.java 1KB
utils
ObjectByteExchangeUtils.java 2KB
MyMqttApplication.java 410B
send
MqttSendClient.java 4KB
MqttSendCallBack.java 2KB
receive
MqttAcceptClient.java 3KB
MqttAcceptCallback.java 5KB
config
MqttCondition.java 959B
MqttConfig.java 662B
mqttssss
MqttProperties.java 2KB
.idea
jarRepositories.xml 880B
mymqtt.iml 6KB
libraries
Maven__org_projectlombok_lombok_1_18_18.xml 531B
Maven__org_springframework_integration_spring_integration_mqtt_5_3_8_RELEASE.xml 748B
Maven__org_springframework_spring_beans_5_2_15_RELEASE.xml 630B
Maven__com_fasterxml_jackson_datatype_jackson_datatype_jdk8_2_11_4.xml 681B
Maven__org_springframework_spring_jcl_5_2_15_RELEASE.xml 616B
Maven__org_reactivestreams_reactive_streams_1_0_3.xml 595B
Maven__org_springframework_spring_messaging_5_2_15_RELEASE.xml 658B
Maven__org_springframework_spring_tx_5_2_15_RELEASE.xml 609B
Maven__ch_qos_logback_logback_core_1_2_3.xml 547B
Maven__org_springframework_spring_core_5_2_15_RELEASE.xml 623B
Maven__com_fasterxml_jackson_module_jackson_module_parameter_names_2_11_4.xml 736B
Maven__org_slf4j_jul_to_slf4j_1_7_30.xml 534B
Maven__org_springframework_integration_spring_integration_core_5_3_8_RELEASE.xml 748B
Maven__org_aspectj_aspectjweaver_1_9_6.xml 542B
Maven__org_springframework_boot_spring_boot_starter_tomcat_2_3_12_RELEASE.xml 748B
Maven__org_apache_logging_log4j_log4j_api_2_13_3.xml 573B
Maven__org_slf4j_slf4j_api_1_7_30.xml 513B
Maven__org_eclipse_paho_org_eclipse_paho_client_mqttv3_1_2_4.xml 681B
Maven__com_fasterxml_jackson_core_jackson_annotations_2_11_4.xml 651B
Maven__org_springframework_spring_aop_5_2_15_RELEASE.xml 616B
Maven__org_springframework_spring_expression_5_2_15_RELEASE.xml 665B
Maven__org_springframework_boot_spring_boot_starter_integration_2_3_12_RELEASE.xml 783B
Maven__ch_qos_logback_logback_classic_1_2_3.xml 568B
Maven__org_springframework_integration_spring_integration_stream_5_3_8_RELEASE.xml 762B
Maven__com_fasterxml_jackson_datatype_jackson_datatype_jsr310_2_11_4.xml 695B
Maven__jakarta_annotation_jakarta_annotation_api_1_3_5.xml 633B
Maven__org_springframework_boot_spring_boot_starter_2_6_10.xml 643B
Maven__org_springframework_boot_spring_boot_starter_json_2_3_12_RELEASE.xml 734B
Maven__org_springframework_boot_spring_boot_starter_aop_2_3_12_RELEASE.xml 727B
Maven__org_yaml_snakeyaml_1_26.xml 495B
Maven__org_apache_tomcat_embed_tomcat_embed_websocket_9_0_46.xml 660B
Maven__org_glassfish_jakarta_el_3_0_3.xml 529B
Maven__org_springframework_boot_spring_boot_autoconfigure_2_3_12_RELEASE.xml 741B
Maven__com_fasterxml_jackson_core_jackson_databind_2_11_4.xml 630B
Maven__org_springframework_spring_webmvc_5_2_15_RELEASE.xml 637B
Maven__org_springframework_boot_spring_boot_starter_logging_2_3_12_RELEASE.xml 755B
Maven__org_springframework_retry_spring_retry_1_2_5_RELEASE.xml 647B
Maven__org_springframework_spring_web_5_2_15_RELEASE.xml 616B
Maven__com_fasterxml_jackson_core_jackson_core_2_11_4.xml 602B
Maven__io_projectreactor_reactor_core_3_3_17_RELEASE.xml 622B
Maven__org_springframework_boot_spring_boot_2_3_12_RELEASE.xml 643B
Maven__org_springframework_boot_spring_boot_starter_web_2_7_10.xml 671B
Maven__org_apache_logging_log4j_log4j_to_slf4j_2_13_3.xml 608B
Maven__org_apache_tomcat_embed_tomcat_embed_core_9_0_46.xml 625B
Maven__org_springframework_spring_context_5_2_15_RELEASE.xml 644B
workspace.xml 10KB
misc.xml 512B
inspectionProfiles
Project_Default.xml 1KB
compiler.xml 873B
modules.xml 271B
encodings.xml 191B
target
classes
mqtt
MyMqttApplication.class 782B
controller
MqttController.class 610B
utils
ObjectByteExchangeUtils.class 1KB
send
MqttSendCallBack.class 3KB
MqttSendClient.class 4KB
receive
MqttAcceptCallback.class 4KB
MqttAcceptClient.class 4KB
config
MqttCondition.class 2KB
MqttConfig.class 866B
mqttssss
MqttProperties.class 7KB
application-dev.yml 346B
application-prod.yml 346B
application-prerelease.yml 346B
application.yml 79B
test-classes
maven-status
maven-compiler-plugin
compile
default-compile
createdFiles.lst 341B
inputFiles.lst 641B
testCompile
default-testCompile
createdFiles.lst 0B
inputFiles.lst 0B
mymqtt-1.0-SNAPSHOT.jar 18KB
maven-archiver
pom.properties 62B
generated-test-sources
test-annotations
generated-sources
annotations
共 89 条
- 1
资源评论
axian西一言仙
- 粉丝: 7
- 资源: 3
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功