package com.mqtt.util;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import com.mqtt.data.MessageMqtt;
import com.mqtt.data.MqttDevice;
import com.mqtt.vm.MqttClientManager;
/**
 * Static helpers for starting MQTT clients (subscribe or publish), reading
 * comma-separated / numeric configuration values, generating random strings
 * and building {@link MqttMessage} instances.
 *
 * @author wangyankai
 */
public class MqttUtils {

    private static final Logger logger = Logger.getLogger(MqttUtils.class);

    /**
     * Charset used to turn message text into payload bytes. Fixed to UTF-8 so
     * the bytes on the wire do not depend on the JVM's platform default.
     */
    private static final Charset PAYLOAD_CHARSET = Charset.forName("UTF-8");

    /** Alphabet used by {@link #getRandomStringOnlyChar(int)}. */
    private static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

    /**
     * Starts every client in the given device list that needs to connect or
     * subscribe, by delegating each entry to {@link #mainSubscribeStart(MqttDevice)}.
     *
     * @param deviceGroupList device descriptions; may be {@code null} or empty
     * @return one entry per input device, in input order (an entry is
     *         {@code null} when the corresponding device was {@code null});
     *         {@code null} when {@code deviceGroupList} is {@code null} or empty
     */
    public static List<MqttClientManager> mainSubscribeStartByList(List<MqttDevice> deviceGroupList) {
        // A null list used to throw a NullPointerException; treat it like an empty one.
        if (deviceGroupList == null || deviceGroupList.isEmpty()) {
            logger.warn(">>>>>>>>>>>>>>>>>>>>Config deviceGroupList error!!!!!!!");
            return null;
        }
        logger.info(">>>>>>>>>>>>>>>>>>>>Config deviceGroupList read success!!!!!!!");
        List<MqttClientManager> clientGroup = new ArrayList<MqttClientManager>(deviceGroupList.size());
        for (MqttDevice mqttDevice : deviceGroupList) {
            clientGroup.add(mainSubscribeStart(mqttDevice));
        }
        return clientGroup;
    }

    /**
     * Creates a client manager for a single device, connects it and, when the
     * connection succeeds, subscribes it to the device's topics.
     *
     * @param mqttDevice the device description; may be {@code null}
     * @return the created manager (returned even when the connection attempt
     *         failed), or {@code null} when {@code mqttDevice} is {@code null}
     */
    public static MqttClientManager mainSubscribeStart(MqttDevice mqttDevice) {
        if (mqttDevice == null) {
            logger.warn(">>>>>>>>>>>>>>>>>>>>Config mqttDevice error!!!!!!!");
            return null;
        }
        logger.info(">>>>>>>>>>>>>>>>>>>>Config mqttDevice read success!!!!!!!");
        MqttClientManager mqttClientManager = new MqttClientManager(mqttDevice);
        // Establish the connection to the server.
        if (mqttClientManager.deviceConnect(mqttDevice)) {
            // Subscribe only once connected.
            mqttClientManager.deviceSubscribe(mqttDevice.getTopicArray(), mqttDevice.getQosArray());
            logger.info(">>>>>>>>>>>>>>>>>>>>【" + mqttDevice.getClientName() + "】"
                    + mqttDevice.getClientId() + " init success!!!!!!!");
        } else {
            // Previously "init success" was logged here as well, hiding failed connects.
            logger.warn(">>>>>>>>>>>>>>>>>>>>【" + mqttDevice.getClientName() + "】"
                    + mqttDevice.getClientId() + " connect failed!!!!!!!");
        }
        return mqttClientManager;
    }

    /**
     * Connects a client for the given device, publishes every message in the
     * list to {@code topic} and disconnects again.
     *
     * <p>Nothing is done (apart from a warning) when {@code mqttDevice} or
     * {@code messageMqttList} is {@code null}, or when the connection fails.
     * List entries that are {@code null} or carry a {@code null} message text
     * are skipped and not counted as successful.
     *
     * @param mqttDevice      the publishing device
     * @param topic           topic name, resolved through the client manager
     * @param messageMqttList messages to publish
     */
    public static void mainPublishStart(MqttDevice mqttDevice, String topic, List<MessageMqtt> messageMqttList) {
        if (mqttDevice == null || messageMqttList == null) {
            logger.warn(">>>>>>>>>>>>>>>>>>>>mainPublishStart skipped: mqttDevice or messageMqttList is null");
            return;
        }
        MqttClientManager mqttClientManager = new MqttClientManager(mqttDevice);
        // Establish the connection to the server.
        if (!mqttClientManager.deviceConnect(mqttDevice)) {
            logger.warn(">>>>>>>>>>>>>>>>>>>>【" + mqttDevice.getClientId()
                    + "】connect failed, no message published");
            return;
        }
        try {
            MqttTopic mqttTopic = mqttClientManager.deviceGetMqttTopicFromMap(topic);
            int publishSucNum = 0;
            for (MessageMqtt messageMqtt : messageMqttList) {
                if (messageMqtt == null || messageMqtt.getMessage() == null) {
                    logger.warn(">>>>>>>>>>>>>>>>>>>>【" + mqttDevice.getClientId()
                            + "】skipped a null message for topic " + topic);
                    continue;
                }
                MqttMessage mqttMessage = buildMqttMessage(messageMqtt.getMessage(),
                        messageMqtt.getQos(), messageMqtt.isRetained());
                if (mqttClientManager.devicePublish(mqttTopic, mqttMessage)) {
                    publishSucNum++;
                    logger.info(">>>>>>>>>>>>>>>>>>>>【" + mqttDevice.getClientId()
                            + "】"
                            + topic + "推送成功,消息推送成功总数:"
                            + publishSucNum);
                }
            }
            logger.info(">>>>>>>>>>>>>>>>>>>>【" + mqttDevice.getClientId()
                    + "】推送消息总数:" + messageMqttList.size()
                    + ",消息推送成功总数:" + publishSucNum);
        } finally {
            // Always release the connection, even if publishing threw.
            // NOTE(review): 10000 is passed through unchanged from the original;
            // presumably a timeout in milliseconds - confirm against MqttClientManager.
            mqttClientManager.deviceDisconnect(10000);
        }
    }

