/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - bug 384016 - segv setting will message
* Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
* Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
* Ian Craggs - multiple server connection support
* Ian Craggs - fix for bug 413429 - connectionLost not called
* Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries
* Ian Craggs - fix for bug 419233 - mutexes not reporting errors
* Ian Craggs - fix for bug 420851
* Ian Craggs - fix for bug 432903 - queue persistence
* Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - fix for bug 438176 - MQTT version selection
* Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - fix for bug 443724 - stack corruption
* Ian Craggs - fix for bug 447672 - simultaneous access to socket structure
*******************************************************************************/
/**
* @file
* \brief Synchronous API implementation
*
*/
#define _GNU_SOURCE /* for pthread_mutexattr_settype */
#include <stdlib.h>
#if !defined(WIN32) && !defined(WIN64)
#include <sys/time.h>
#endif
#include "MQTTClient.h"
#if !defined(NO_PERSISTENCE)
#include "MQTTPersistence.h"
#endif
#include "utf-8.h"
#include "MQTTProtocol.h"
#include "MQTTProtocolOut.h"
#include "Thread.h"
#include "SocketBuffer.h"
#include "StackTrace.h"
#include "Heap.h"
#if defined(OPENSSL)
#include <openssl/ssl.h>
#endif
#define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##"
/* Eye-catcher strings: fixed prefixes concatenated at compile time with the
 * BUILD_TIMESTAMP and CLIENT_VERSION tags defined above.
 * NOTE(review): presumably kept so that a built binary can be identified by
 * searching it for these strings - confirm. */
char* client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
/* Library-wide client state: the version string plus the client list. The list
 * starts out NULL and is created by MQTTClient_create() during its one-time
 * initialisation. */
static ClientStates ClientState =
{
CLIENT_VERSION, /* version */
NULL /* client list */
};
/* Externally visible pointer to the state above.
 * NOTE(review): presumably referenced from other translation units - confirm. */
ClientStates* bstate = &ClientState;
/* NOTE(review): object of type MQTTProtocol; how it is used is not visible in
 * this part of the file. */
MQTTProtocol state;
#if defined(WIN32) || defined(WIN64)
/* Windows mutex handles. All five start out unset and are created together in
 * DllMain() when the DLL is attached to a process. The first two belong to this
 * file; the remaining three are defined in other translation units. */
static mutex_type mqttclient_mutex = NULL;
static mutex_type socket_mutex = NULL;
extern mutex_type stack_mutex;
extern mutex_type heap_mutex;
extern mutex_type log_mutex;
/**
 * Windows DLL entry point.
 *
 * On process attach the library mutexes are created once (guarded by
 * mqttclient_mutex still being NULL). Every notification is traced with
 * exactly one message: each case ends in a break so that, for example, a
 * process attach is no longer also reported as a thread attach, thread detach
 * and process detach.
 *
 * @param hModule handle of the DLL module (unused)
 * @param ul_reason_for_call the notification being delivered
 * @param lpReserved reserved by the system (unused)
 * @return always TRUE
 */
BOOL APIENTRY DllMain(HANDLE hModule,
                      DWORD ul_reason_for_call,
                      LPVOID lpReserved)
{
    switch (ul_reason_for_call)
    {
        case DLL_PROCESS_ATTACH:
            Log(TRACE_MAX, -1, "DLL process attach");
            if (mqttclient_mutex == NULL)
            {
                mqttclient_mutex = CreateMutex(NULL, 0, NULL);
                stack_mutex = CreateMutex(NULL, 0, NULL);
                heap_mutex = CreateMutex(NULL, 0, NULL);
                log_mutex = CreateMutex(NULL, 0, NULL);
                socket_mutex = CreateMutex(NULL, 0, NULL);
            }
            break;
        case DLL_THREAD_ATTACH:
            Log(TRACE_MAX, -1, "DLL thread attach");
            break;
        case DLL_THREAD_DETACH:
            Log(TRACE_MAX, -1, "DLL thread detach");
            break;
        case DLL_PROCESS_DETACH:
            Log(TRACE_MAX, -1, "DLL process detach");
            break;
        default:
            break;
    }
    return TRUE;
}
#else
/* POSIX builds: statically initialised storage for the two mutexes owned by
 * this file, and the mutex_type handles that refer to that storage.
 * MQTTClient_init() can re-initialise both as error-checking mutexes. */
