import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.sql.Timestamp;
import java.util.Arrays;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
/**
* A sample application that demonstrates how to use the Paho MQTT v3.1 Client API in
* non-blocking callback/notification mode.
*
* It can be run from the command line in one of two modes:
* - as a publisher, sending a single message to a topic on the server
* - as a subscriber, listening for messages from the server
*
* There are three versions of the sample that implement the same features
* but do so using using different programming styles:
* <ol>
* <li>Sample which uses the API which blocks until the operation completes</li>
* <li>SampleAsyncWait shows how to use the asynchronous API with waiters that block until
* an action completes</li>
* <li>SampleAsyncCallBack (this one) shows how to use the asynchronous API where events are
* used to notify the application when an action completes<li>
* </ol>
*
* If the application is run with the -h parameter then info is displayed that
* describes all of the options / parameters.
*/
public class MqttAsyncCallBack implements MqttCallback {
int state = BEGIN;
static final int BEGIN = 0;
static final int CONNECTED = 1;
static final int PUBLISHED = 2;
static final int SUBSCRIBED = 3;
static final int DISCONNECTED = 4;
static final int FINISH = 5;
static final int ERROR = 6;
static final int DISCONNECT = 7;
// Private instance variables
MqttAsyncClient client;
String brokerUrl;
private boolean quietMode;
private MqttConnectOptions conOpt;
private boolean clean;
Throwable ex = null;
Object waiter = new Object();
boolean donext = false;
private String password;
private String userName;
public static boolean isSubDisconnect = false;
/**
* Constructs an instance of the sample client wrapper
* @param brokerUrl the url to connect to
* @param clientId the client id to connect with
* @param cleanSession clear state at end of connection or not (durable or non-durable subscriptions)
* @param quietMode whether debug should be printed to standard out
* @param userName the username to connect with
* @param password the password for the user
* @throws MqttException
*/
public MqttAsyncCallBack(String brokerUrl, String clientId,
boolean cleanSession, boolean quietMode, String userName,
String password) throws MqttException {
this.brokerUrl = brokerUrl;
this.quietMode = quietMode;
this.clean = cleanSession;
this.password = password;
this.userName = userName;
// This sample stores in a temporary directory... where messages
// temporarily
// stored until the message has been delivered to the server.
// ..a real application ought to store them somewhere
// where they are not likely to get deleted or tampered with
String tmpDir = System.getProperty("java.io.tmpdir");
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(
tmpDir);
try {
// Construct the object that contains connection parameters
// such as cleanSession and LWT
conOpt = new MqttConnectOptions();
conOpt.setCleanSession(clean);
if (password != null) {
conOpt.setPassword(this.password.toCharArray());
}
if (userName != null) {
conOpt.setUserName(this.userName);
}
// Construct the MqttClient instance
client = new MqttAsyncClient(this.brokerUrl, clientId, dataStore);
// Set this wrapper as the callback handler
client.setCallback(this);
} catch (MqttException e) {
//TODO log something
e.printStackTrace();
log("Unable to set up client: " + e.toString());
}
}
/**
* Publish / send a message to an MQTT server
* @param topicName the name of the topic to publish to
* @param qos the quality of service to delivery the message at (0,1,2)
* @param payload the set of bytes to send to the MQTT server
* @throws MqttException
*/
public void publish(String topicName, int qos, byte[] payload)
throws Throwable {
// Use a state machine to decide which step to do next. State change
// occurs
// when a notification is received that an MQTT action has completed
while (state != FINISH) {
switch (state) {
case BEGIN:
// Connect using a non-blocking connect
MqttConnector con = new MqttConnector();
con.doConnect();
break;
case CONNECTED:
// Publish using a non-blocking publisher
Publisher pub = new Publisher();
pub.doPublish(topicName, qos, payload);
break;
case PUBLISHED:
state = DISCONNECT;
donext = true;
break;
case DISCONNECT:
Disconnector disc = new Disconnector();
disc.doDisconnect();
break;
case ERROR:
throw ex;
case DISCONNECTED:
state = FINISH;
donext = true;
break;
}
// if (state != FINISH) {
// Wait until notified
/* 进入wait状态,直到各监听器回调中carryOn改变waiter和notifyAll才会继续下一循环
* 进入下一状态的处理。
*/
waitForStateChange(10000);
// }
}
}
/**
* Subscribe to a topic on an MQTT server
* Once subscribed this method waits for the messages to arrive from the server
* that match the subscription. It continues listening for messages until the enter key is
* pressed.
* @param topicName to subscribe to (can be wild carded)
* @param qos the maximum quality of service to receive messages at for this subscription
* @throws MqttException
*/
public void subscribe(String topicName, int qos) throws Throwable {
// Use a state machine to decide which step to do next. State change
// occurs
// when a notification is received that an MQTT action has completed
while (state != FINISH) {
switch (state) {
case BEGIN:
// Connect using a non-blocking connect
isSubDisconnect = false;
MqttConnector con = new MqttConnector();
con.doConnect();
break;
case CONNECTED:
// Subscribe using a non-blocking subscribe
Subscriber sub = new Subscriber();
sub.doSubscribe(topicName, qos);
break;
case SUBSCRIBED:
// Block until Enter is pressed allowing messages to arrive
log("Press <Enter> to exit");
try {
System.in.read();
} catch (IOException e) {
// If we can't read we'll just exit
}
state = DISCONNECT;
donext = true;
break;
case DISCONNECT:
Disconnector disc = new Disconnector();
disc.doDisconnect();
break;
case ERROR:
throw ex;
case DISCONNECTED:
state = FINISH;
donext = true;
break;
}
// if (state != FINISH && state != DISCONNECT) {
waitForStateChange(10000);
}
// }
}
/**
* Connect in a non-blocking way and then sit back and wait to be
* notified that the action has completed.
*/
public class MqttConnector {
public MqttConnector() {
}
public void doConnect() {
// Connect to the server
// Get a token and setup an asynchronous listener on the token which
// will be notified once the connect completes
log("Connecting to " + brokerUrl + " with client ID "
+ client.getClientId());
IMqttActionListener conListener = new IMqttActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
log("Connected");
state = CONNECTED;
carryOn();
}
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
ex = exception;
state = ERROR;
log("connect failed" + exception);
carryOn();
}
public void carryOn() {
synchroni