package org.example;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQTTUtils {
private MqttClient mqttClient;
//TODO 需要根据自己环境调整
/**
* mqtt服务器的地址和端口号
*/
private String serviceURI = "tcp://127.0.0.1:1883";
/**
* 客户端Id
*/
private final String clientId = "ARMT" + (int) (Math.random() * 1000000);
/**
* 客户端connect连接mqtt服务器
*
* @param user 用户名
* @param pwd 密码
* @param mqttCallback 回调函数 , 不为null
**/
public void mqttClientConnect(String user, String pwd, MqttCallback mqttCallback) throws MqttException {
MqttConnectOptions options = mqttConnectOptions(user, pwd);
if (mqttCallback == null) {
throw new RuntimeException("MqttCallback is null");
}
mqttClient.setCallback(mqttCallback);
mqttClient.connect(options);
}
/**
* MQTT设置参数设置
*
* @param user 用户名
* @param pwd 密码
* @return MqttConnectOptions
*/
private MqttConnectOptions mqttConnectOptions(String user, String pwd) throws MqttException {
mqttClient = new MqttClient(serviceURI, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(user);
options.setPassword(pwd.toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(30);
//自动重连,也可以自定义自动重连
options.setAutomaticReconnect(true);//默认:false
//是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接到服务端都是以新的身份
options.setCleanSession(false);
//设置心跳时间
options.setKeepAliveInterval(60);
return options;
}
/**
* 关闭MQTT连接
*/
public void close() throws MqttException {
mqttClient.close();
mqttClient.disconnect();
}
/**
* 往主题发布消息 默认qos==0 “至多一次”
*
* @param topic:发布的主题
* @param msg:发布的消息
*/
public void publish(String topic, String msg) throws MqttException {
publish(topic, msg, 0);
}
/**
* 往主题发布消息
*
* @param topic: 发布的主题
* @param msg: 发布的消息
* @param qos: 消息质量 Qos:0、“至多一次”,会发生消息丢失
* 1、“至少一次”,确保消息到达,但消息重复可能会发生
* 2、“只有一次”,确保消息到达一次
*/
public void publish(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 订阅主题 ,默认的的Qos等级为:0
*
* @param topic 主题
*/
public void subscribe(String topic) throws MqttException {
subscribe(topic, 0);
}
/**
* 订阅某一个主题,可携带Qos
*
* @param topic 所要订阅的主题
* @param qos 消息质量:0、1、2
*/
public void subscribe(String topic, int qos) throws MqttException {
mqttClient.subscribe(topic, qos);
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
JavaMqtt.rar (12个子文件)
ubuntuJavaMqtt
pom.xml 931B
src
test
java
main
resources
java
org
example
MQTTUtils.java 4KB
Main.java 3KB
.idea
jarRepositories.xml 864B
workspace.xml 3KB
misc.xml 541B
compiler.xml 547B
.gitignore 184B
encodings.xml 267B
target
classes
org
example
MQTTUtils.class 4KB
Main$1.class 2KB
Main.class 3KB
generated-sources
annotations
共 12 条
- 1
资源评论
qaz96801
- 粉丝: 3
- 资源: 3
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功