#include "mqttClient.h"
#include <string.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
/*
 * Publisher thread entry point.
 *
 * Fills in the MQTT message struct of every entry in publish_topics once,
 * then loops forever: publishes each topic, waits for delivery to complete,
 * reports the outcome, and sleeps options.cycleTime (passed to usleep(), so
 * interpreted as microseconds) between rounds.
 *
 * threadId is unused. The function never returns; the thread is expected to
 * be stopped from outside (this file uses pthread_cancel in __cleanup_0).
 */
void *pubClient(void *threadId){
    (void)threadId;

    /* One-time setup: bind payload/qos/retained of each topic to its message. */
    for(int i=0;i<TOTAL_PUBLISH_COUNT;i++){
        publish_topics[i].pubmsg.payload=publish_topics[i].payload;
        publish_topics[i].pubmsg.payloadlen=(int)strlen(publish_topics[i].payload);
        publish_topics[i].pubmsg.qos=publish_topics[i].qos;
        publish_topics[i].pubmsg.retained=publish_topics[i].retained;
    }

    while(1){
        for(int i=0;i<TOTAL_PUBLISH_COUNT;i++){
            /*
             * Success is decided by the return codes of the API calls.
             * The delivery token is only a message identifier and must not
             * be compared against MQTTCLIENT_SUCCESS.
             */
            int rc=MQTTClient_publishMessage(options.client, publish_topics[i].topic, &publish_topics[i].pubmsg, &publish_topics[i].token);
            if(rc==MQTTCLIENT_SUCCESS){
                rc=MQTTClient_waitForCompletion(options.client, publish_topics[i].token, 10000L);
            }
            if(rc!=MQTTCLIENT_SUCCESS){
                printf("publish faild:%d,topic name:%s\n",rc,publish_topics[i].topic);
            }else{
                printf("publish success\n");
            }
        }
        usleep(options.cycleTime);
    }
}
/*
 * Delivery-complete callback registered with the MQTT client.
 * Stores the token of the delivered message in the global deliveredtoken.
 * The context argument is not used.
 */
void delivered(void *context, MQTTClient_deliveryToken dt)
{
    (void)context;
    deliveredtoken = dt;
}
/*
 * Message-arrived callback registered with the MQTT client.
 *
 * Parses the payload as a decimal integer, stores it in __IB0 and prints it.
 * Always returns 1, which tells the Paho library the message was handled.
 *
 * context and topicLen are unused.
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message){
    /* Enough for any int in decimal plus sign; longer payloads are truncated. */
    char text[32];
    size_t len=0;

    (void)context;
    (void)topicLen;

    /*
     * The payload is a length-delimited buffer, not a C string, so copy it
     * into a bounded, NUL-terminated buffer before parsing.
     */
    if(message->payload!=NULL && message->payloadlen>0){
        len=(size_t)message->payloadlen;
        if(len>sizeof text-1){
            len=sizeof text-1;
        }
        memcpy(text, message->payload, len);
    }
    text[len]='\0';

    printf("receive massage:");
    //INT
    __IB0=(int)strtol(text, NULL, 10);
    printf("%d\n", __IB0);

    /*
     * The Paho API requires the application to release both the message and
     * the topic name once it has handled them; otherwise every message leaks.
     */
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}
// Forward declarations: connlost() below calls both functions before their
// definitions appear later in this file.
int __cleanup_0 ();
int mqtt_connect();
/*
 * Connection-lost callback registered with the MQTT client.
 *
 * Tears the current client down via __cleanup_0() and then retries
 * mqtt_connect() every 5 seconds until the client reports it is connected.
 *
 * context is unused. cause may be NULL (the Paho documentation states it is
 * currently always NULL), so it is never passed to "%s" unchecked.
 *
 * NOTE(review): this destroys and recreates the client from inside one of
 * the client's own callbacks - confirm the library permits that.
 */
void connlost(void *context, char *cause){
    (void)context;

    printf("\nConnection lost\n");
    printf("cause:%s\n", cause!=NULL ? cause : "(unknown)");
    __cleanup_0();

    while(1){
        if(MQTTClient_isConnected(options.client)){
            printf("i am connected\n");
            break;
        }
        int rc=mqtt_connect();
        if(rc==MQTTCLIENT_SUCCESS){
            printf("reconnect success\n");
        }else{
            printf("reconnect faild,try again\n");
            /*
             * mqtt_connect() creates a new client handle on every call.
             * Release the one from this failed attempt so the next retry
             * does not overwrite and leak it.
             */
            MQTTClient_destroy(&options.client);
        }
        sleep(5);
    }
}
/*
 * Subscriber thread entry point.
 *
 * Subscribes options.client to every entry of subscribe_topics and reports
 * each subscription that fails. threadId is unused. Returns NULL.
 */