static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type mqttclient_mutex = &mqttclient_mutex_store;
static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type socket_mutex = &socket_mutex_store;
/**
 * Re-initialises the client and socket mutexes as error-checking mutexes
 * (POSIX builds only).
 *
 * Failures are reported on stdout; nothing is returned to the caller. If the
 * attribute object itself cannot be initialised the function returns early and
 * the mutexes keep their static PTHREAD_MUTEX_INITIALIZER state. The attribute
 * object is destroyed before returning so that it is not leaked.
 *
 * NOTE(review): the caller of this function is not visible in this part of the
 * file; it must run before either mutex is in use - confirm.
 */
void MQTTClient_init()
{
    pthread_mutexattr_t attr;
    int rc;

    if ((rc = pthread_mutexattr_init(&attr)) != 0)
    {
        /* Using an uninitialised attribute object would be undefined behaviour. */
        printf("MQTTClient: error %d initializing mutex attributes\n", rc);
        return;
    }
    if ((rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK)) != 0)
        printf("MQTTClient: error %d setting mutex type\n", rc);
    if ((rc = pthread_mutex_init(mqttclient_mutex, &attr)) != 0)
        printf("MQTTClient: error %d initializing client_mutex\n", rc);
    if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
        printf("MQTTClient: error %d initializing socket_mutex\n", rc);
    /* The mutexes keep their own copy of the settings; release the attribute object. */
    pthread_mutexattr_destroy(&attr);
}
#define WINAPI
#endif
/* Set to 1 by MQTTClient_create() once the one-time library initialisation has run. */
static volatile int initialized = 0;
/* List of client handles: created during that one-time initialisation, and
 * MQTTClient_create() appends every new MQTTClients structure to it. */
static List* handles = NULL;
/* NOTE(review): the four variables below are not referenced in the visible part
 * of the file; judging by their names they track a background thread (last
 * activity time, running flag, stop request, thread id) - confirm against
 * the code that uses them. */
static time_t last;
static int running = 0;
static int tostop = 0;
static thread_id_type run_id = 0;
/* Forward declarations of functions used before their definitions.
 * NOTE(review): the definitions lie outside the visible part of the file, so
 * their behaviour is not described here. MQTTClient_writeComplete is the
 * function MQTTClient_create() registers through
 * Socket_setWriteCompleteCallback(). */
MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout);
MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc);
int MQTTClient_cleanSession(Clients* client);
void MQTTClient_stop();
int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop);
void MQTTClient_writeComplete(int socket);
/* Pairs a message with the topic it belongs to.
 * NOTE(review): presumably the element type stored in a client's messageQueue
 * list - confirm against the code that fills that list. */
typedef struct
{
MQTTClient_message* msg; /* the message itself */
char* topicName; /* topic associated with the message */
int topicLen; /* NOTE(review): presumably the length of topicName - confirm */
unsigned int seqno; /* only used on restore */
} qEntry;
/* Per-handle client structure: MQTTClient_create() allocates one of these,
 * zero-fills it, and hands its address back to the caller as the MQTTClient
 * handle. */
typedef struct
{
char* serverURI; /* copy of the server URI with any tcp:// (or, in OPENSSL builds, URI_SSL) prefix removed */
#if defined(OPENSSL)
int ssl; /* set to 1 when the URI passed to MQTTClient_create() began with URI_SSL */
#endif
Clients* c; /* client record allocated in MQTTClient_create(); its context field points back to this structure */
/* NOTE(review): the three pointers below are application callbacks according to
 * their type names; where they are set and invoked is not visible here. */
MQTTClient_connectionLost* cl;
MQTTClient_messageArrived* ma;
MQTTClient_deliveryComplete* dc;
void* context; /* NOTE(review): presumably the application context passed to the callbacks - confirm */
sem_type connect_sem; /* semaphore created in MQTTClient_create() */
int rc; /* getsockopt return code in connect */
/* NOTE(review): the semaphores below are presumably signalled when the matching
 * packet type is received - confirm. */
sem_type connack_sem;
sem_type suback_sem;
sem_type unsuback_sem;
MQTTPacket* pack; /* NOTE(review): purpose not visible in this part of the file */
} MQTTClients;
/**
 * Suspends the calling thread for approximately the given time.
 *
 * On Windows the value is passed straight to Sleep(). Elsewhere the interval
 * is slept in slices of less than one second: POSIX permits usleep() to fail
 * with EINVAL for intervals of 1,000,000 microseconds or more, and a single
 * multiplication by 1000 could overflow for very large arguments. On
 * non-Windows builds zero and negative values return immediately (a negative
 * value previously turned into a very long unsigned sleep).
 *
 * Note that a slice cut short by a signal does not end the whole sleep; the
 * remaining slices are still slept.
 *
 * @param milliseconds the time to sleep, in milliseconds
 */