    /**
     * Reads a comma-separated configuration value and splits it into its parts,
     * e.g. {@code propName=aaa,bbb,ccc,ddd}.
     *
     * @param propFileName configuration file name
     * @param propName     configuration parameter name
     * @return the parts in order; an empty array when the value is missing or blank
     */
    public static String[] getClientGroupArray(String propFileName, String propName) {
        String clientNumber = getStringConfigFromProp(propFileName, propName);
        // A missing property used to cause a NullPointerException on split().
        if (isEmptyString(clientNumber)) {
            return new String[0];
        }
        return clientNumber.split(",");
    }

    /**
     * Reads a configuration value and converts it to an {@code int}.
     *
     * @param propFileName configuration file name
     * @param propName     configuration parameter name
     * @return the parsed value, or {@code 0} when the value is missing or blank
     * @throws NumberFormatException when the value is present but not a valid integer
     */
    public static int getIntConfigFromProp(String propFileName, String propName) {
        String propValue = getStringConfigFromProp(propFileName, propName);
        if (isEmptyString(propValue)) {
            return 0;
        }
        return Integer.parseInt(propValue);
    }

    /**
     * Reads a configuration value through {@code PropertyLoader}.
     *
     * @param propFileName configuration file name
     * @param propName     configuration parameter name
     * @return the trimmed value; a {@code null} or blank value is returned as
     *         delivered by the loader
     */
    public static String getStringConfigFromProp(String propFileName, String propName) {
        String propValue = PropertyLoader.getPropertyValue(propFileName, propName);
        if (!isEmptyString(propValue)) {
            propValue = propValue.trim();
        }
        return propValue;
    }

    /**
     * Generates a random string made of ASCII letters and digits. Each position
     * is a digit with probability 1/2, otherwise an upper- or lower-case letter
     * with equal probability.
     *
     * <p>Uses {@link Random}, so the result is not suitable as a secret.
     *
     * @param length desired length; values below 1 yield an empty string
     * @return the random string
     */
    public static String getRandomStringOnlyCharNumber(int length) {
        StringBuilder sb = new StringBuilder(Math.max(length, 0));
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            if (random.nextInt(2) == 0) {
                // Digit 0-9.
                sb.append(random.nextInt(10));
            } else {
                // Letter: pick the case first, then the offset within the alphabet.
                char first = random.nextInt(2) == 0 ? 'A' : 'a';
                sb.append((char) (first + random.nextInt(26)));
            }
        }
        return sb.toString();
    }

    /**
     * Generates a random string made of ASCII letters only.
     *
     * <p>Uses {@link Random}, so the result is not suitable as a secret.
     *
     * @param length desired length; values below 1 yield an empty string
     * @return the random string
     */
    public static String getRandomStringOnlyChar(int length) {
        StringBuilder sb = new StringBuilder(Math.max(length, 0));
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            sb.append(LETTERS.charAt(random.nextInt(LETTERS.length())));
        }
        return sb.toString();
    }

    /**
     * Builds an MQTT message from text.
     *
     * @param message  message content; encoded as UTF-8 for the payload
     * @param qos      quality-of-service level, passed to {@link MqttMessage#setQos(int)}
     * @param retained retained flag, passed to {@link MqttMessage#setRetained(boolean)}
     * @return the populated message
     * @throws NullPointerException when {@code message} is {@code null}
     */
    public static MqttMessage buildMqttMessage(String message, int qos, boolean retained) {
        if (message == null) {
            throw new NullPointerException("message must not be null");
        }
        MqttMessage mqttMessage = new MqttMessage();
        // Explicit charset: the platform default differs between hosts.
        mqttMessage.setPayload(message.getBytes(PAYLOAD_CHARSET));
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        return mqttMessage;
    }

    /**
     * Tells whether a string is {@code null}, empty or whitespace only.
     *
     * @param s the string to check
     * @return {@code true} when {@code s} is {@code null} or trims to the empty string
     */
    public static boolean isEmptyString(String s) {
        return s == null || s.trim().length() == 0;
    }
}
// Non-code page residue from the original listing (a comment counter and a "latest resources" heading).