package com.sunjie.mqtt;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import com.sunjie.application.WebApplicationContext;
/**
 * Static helper that publishes messages to an MQTT broker through one shared
 * Paho {@link MqttClient}.
 *
 * <p>The broker address (for example {@code tcp://host:port}), the client id,
 * the publish topic and the credentials are all read from {@link MqttConfig}
 * rather than being hard-coded here.
 *
 * <p>Typical usage: call {@link #connect(MqttConfig)} once, then call
 * {@link #publish(String)} for every message.
 */
public class SendMQTT {
    private static final Logger logger = LoggerFactory.getLogger(SendMQTT.class);

    /** Guards creation and connection of the shared client. */
    private static final Object LOCK = new Object();

    /** Default configuration, obtained from {@code WebApplicationContext} when this class loads. */
    private static final MqttConfig mqttConfig;

    static {
        mqttConfig = WebApplicationContext.getBean(MqttConfig.class);
    }

    /** Shared client; created on the first call to {@link #connect(MqttConfig)}. */
    private static volatile MqttClient client;

    /** Topic handle used by {@link #publish(String)}; {@code null} until a connect succeeds. */
    private static volatile MqttTopic topic11;

    /**
     * Creates the shared client if necessary, connects it to the broker and
     * resolves the topic that {@link #publish(String)} sends to.
     *
     * <p>Connection failures are logged and not rethrown; in that case the
     * topic stays unset and a later {@link #publish(String)} fails with an
     * {@link MqttException}.
     *
     * @param mqttConfig configuration supplying host, client id, topic and
     *                   credentials; when {@code null} the default
     *                   configuration loaded with this class is used
     */
    public static void connect(MqttConfig mqttConfig) {
        // Use one configuration for everything so the host/topic and the
        // credentials can never come from two different sources.
        MqttConfig config = (mqttConfig != null) ? mqttConfig : SendMQTT.mqttConfig;
        synchronized (LOCK) {
            try {
                if (client == null) {
                    // MemoryPersistence keeps in-flight message state in memory only.
                    MqttClient created =
                            new MqttClient(config.getHost(), config.getSendId(), new MemoryPersistence());
                    created.setCallback(new PushCallback());
                    client = created;
                }
                // Connecting an already-connected client raises an exception,
                // so only connect when needed.
                if (!client.isConnected()) {
                    client.connect(getOption(config));
                    logger.info("成功链接服务器。。。");
                }
                topic11 = client.getTopic(config.getSendTopic());
            } catch (MqttException e) {
                logger.error("Failed to connect to MQTT broker {}", config.getHost(), e);
            }
        }
    }

    /**
     * Publishes {@code body} as a retained QoS 1 message on the configured
     * topic and blocks until delivery completes.
     *
     * @param body message text; encoded as UTF-8
     * @throws NullPointerException     if {@code body} is {@code null}
     * @throws MqttPersistenceException if the message cannot be persisted
     * @throws MqttException            if no successful
     *                                  {@link #connect(MqttConfig)} has happened
     *                                  yet, or if publishing fails
     */
    public static void publish(String body)
            throws MqttPersistenceException, MqttException {
        if (body == null) {
            throw new NullPointerException("body must not be null");
        }
        MqttTopic target = topic11;
        if (target == null) {
            // connect() was never called or did not succeed.
            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
        }
        MqttMessage message = new MqttMessage();
        message.setQos(1);
        message.setRetained(true);
        // Explicit charset: the payload must not depend on the JVM default encoding.
        message.setPayload(body.getBytes(StandardCharsets.UTF_8));
        MqttDeliveryToken token = target.publish(message);
        token.waitForCompletion();
        logger.info("message is published completely! {}", token.isComplete());
    }

    /**
     * Asks the shared client to reconnect. Does nothing except log a warning
     * when no client has been created yet; reconnect failures are logged and
     * not rethrown.
     */
    public static void reconnect() {
        MqttClient current = client;
        if (current == null) {
            logger.warn("reconnect() called before connect(); no MQTT client exists yet");
            return;
        }
        try {
            current.reconnect();
        } catch (MqttException e) {
            logger.error("Failed to reconnect to MQTT broker", e);
        }
    }

    /**
     * Builds the connect options (persistent session, credentials, timeouts)
     * from the given configuration.
     *
     * @param config configuration supplying user name and password
     * @return options ready to pass to {@code MqttClient.connect}
     */
    private static MqttConnectOptions getOption(MqttConfig config) {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(config.getUserName());
        String password = config.getPassWord();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        // Connection timeout.
        options.setConnectionTimeout(10);
        // Keep-alive (heartbeat) interval.
        options.setKeepAliveInterval(20);
        return options;
    }
}