void *subClient(void *threadId){
    (void)threadId;

    // Subscribe to each topic and check every result, not only the last one.
    for(int i=0;i<TOTAL_SUBSCRIBE_COUNT;i++){
        int rc=MQTTClient_subscribe(options.client, subscribe_topics[i].topic, subscribe_topics[i].qos);
        if(rc!=MQTTCLIENT_SUCCESS){
            printf("subscribe faild:%d\n",rc);
        }
    }
    return NULL;
}
/*
 * Creates the MQTT client from the global options, applies connection,
 * last-will and (optionally) TLS settings, registers the callbacks and
 * connects. On a successful connect the subscriber and publisher threads
 * are started.
 *
 * Returns the result of MQTTClient_create() if creation fails, otherwise
 * the result of MQTTClient_connect() (MQTTCLIENT_SUCCESS on success).
 */
int mqtt_connect(){
    /*
     * Static storage: options.opts.ssl keeps pointing at this object, so it
     * must outlive both the TLS block below and this function call.
     * Every field this function relies on is reassigned on each call.
     */
    static MQTTClient_SSLOptions ssl_opts=MQTTClient_SSLOptions_initializer;

    int rc=MQTTClient_create(&options.client, options.connection, options.clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if(rc!=MQTTCLIENT_SUCCESS){
        printf("create client faild:%d\n",rc);
        return rc;
    }

    options.opts.keepAliveInterval = options.keepAliveInterval;
    options.opts.cleansession = options.cleanSession;
    options.opts.username = options.userName;
    options.opts.password = options.password;
    options.opts.MQTTVersion = options.mqttVersion;
    options.opts.maxInflightMessages=options.maxInFlight;
    options.opts.connectTimeout=options.connectTimeOut;

    // Last-will settings
    options.will_opts.retained=options.will_retained;
    options.will_opts.qos=options.will_qos;
    options.will_opts.topicName=options.will_topicName;
    options.will_opts.message=options.will_message;
    options.opts.will=&options.will_opts;

    if(options.isTLSConnect){
        // SSL/TLS support. access(path, 0) returns non-zero when the file
        // cannot be found, so these only warn; the connect is still attempted.
        if(options.trustStore !=NULL && access(options.trustStore,0))
            printf("ca file is not accessible\n");
        if(options.keyStore !=NULL && access(options.keyStore,0))
            printf("client certificate file is not accessible\n");
        if(options.privateKey !=NULL && access(options.privateKey,0))
            printf("client key file is not accessible\n");
        ssl_opts.trustStore=options.trustStore;
        ssl_opts.keyStore=options.keyStore;
        ssl_opts.privateKey=options.privateKey;
        ssl_opts.privateKeyPassword=options.privateKeyPassword;
        // NOTE(review): a value named tlsVersion is stored as the cipher
        // suite list - confirm this is intended.
        ssl_opts.enabledCipherSuites=options.tlsVersion;
        ssl_opts.enableServerCertAuth=1;
        options.opts.ssl=&ssl_opts;
    }

    // If there are subscribed topics, register the callbacks.
    // NOTE(review): ">=0" is also true for a count of 0; kept as-is because
    // __cleanup_0 cancels both threads unconditionally - confirm intent.
    if(TOTAL_SUBSCRIBE_COUNT>=0){
        MQTTClient_setCallbacks(options.client, NULL, connlost, msgarrvd, delivered);
    }

    rc = MQTTClient_connect(options.client, &options.opts);
    if(rc==MQTTCLIENT_SUCCESS){
        if(TOTAL_SUBSCRIBE_COUNT>=0){
            int err=pthread_create(&options.sub_thread_id, NULL, subClient, (void *)0);
            if(err!=0){
                printf("create subscribe thread faild:%d\n",err);
            }
        }
        if(TOTAL_PUBLISH_COUNT>=0){
            int err=pthread_create(&options.pub_thread_id, NULL, pubClient, (void *)1);
            if(err!=0){
                printf("create publish thread faild:%d\n",err);
            }
        }
    }else{
        printf("connect faild:%d\n",rc);
    }
    return rc;
}
/*
 * Initialisation entry point: connects the MQTT client and returns the
 * result of mqtt_connect(). The command-line arguments are not used.
 */
int __init_0(int argc, char** argv)
{
    (void)argc;
    (void)argv;
    return mqtt_connect();
}
/*
 * Stop the client: unsubscribe from every entry of subscribe_topics,
 * disconnect, destroy the client handle, then request cancellation of the
 * publisher and subscriber threads. Always returns 1; the results of the
 * individual calls are not checked.
 *
 * NOTE(review): the threads are cancelled only after the client has been
 * destroyed, and they are never joined - pubClient may still be using
 * options.client in between. Confirm whether the order should be reversed.
 * NOTE(review): pthread_cancel is called unconditionally; confirm both
 * thread ids are always valid when this runs (e.g. after a failed connect).
 */
int __cleanup_0 (){
for(int i=0;i<TOTAL_SUBSCRIBE_COUNT;i++)
MQTTClient_unsubscribe(options.client, subscribe_topics[i].topic);
/* 10000 is the disconnect timeout argument (milliseconds per the Paho API). */
MQTTClient_disconnect(options.client, 10000);
MQTTClient_destroy(&options.client);
pthread_cancel(options.pub_thread_id);
pthread_cancel(options.sub_thread_id);
return 1;
}