package com.ypp.mqtt.sdk.service.subscribe.impl;
import com.ypp.mqtt.sdk.exception.MqttSubscribeException;
import com.ypp.mqtt.sdk.service.AbstractMqttClient;
import com.ypp.mqtt.sdk.service.subscribe.MqttMessageCallback;
import com.ypp.mqtt.sdk.service.subscribe.MqttSubscribe;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MQTT订阅消息客户端
*
* @author ypp
* @version 1.0.0
* @date 2020-04-19 19:41
**/
public class MqttSubscribeClient extends AbstractMqttClient implements MqttSubscribe {
private static Logger logger = LoggerFactory.getLogger(MqttSubscribeClient.class);
private MqttConnectOptions mqttConnectOptions;
@Override
public void subscribe(String topic, int qos, MqttMessageCallback mqttMessageCallback) throws MqttSubscribeException {
MqttClient mqttClient = super.getMqttClient();
this.buildCallback(mqttClient, new String[]{topic}, new int[]{qos}, mqttMessageCallback);
try {
mqttClient.connect(this.mqttConnectOptions);
System.out.println("成功建立连接");
} catch (MqttException e) {
logger.error("MQTT连接发生异常", e);
throw new MqttSubscribeException("MQTT连接发生异常", e);
}
}
@Override
public void subscribe(String[] topic, int[] qos, MqttMessageCallback mqttMessageCallback) throws MqttSubscribeException {
MqttClient mqttClient = super.getMqttClient();
this.buildCallback(mqttClient, topic, qos, mqttMessageCallback);
try {
mqttClient.connect(this.mqttConnectOptions);
} catch (MqttException e) {
logger.error("MQTT连接发生异常", e);
throw new MqttSubscribeException("MQTT连接发生异常", e);
}
}
private void buildCallback(final MqttClient mqttClient, final String[] topic, final int[] qos, final MqttMessageCallback mqttMessageCallback) {
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean b, String s) {
System.out.println("连接成功");
logger.debug("MQTT连接成功");
try {
mqttClient.subscribe(topic, qos);
System.out.println("订阅成功");
} catch (MqttException e) {
e.printStackTrace();
logger.error("订阅"+ topic +"出现异常", e);
}
}
@Override
public void connectionLost(Throwable throwable) {
System.out.println("连接断开");
logger.error("MQTT消费者断开连接", throwable);
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
logger.debug("收到主题:" + s + ",发送来的消息:" + new String(mqttMessage.getPayload()));
mqttMessageCallback.messageArrived(s, mqttMessage);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("delivery complete");
logger.debug("delivery complete" + iMqttDeliveryToken);
}
});
}
@Override
public MqttClient buildMqttClientConnection(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions) throws MqttException {
this.mqttConnectOptions = mqttConnectOptions;
return mqttClient;
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
基于 org.eclipse.paho.client.mqttv3 实现的 MQTT 发布、订阅客户端。
(1)订阅接口(MqttSubscribe):
void subscribe(String topic, int qos, MqttMessageCallback mqttMessageCallback) throws MqttSubscribeException;
void subscribe(String[] topic, int[] qos, MqttMessageCallback mqttMessageCallback) throws MqttSubscribeException;
(2)发布接口(MqttPublish):
void publish(String topic, int qos, String message) throws MqttPublishException;
void publish(String topic, int qos, byte[] message) throws MqttPublishException;
资源推荐
资源详情
资源评论
收起资源包目录
基于paho的MQTT Java SDK.zip (11个子文件)
pom.xml 837B
src
main
java
com
ypp
mqtt
sdk
service
AbstractMqttClient.java 3KB
publish
MqttPublish.java 802B
impl
MqttPublishClient.java 2KB
subscribe
MqttMessageCallback.java 457B
MqttSubscribe.java 916B
impl
MqttSubscribeClient.java 4KB
model
MqttProperty.java 1KB
exception
MqttClientException.java 744B
MqttSubscribeException.java 760B
MqttPublishException.java 748B
共 11 条
- 1
资源评论
来了就走下去
- 粉丝: 111
- 资源: 19
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功