/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - bug 384016 - segv setting will message
* Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
* Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
* Ian Craggs - multiple server connection support
* Ian Craggs - fix for bug 413429 - connectionLost not called
* Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries
* Ian Craggs - fix for bug 419233 - mutexes not reporting errors
* Ian Craggs - fix for bug 420851
* Ian Craggs - fix for bug 432903 - queue persistence
* Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - fix for bug 438176 - MQTT version selection
* Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - fix for bug 443724 - stack corruption
* Ian Craggs - fix for bug 447672 - simultaneous access to socket structure
*******************************************************************************/
/**
* @file
* \brief Synchronous API implementation
*
*/
#define _GNU_SOURCE /* for pthread_mutexattr_settype */
#include <stdlib.h>
#if !defined(WIN32) && !defined(WIN64)
#include <sys/time.h>
#endif
#include "MQTTClient.h"
#if !defined(NO_PERSISTENCE)
#include "MQTTPersistence.h"
#endif
#include "utf-8.h"
#include "MQTTProtocol.h"
#include "MQTTProtocolOut.h"
#include "Thread.h"
#include "SocketBuffer.h"
#include "StackTrace.h"
#include "Heap.h"
#if defined(OPENSSL)
#include <openssl/ssl.h>
#endif
#define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##"
char* client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
static ClientStates ClientState =
{
CLIENT_VERSION, /* version */
NULL /* client list */
};
ClientStates* bstate = &ClientState;
MQTTProtocol state;
#if defined(WIN32) || defined(WIN64)
static mutex_type mqttclient_mutex = NULL;
static mutex_type socket_mutex = NULL;
extern mutex_type stack_mutex;
extern mutex_type heap_mutex;
extern mutex_type log_mutex;
BOOL APIENTRY DllMain(HANDLE hModule,
DWORD ul_reason_for_call,
LPVOID lpReserved)
{
switch (ul_reason_for_call)
{
case DLL_PROCESS_ATTACH:
Log(TRACE_MAX, -1, "DLL process attach");
if (mqttclient_mutex == NULL)
{
mqttclient_mutex = CreateMutex(NULL, 0, NULL);
stack_mutex = CreateMutex(NULL, 0, NULL);
heap_mutex = CreateMutex(NULL, 0, NULL);
log_mutex = CreateMutex(NULL, 0, NULL);
socket_mutex = CreateMutex(NULL, 0, NULL);
}
case DLL_THREAD_ATTACH:
Log(TRACE_MAX, -1, "DLL thread attach");
case DLL_THREAD_DETACH:
Log(TRACE_MAX, -1, "DLL thread detach");
case DLL_PROCESS_DETACH:
Log(TRACE_MAX, -1, "DLL process detach");
}
return TRUE;
}
#else
static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type mqttclient_mutex = &mqttclient_mutex_store;
static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type socket_mutex = &socket_mutex_store;
void MQTTClient_init()
{
pthread_mutexattr_t attr;
int rc;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
if ((rc = pthread_mutex_init(mqttclient_mutex, &attr)) != 0)
printf("MQTTClient: error %d initializing client_mutex\n", rc);
if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
printf("MQTTClient: error %d initializing socket_mutex\n", rc);
}
#define WINAPI
#endif
static volatile int initialized = 0;
static List* handles = NULL;
static time_t last;
static int running = 0;
static int tostop = 0;
static thread_id_type run_id = 0;
MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout);
MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc);
int MQTTClient_cleanSession(Clients* client);
void MQTTClient_stop();
int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop);
void MQTTClient_writeComplete(int socket);
typedef struct
{
MQTTClient_message* msg;
char* topicName;
int topicLen;
unsigned int seqno; /* only used on restore */
} qEntry;
typedef struct
{
char* serverURI;
#if defined(OPENSSL)
int ssl;
#endif
Clients* c;
MQTTClient_connectionLost* cl;
MQTTClient_messageArrived* ma;
MQTTClient_deliveryComplete* dc;
void* context;
sem_type connect_sem;
int rc; /* getsockopt return code in connect */
sem_type connack_sem;
sem_type suback_sem;
sem_type unsuback_sem;
MQTTPacket* pack;
} MQTTClients;
void MQTTClient_sleep(long milliseconds)
{
FUNC_ENTRY;
#if defined(WIN32) || defined(WIN64)
Sleep(milliseconds);
#else
usleep(milliseconds*1000);
#endif
FUNC_EXIT;
}
#if defined(WIN32) || defined(WIN64)
#define START_TIME_TYPE DWORD
START_TIME_TYPE MQTTClient_start_clock(void)
{
return GetTickCount();
}
#elif defined(AIX)
#define START_TIME_TYPE struct timespec
START_TIME_TYPE MQTTClient_start_clock(void)
{
static struct timespec start;
clock_gettime(CLOCK_REALTIME, &start);
return start;
}
#else
#define START_TIME_TYPE struct timeval
START_TIME_TYPE MQTTClient_start_clock(void)
{
static struct timeval start;
gettimeofday(&start, NULL);
return start;
}
#endif
#if defined(WIN32) || defined(WIN64)
long MQTTClient_elapsed(DWORD milliseconds)
{
return GetTickCount() - milliseconds;
}
#elif defined(AIX)
#define assert(a)
long MQTTClient_elapsed(struct timespec start)
{
struct timespec now, res;
clock_gettime(CLOCK_REALTIME, &now);
ntimersub(now, start, res);
return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
}
#else
long MQTTClient_elapsed(struct timeval start)
{
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&now, &start, &res);
return (res.tv_sec)*1000 + (res.tv_usec)/1000;
}
#endif
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context)
{
int rc = 0;
MQTTClients *m = NULL;
FUNC_ENTRY;
rc = Thread_lock_mutex(mqttclient_mutex);
if (serverURI == NULL || clientId == NULL)
{
rc = MQTTCLIENT_NULL_PARAMETER;
goto exit;
}
if (!UTF8_validateString(clientId))
{
rc = MQTTCLIENT_BAD_UTF8_STRING;
goto exit;
}
if (!initialized)
{
#if defined(HEAP_H)
Heap_initialize();
#endif
Log_initialize((Log_nameValue*)MQTTClient_getVersionInfo());
bstate->clients = ListInitialize();
Socket_outInitialize();
Socket_setWriteCompleteCallback(MQTTClient_writeComplete);
handles = ListInitialize();
#if defined(OPENSSL)
SSLSocket_initialize();
#endif
initialized = 1;
}
m = malloc(sizeof(MQTTClients));
*handle = m;
memset(m, '\0', sizeof(MQTTClients));
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
#if defined(OPENSSL)
else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
{
serverURI += strlen(URI_SSL);
m->ssl = 1;
}
#endif
m->serverURI = MQTTStrdup(serverURI);
ListAppend(handles, m, sizeof(MQTTClients));
m->c = malloc(sizeof(Clients));
memset(m->c, '\0', sizeof(Clients));
m->c->context = m;
m->c->outboundMsgs = ListInitialize();
m->c->inboundMsgs = ListInitialize();
m->c->messageQueue = ListInitialize();
m->c->clientID = MQTTStrdup(clientId);
m->connect_sem = Thread_cre
- 1
- 2
- 3
前往页