void MQTTClient_sleep(long milliseconds)
{
    FUNC_ENTRY;
#if defined(WIN32) || defined(WIN64)
    Sleep(milliseconds);
#else
    while (milliseconds > 0)
    {
        /* 999 ms keeps every usleep() argument below one million microseconds. */
        long slice = (milliseconds > 999L) ? 999L : milliseconds;

        usleep(slice * 1000L);
        milliseconds -= slice;
    }
#endif
    FUNC_EXIT;
}
/*
 * START_TIME_TYPE is the platform-specific type of a timestamp returned by
 * MQTTClient_start_clock(): a tick count on Windows, a struct timespec on AIX
 * and a struct timeval everywhere else.
 */
#if defined(WIN32) || defined(WIN64)
#define START_TIME_TYPE DWORD
/**
 * Captures the current time as a starting point for measuring an interval.
 * @return the current value of GetTickCount()
 */
START_TIME_TYPE MQTTClient_start_clock(void)
{
    return GetTickCount();
}
#elif defined(AIX)
#define START_TIME_TYPE struct timespec
/**
 * Captures the current time as a starting point for measuring an interval.
 * The result is built in an automatic variable (not a static one) so that
 * concurrent callers cannot overwrite each other's timestamp.
 * @return the current CLOCK_REALTIME time
 */
START_TIME_TYPE MQTTClient_start_clock(void)
{
    struct timespec start;

    clock_gettime(CLOCK_REALTIME, &start);
    return start;
}
#else
#define START_TIME_TYPE struct timeval
/**
 * Captures the current time as a starting point for measuring an interval.
 * The result is built in an automatic variable (not a static one) so that
 * concurrent callers cannot overwrite each other's timestamp.
 * @return the current time as reported by gettimeofday()
 */
START_TIME_TYPE MQTTClient_start_clock(void)
{
    struct timeval start;

    gettimeofday(&start, NULL);
    return start;
}
#endif
#if defined(WIN32) || defined(WIN64)
/**
 * Returns the time elapsed since a starting point.
 * @param milliseconds the starting tick count, as returned by MQTTClient_start_clock()
 * @return the current GetTickCount() value minus the starting tick count
 */
long MQTTClient_elapsed(DWORD milliseconds)
{
    return GetTickCount() - milliseconds;
}
#elif defined(AIX)
/* NOTE(review): this turns assert() into a no-op for the remainder of the
 * translation unit on AIX; presumably needed by the ntimersub macro - confirm. */
#define assert(a)
/**
 * Returns the time elapsed since a starting point.
 * @param start the starting time, as returned by MQTTClient_start_clock()
 * @return the difference between the current CLOCK_REALTIME time and start, in milliseconds
 */
long MQTTClient_elapsed(struct timespec start)
{
    struct timespec now, res;

    clock_gettime(CLOCK_REALTIME, &now);
    ntimersub(now, start, res);
    return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
}
#else
/**
 * Returns the time elapsed since a starting point.
 *
 * The difference is computed with explicit arithmetic rather than the
 * timersub() macro, which is a BSD extension and not part of POSIX. The
 * microsecond field is normalised into the range 0..999999 by borrowing one
 * second, exactly as timersub() does, so the result is unchanged.
 *
 * @param start the starting time, as returned by MQTTClient_start_clock()
 * @return the difference between the current gettimeofday() time and start, in milliseconds
 */
long MQTTClient_elapsed(struct timeval start)
{
    struct timeval now, res;

    gettimeofday(&now, NULL);
    res.tv_sec = now.tv_sec - start.tv_sec;
    res.tv_usec = now.tv_usec - start.tv_usec;
    if (res.tv_usec < 0)
    {
        /* Borrow one second so that the microsecond part is non-negative. */
        --res.tv_sec;
        res.tv_usec += 1000000;
    }
    return (res.tv_sec)*1000 + (res.tv_usec)/1000;
}
#endif
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context)
{
int rc = 0;
MQTTClients *m = NULL;
FUNC_ENTRY;
rc = Thread_lock_mutex(mqttclient_mutex);
if (serverURI == NULL || clientId == NULL)
{
rc = MQTTCLIENT_NULL_PARAMETER;
goto exit;
}
if (!UTF8_validateString(clientId))
{
rc = MQTTCLIENT_BAD_UTF8_STRING;
goto exit;
}
if (!initialized)
{
#if defined(HEAP_H)
Heap_initialize();
#endif
Log_initialize((Log_nameValue*)MQTTClient_getVersionInfo());
bstate->clients = ListInitialize();
Socket_outInitialize();
Socket_setWriteCompleteCallback(MQTTClient_writeComplete);
handles = ListInitialize();
#if defined(OPENSSL)
SSLSocket_initialize();
#endif
initialized = 1;
}
m = malloc(sizeof(MQTTClients));
*handle = m;
memset(m, '\0', sizeof(MQTTClients));
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
#if defined(OPENSSL)
else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
{
serverURI += strlen(URI_SSL);
m->ssl = 1;
}
#endif
m->serverURI = MQTTStrdup(serverURI);
ListAppend(handles, m, sizeof(MQTTClients));
m->c = malloc(sizeof(Clients));
memset(m->c, '\0', sizeof(Clients));
m->c->context = m;
m->c->outboundMsgs = ListInitialize();
m->c->inboundMsgs = ListInitialize();
m->c->messageQueue = ListInitialize();
m->c->clientID = MQTTStrdup(clientId);
m->connect_sem = Thread_cre
没有合适的资源?快使用搜索试试~ 我知道了~
MQTT客户端C语言代码(Paho库及二次封装)
共50个文件
c:23个
h:22个
ds_store:2个
5星 · 超过95%的资源 需积分: 48 628 下载量 161 浏览量
2016-05-30
00:57:49
上传
评论 16
收藏 124KB ZIP 举报
温馨提示
MQTT客户端实现(使用Eclipse Paho C库,进行了二次封装)。在Linkit7688单片机上运行测试通过,附例程。
资源推荐
资源详情
资源评论
收起资源包目录
mqtt_client_2.zip (50个子文件)
mqtt_client_2
main.c 236B
mqtt
MQTTPacketOut.h 1KB
Thread.c 10KB
SocketBuffer.c 9KB
utf-8.h 862B
Heap.h 2KB
mqtt_client.c 7KB
Heap.c 13KB
MQTTPersistence.h 3KB
utf-8.c 5KB
SocketBuffer.h 2KB
Socket.c 21KB
MQTTClient.h 58KB
mqtt_client.h 6KB
Log.h 2KB
MQTTProtocolClient.c 22KB
MQTTPersistence.c 17KB
MQTTProtocolOut.h 2KB
MQTTClient.c 48KB
.DS_Store 6KB
LinkedList.c 13KB
Tree.c 14KB
Log.c 14KB
SSLSocket.h 2KB
LinkedList.h 3KB
MQTTPacket.c 20KB
Clients.h 4KB
MQTTPersistenceDefault.c 18KB
Clients.c 2KB
Messages.c 4KB
Thread.h 2KB
StackTrace.c 5KB
Socket.h 4KB
MQTTProtocolOut.c 6KB
SSLSocket.c 22KB
MQTTPersistenceDefault.h 1KB
StackTrace.h 3KB
Tree.h 3KB
Messages.h 815B
MQTTProtocol.h 1KB
MQTTPacketOut.c 7KB
MQTTProtocolClient.h 2KB
MQTTClientPersistence.h 10KB
MQTTPacket.h 7KB
.DS_Store 6KB
.settings
language.settings.xml 2KB
.project 763B
.cproject 12KB
subscribe.c 2KB
publish.c 1KB
共 50 条
- 1
JoStudio
- 粉丝: 1066
- 资源: 28
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
- 3
前往页