/**
* @file mqttClientService.c
*
* Implementation of MQTT Client Interface
*
*
*
* Copyright (C) Sierra Wireless Inc.
*
*/
#include "legato.h"
#include "interfaces.h"
#include "MQTTClient.h"
#include "Socket.h"
//--------------------------------------------------------------------------------------------------
/**
* This define specifies the max # of MQTT sessions to be created in memory pools & reference maps
* like the reference map for message arrived handlers
*/
//--------------------------------------------------------------------------------------------------
#define MQTT_SESSION_MAX 16
//--------------------------------------------------------------------------------------------------
/**
* Path to the SSL certificates file
*/
//--------------------------------------------------------------------------------------------------
static const char *SslCaCertsPathPtr = "/etc/ssl/certs/ca-certificates.crt";
//--------------------------------------------------------------------------------------------------
/**
 * File-scope state used by the built-in session that COMPONENT_INIT creates and connects.
 */
//--------------------------------------------------------------------------------------------------
// Safe reference to the session that COMPONENT_INIT obtains from mqtt_CreateSession().
mqtt_SessionRef_t MQTTSession;
// Result of the most recent mqtt_CreateSession()/mqtt_Connect() call made by COMPONENT_INIT.
le_result_t res;
// Device IMEI filled in by le_info_GetImei(); COMPONENT_INIT uses it to build the MQTT client id.
char DeviceIMEI[LE_INFO_IMEI_MAX_BYTES];
//--------------------------------------------------------------------------------------------------
/**
 * Connection-lost callback registered by COMPONENT_INIT for the built-in session.
 * It only logs the event.
 */
//--------------------------------------------------------------------------------------------------
static void OnConnectionLost
(
    void* context   ///< [IN] Context supplied at registration; not used
)
{
    (void)context;

    LE_ERROR("Connection lost!");
}
//--------------------------------------------------------------------------------------------------
/**
 * Message-arrived callback registered by COMPONENT_INIT for the built-in session.
 * The message itself is ignored; the callback only logs that something was received.
 */
//--------------------------------------------------------------------------------------------------
static void OnMessageArrived
(
    const char* topic,          ///< [IN] Topic of the received message; not used
    const uint8_t* payload,     ///< [IN] Message payload; not used
    size_t payloadLen,          ///< [IN] Payload length in bytes; not used
    void* context               ///< [IN] Context supplied at registration; not used
)
{
    (void)topic;
    (void)payload;
    (void)payloadLen;
    (void)context;

    LE_ERROR("The publisher received a message!");
}
// Per-session state. Objects are allocated from MQTTSessionPoolRef and handed to clients as safe
// references from SessionRefMap.
typedef struct mqtt_Session
{
// paho client handle, filled in by MQTTClient_create() in mqtt_CreateSession()
MQTTClient client;
// Options passed to MQTTClient_connect(); username/password point to pool-allocated copies
MQTTClient_connectOptions connectOptions;
// SSL options; linked into connectOptions.ssl by mqtt_SetConnectOptions()
MQTTClient_SSLOptions sslOptions;
// Client-registered message arrived handler (NULL when none is registered)
mqtt_MessageArrivedHandlerFunc_t messageArrivedHandler;
// Safe reference (from MessageHandlerRefMap) returned for the message arrived handler
mqtt_MessageArrivedHandlerRef_t messageArrivedHandlerRef;
// Context handed back to the message arrived handler
void* messageArrivedHandlerContextPtr;
// Client-registered connection lost handler (NULL when none is registered)
mqtt_ConnectionLostHandlerFunc_t connectionLostHandler;
// Safe reference (from MessageHandlerRefMap) returned for the connection lost handler
mqtt_ConnectionLostHandlerRef_t connectionLostHandlerRef;
// Context handed back to the connection lost handler
void* connectionLostHandlerContextPtr;
// The Legato client session that owns this MQTT session
le_msg_SessionRef_t clientSession;
// Safe reference (from SessionRefMap) that identifies this session to clients and to paho
mqtt_SessionRef_t sessionRef;
} mqtt_Session;
// Forward declarations of file-local helpers and of the paho callbacks / Legato event handlers
// that are defined further down in this file.
static int QosEnumToValue(mqtt_Qos_t qos);
static void ConnectionLostHandler(void* contextPtr, char* causePtr);
static void ConnectionLostEventHandler(void* reportPtr);
static int MessageArrivedHandler(
void* contextPtr, char* topicNamePtr, int topicLen, MQTTClient_message* messagePtr);
static void MessageReceivedEventHandler(void* reportPtr);
//--------------------------------------------------------------------------------------------------
/**
 * Safe ref map for mqtt_Session objects returned to clients. The same safe reference is also
 * handed to paho as the callback context.
 */
//--------------------------------------------------------------------------------------------------
static le_ref_MapRef_t SessionRefMap;
//--------------------------------------------------------------------------------------------------
/**
 * Safe ref map for MQTT handlers. It holds the references of both the message arrived handlers
 * and the connection lost handlers; every reference maps to the owning mqtt_Session.
 */
//--------------------------------------------------------------------------------------------------
static le_ref_MapRef_t MessageHandlerRefMap = NULL;
//--------------------------------------------------------------------------------------------------
/**
 * Event id that is used to signal that a message has been received from the MQTT broker. Events
 * must be used because paho spawns a thread for receiving messages and that means that the
 * callback can't call IPC methods because it is from a non-legato thread.
 *
 * The event payload is a pointer to a pool-allocated mqtt_Message.
 */
//--------------------------------------------------------------------------------------------------
static le_event_Id_t ReceiveThreadEventId;
//--------------------------------------------------------------------------------------------------
/**
 * Event id for connection lost events from paho. The justification for this event is the same as
 * for ReceiveThreadEventId.
 *
 * The event payload is the callback context pointer that paho passed to ConnectionLostHandler.
 */
//--------------------------------------------------------------------------------------------------
static le_event_Id_t ConnectionLostThreadEventId;
//--------------------------------------------------------------------------------------------------
/**
 * MQTT session memory pool (mqtt_Session objects). mqttSessionDestructor is installed as the
 * pool destructor.
 */
//--------------------------------------------------------------------------------------------------
static le_mem_PoolRef_t MQTTSessionPoolRef = NULL;
//--------------------------------------------------------------------------------------------------
/**
 * Username memory pool. Holds the per-session copy of the username set by
 * mqtt_SetConnectOptions().
 */
//--------------------------------------------------------------------------------------------------
static le_mem_PoolRef_t UsernamePoolRef = NULL;
//--------------------------------------------------------------------------------------------------
/**
 * Password memory pool. Holds the per-session, null terminated copy of the password set by
 * mqtt_SetConnectOptions().
 */
//--------------------------------------------------------------------------------------------------
static le_mem_PoolRef_t PasswordPoolRef = NULL;
//--------------------------------------------------------------------------------------------------
/**
 * Message memory pool (mqtt_Message objects queued from the paho thread to the event handler).
 */
//--------------------------------------------------------------------------------------------------
static le_mem_PoolRef_t MessagePoolRef = NULL;
//--------------------------------------------------------------------------------------------------
/**
 * Topic memory pool. Holds the topic copy attached to a queued mqtt_Message.
 */
//--------------------------------------------------------------------------------------------------
static le_mem_PoolRef_t TopicPoolRef = NULL;
//--------------------------------------------------------------------------------------------------
/**
 * Payload memory pool. Holds the payload copy attached to a queued mqtt_Message.
 */
//--------------------------------------------------------------------------------------------------
static le_mem_PoolRef_t PayloadPoolRef = NULL;
//--------------------------------------------------------------------------------------------------
/**
* Represents a message which has been received from the MQTT broker.
*/
//--------------------------------------------------------------------------------------------------
typedef struct mqtt_Message
{
// Safe reference to the mqtt_Session the message was received on
mqtt_SessionRef_t sessionRef;
// Topic copy, allocated from TopicPoolRef
char* topicPtr;
// Topic length in bytes as computed by MessageArrivedHandler
size_t topicLength;
// Payload copy, allocated from PayloadPoolRef
uint8_t* payloadPtr;
// Payload length in bytes as reported by paho
size_t payloadLength;
} mqtt_Message;
//--------------------------------------------------------------------------------------------------
/**
* Creates an MQTT session object.
*
* @return
* LE_OK on success or LE_FAULT on failure
*/
//--------------------------------------------------------------------------------------------------
le_result_t mqtt_CreateSession
(
    const char* brokerURIPtr,           ///< [IN] Broker URI in the form protocol://host:port,
                                        ///  e.g. tcp://1.2.3.4:1883 or ssl://example.com:8883
    const char* clientIdPtr,            ///< [IN] Unique client id. A broker terminates an existing
                                        ///  session when a new one connects with the same id.
    mqtt_SessionRef_t* sessionRefPtr    ///< [OUT] Safe reference to the new session; only written
                                        ///  when LE_OK is returned
)
{
    // Start from an all-zero session and then load the paho default option sets.
    mqtt_Session* sessionPtr = le_mem_ForceAlloc(MQTTSessionPoolRef);
    LE_ASSERT(sessionPtr);
    memset(sessionPtr, 0, sizeof(*sessionPtr));

    const MQTTClient_connectOptions defaultConnectOptions = MQTTClient_connectOptions_initializer;
    const MQTTClient_SSLOptions defaultSslOptions = MQTTClient_SSLOptions_initializer;
    sessionPtr->connectOptions = defaultConnectOptions;
    sessionPtr->sslOptions = defaultSslOptions;
    sessionPtr->sslOptions.trustStore = SslCaCertsPathPtr;

    const int pahoStatus = MQTTClient_create(&(sessionPtr->client),
                                             brokerURIPtr,
                                             clientIdPtr,
                                             MQTTCLIENT_PERSISTENCE_NONE,
                                             NULL);
    if (pahoStatus != MQTTCLIENT_SUCCESS)
    {
        LE_ERROR("Couldn't create MQTT session. Paho error code: %d", pahoStatus);
        le_mem_Release(sessionPtr);
        return LE_FAULT;
    }

    // Remember the owning Legato client and publish the session through a safe reference.
    sessionPtr->clientSession = mqtt_GetClientSessionRef();
    sessionPtr->sessionRef = le_ref_CreateRef(SessionRefMap, sessionPtr);
    *sessionRefPtr = sessionPtr->sessionRef;

    // The safe reference (not the raw pointer) is the context paho passes to the callbacks.
    const int callbackStatus = MQTTClient_setCallbacks(sessionPtr->client,
                                                       sessionPtr->sessionRef,
                                                       &ConnectionLostHandler,
                                                       &MessageArrivedHandler,
                                                       NULL);
    LE_ASSERT(callbackStatus == MQTTCLIENT_SUCCESS);

    return LE_OK;
}
//--------------------------------------------------------------------------------------------------
/**
* Destroy the given session.
*
* @note
* All MQTT sessions associated with the client session will be destroyed automatically when
* the client disconnects from the MQTT service.
*/
//--------------------------------------------------------------------------------------------------
void mqtt_DestroySession
(
    mqtt_SessionRef_t sessionRef    ///< [IN] Session to destroy
)
{
    mqtt_Session* sessionPtr = le_ref_Lookup(SessionRefMap, sessionRef);

    if (sessionPtr == NULL)
    {
        LE_KILL_CLIENT("Session doesn't exist");
    }
    else if (sessionPtr->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
    }
    else
    {
        // Releasing the pool object is all that is done here; the remaining cleanup is left to
        // the destructor installed on the session pool.
        le_mem_Release(sessionPtr);
    }
}
//--------------------------------------------------------------------------------------------------
/**
* Destroy the MQTT session: internal cleanup.
*/
//--------------------------------------------------------------------------------------------------
static void DestroySessionInternal
(
    mqtt_Session* sessionPtr    ///< [IN] Session whose paho client and credentials are released
)
{
    MQTTClient_destroy(&(sessionPtr->client));

    // The credentials are optional: they stay NULL unless mqtt_SetConnectOptions() stored a copy.
    // Only release what was actually allocated (mqtt_SetConnectOptions() applies the same guard).
    // The cast from const char* to char* is needed to hand the memory back to its pool.
    if (sessionPtr->connectOptions.username != NULL)
    {
        le_mem_Release((char*)sessionPtr->connectOptions.username);
        sessionPtr->connectOptions.username = NULL;
    }

    if (sessionPtr->connectOptions.password != NULL)
    {
        le_mem_Release((char*)sessionPtr->connectOptions.password);
        sessionPtr->connectOptions.password = NULL;
    }
}
//--------------------------------------------------------------------------------------------------
/**
* This is the internal session destructor for mqtt_Session allocated from MQTTSessionPoolRef that
* will run to clean it up upon an le_mem_Release() of it.
*/
//--------------------------------------------------------------------------------------------------
static void mqttSessionDestructor
(
    void* objPtr    ///< [IN] The mqtt_Session object being returned to MQTTSessionPoolRef
)
{
    mqtt_Session* s = (mqtt_Session *)objPtr;
    if (!s)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return;
    }

    // Both handler kinds keep their safe references in MessageHandlerRefMap.
    if (s->messageArrivedHandlerRef)
    {
        le_ref_DeleteRef(MessageHandlerRefMap, s->messageArrivedHandlerRef);
        s->messageArrivedHandlerRef = NULL;
    }
    if (s->connectionLostHandlerRef)
    {
        le_ref_DeleteRef(MessageHandlerRefMap, s->connectionLostHandlerRef);
        s->connectionLostHandlerRef = NULL;
    }

    DestroySessionInternal(s);

    // mqtt_CreateSession() releases the object before a safe reference has been created when
    // MQTTClient_create() fails, so there may be no reference to delete.
    if (s->sessionRef)
    {
        le_ref_DeleteRef(SessionRefMap, s->sessionRef);
        s->sessionRef = NULL;
    }
}
//--------------------------------------------------------------------------------------------------
/**
* Set the connections options which will be used during subsequent calls to mqtt_Connect().
*/
//--------------------------------------------------------------------------------------------------
void mqtt_SetConnectOptions
(
    mqtt_SessionRef_t sessionRef,   ///< [IN] Session to set connection options in
    uint16_t keepAliveInterval,     ///< [IN] How often to send an MQTT PINGREQ packet if no other
                                    ///  packets are received
    bool cleanSession,              ///< [IN] When false, restore the previous state on a reconnect
    const char* usernamePtr,        ///< [IN] Username to connect with.
                                    ///  NULL if username is not required
    const uint8_t* passwordPtr,     ///< [IN] Password to connect with.
                                    ///  NULL if password is not required
    size_t passwordLength,          ///< [IN] Length of the password in bytes
    uint16_t connectTimeout,        ///< [IN] Connect timeout in seconds
    uint16_t retryInterval          ///< [IN] Retry interval in seconds
)
{
    mqtt_Session* s = le_ref_Lookup(SessionRefMap, sessionRef);
    if (s == NULL)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return;
    }
    if (s->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
        return;
    }

    // All validation happens before the session is modified, so a rejected request leaves the
    // previously stored options and credentials untouched.

    // MQTT allows a username without a password, but not a password without a username.
    if ((passwordPtr != NULL) && (usernamePtr == NULL))
    {
        LE_KILL_CLIENT("It is illegal to specify a password without a username");
        return;
    }

    // The copies live in fixed-size pool blocks and both need room for a null terminator.
    size_t usernameLength = 0;
    if (usernamePtr != NULL)
    {
        usernameLength = strlen(usernamePtr);
        if (usernameLength >= le_mem_GetObjectSize(UsernamePoolRef))
        {
            LE_KILL_CLIENT("Username is too long (%zu bytes)", usernameLength);
            return;
        }
    }

    if (passwordPtr != NULL)
    {
        if (passwordLength >= le_mem_GetObjectSize(PasswordPoolRef))
        {
            LE_KILL_CLIENT("Password is too long (%zu bytes)", passwordLength);
            return;
        }

        // paho uses null terminated strings for passwords, so the password may not contain any
        // embedded null characters.
        if (memchr(passwordPtr, 0, passwordLength) != NULL)
        {
            LE_KILL_CLIENT(
                "Password contains embedded null characters and this is not currently "
                "supported by this implementation");
            return;
        }
    }

    s->connectOptions.keepAliveInterval = keepAliveInterval;
    s->connectOptions.cleansession = cleanSession;

    // username: drop the previous copy (if any), then store the new one
    if (s->connectOptions.username != NULL)
    {
        le_mem_Release((char*)s->connectOptions.username);
        s->connectOptions.username = NULL;
    }
    if (usernamePtr != NULL)
    {
        char* usernameCopyPtr = le_mem_ForceAlloc(UsernamePoolRef);
        LE_ASSERT(usernameCopyPtr != NULL);
        memcpy(usernameCopyPtr, usernamePtr, usernameLength + 1);
        s->connectOptions.username = usernameCopyPtr;
    }

    // password: drop the previous copy (if any), then store the new one null terminated
    if (s->connectOptions.password != NULL)
    {
        le_mem_Release((char*)s->connectOptions.password);
        s->connectOptions.password = NULL;
    }
    if (passwordPtr != NULL)
    {
        uint8_t* passwordCopyPtr = le_mem_ForceAlloc(PasswordPoolRef);
        LE_ASSERT(passwordCopyPtr != NULL);
        memcpy(passwordCopyPtr, passwordPtr, passwordLength);
        passwordCopyPtr[passwordLength] = '\0';
        s->connectOptions.password = (const char*)passwordCopyPtr;
    }

    s->connectOptions.connectTimeout = connectTimeout;
    s->connectOptions.retryInterval = retryInterval;
    s->connectOptions.ssl = &s->sslOptions;
}
//--------------------------------------------------------------------------------------------------
/**
* Connect to the MQTT broker using the provided session.
*
* @return
* - LE_OK on success
* - LE_BAD_PARAMETER if the connection options are bad
* - LE_FAULT for general failures
*/
//--------------------------------------------------------------------------------------------------
le_result_t mqtt_Connect
(
    mqtt_SessionRef_t sessionRef    ///< [IN] Session to connect
)
{
    mqtt_Session* sessionPtr = le_ref_Lookup(SessionRefMap, sessionRef);
    if (sessionPtr == NULL)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return LE_FAULT;
    }
    if (sessionPtr->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
        return LE_FAULT;
    }

    // Translate the paho status into the Legato result codes documented for this function.
    const int pahoStatus = MQTTClient_connect(sessionPtr->client, &sessionPtr->connectOptions);

    if (pahoStatus == MQTTCLIENT_SUCCESS)
    {
        return LE_OK;
    }

    if (pahoStatus == SOCKET_ERROR)
    {
        LE_WARN("Socket error");
        return LE_FAULT;
    }

    if ((pahoStatus == MQTTCLIENT_NULL_PARAMETER) ||
        (pahoStatus == MQTTCLIENT_BAD_STRUCTURE) ||
        (pahoStatus == MQTTCLIENT_BAD_UTF8_STRING))
    {
        return LE_BAD_PARAMETER;
    }

    LE_WARN("Paho connect returned (%d)", pahoStatus);
    return LE_FAULT;
}
//--------------------------------------------------------------------------------------------------
/**
* Disconnect a currently connected session
*
* @return
* - LE_OK on success
* - LE_FAULT on failure
*
* @note
* TODO: If the connection is lost right as disconnect is called, I think that this function
* will return LE_FAULT and the client will not know why.
*/
//--------------------------------------------------------------------------------------------------
le_result_t mqtt_Disconnect
(
    mqtt_SessionRef_t sessionRef    ///< [IN] Session to disconnect
)
{
    mqtt_Session* sessionPtr = le_ref_Lookup(SessionRefMap, sessionRef);
    if (sessionPtr == NULL)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return LE_FAULT;
    }
    if (sessionPtr->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
        return LE_FAULT;
    }

    // A timeout of 0 ms is passed, i.e. no wait time is requested before disconnecting.
    const int pahoStatus = MQTTClient_disconnect(sessionPtr->client, 0);

    if (pahoStatus == MQTTCLIENT_SUCCESS)
    {
        return LE_OK;
    }

    // Every other outcome is reported as LE_FAULT; only the log output differs.
    if (pahoStatus == MQTTCLIENT_DISCONNECTED)
    {
        LE_WARN("Already disconnected");
    }
    else if (pahoStatus != MQTTCLIENT_FAILURE)
    {
        LE_WARN("Paho disconnect returned (%d)", pahoStatus);
    }

    return LE_FAULT;
}
//--------------------------------------------------------------------------------------------------
/**
* Publish the supplied payload to the MQTT broker on the given topic.
*
* @return
* LE_OK on success or LE_FAULT on failure
*/
//--------------------------------------------------------------------------------------------------
le_result_t mqtt_Publish
(
    mqtt_SessionRef_t sessionRef,   ///< [IN] Session to publish on
    const char* topicPtr,           ///< [IN] Topic to publish to
    const uint8_t* payloadPtr,      ///< [IN] Message payload
    size_t payloadLen,              ///< [IN] Payload length in bytes
    mqtt_Qos_t qos,                 ///< [IN] QoS mode
    bool retain                     ///< [IN] Retain flag for the message
)
{
    mqtt_Session* sessionPtr = le_ref_Lookup(SessionRefMap, sessionRef);
    if (sessionPtr == NULL)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return LE_FAULT;
    }
    if (sessionPtr->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
        return LE_FAULT;
    }

    // No delivery token is requested, hence the trailing NULL.
    const int pahoStatus = MQTTClient_publish(sessionPtr->client,
                                              topicPtr,
                                              payloadLen,
                                              (void*)payloadPtr,
                                              QosEnumToValue(qos),
                                              retain,
                                              NULL);
    if (pahoStatus == MQTTCLIENT_SUCCESS)
    {
        return LE_OK;
    }

    LE_WARN("Publish failed with error code (%d)", pahoStatus);
    return LE_FAULT;
}
//--------------------------------------------------------------------------------------------------
/**
* Subscribe to the given topic pattern. Topics look like UNIX filesystem paths. Eg.
* "/bedroom/sensors/motion". Patterns may include special wildcard characters "+" and "#" to match
* one or multiple levels of a topic. For example. "/#/motion" will match topics
* "/bedroom/sensors/motion" and "/car/data/motion", but not "/bedroom/sensors/motion/enabled".
*
* @return
* LE_OK on success or LE_FAULT on failure
*/
//--------------------------------------------------------------------------------------------------
le_result_t mqtt_Subscribe
(
    mqtt_SessionRef_t sessionRef,   ///< [IN] Session
    const char* topicPatternPtr,    ///< [IN] Topic pattern
    mqtt_Qos_t qos                  ///< [IN] QoS mode
)
{
    mqtt_Session* s = le_ref_Lookup(SessionRefMap, sessionRef);
    if (s == NULL)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return LE_FAULT;
    }
    if (s->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
        return LE_FAULT;
    }

    // Convert (and validate) the API enum exactly as mqtt_Publish() does instead of handing the
    // raw enum value to paho.
    const int subscribeResult =
        MQTTClient_subscribe(s->client, topicPatternPtr, QosEnumToValue(qos));
    le_result_t result = LE_OK;
    if (subscribeResult != MQTTCLIENT_SUCCESS)
    {
        LE_WARN("Subscribe failed with error code (%d)", subscribeResult);
        result = LE_FAULT;
    }
    return result;
}
//--------------------------------------------------------------------------------------------------
/**
* Unsubscribe from the given topic pattern.
*
* @return
* LE_OK on success or LE_FAULT on failure.
*/
//--------------------------------------------------------------------------------------------------
le_result_t mqtt_Unsubscribe
(
    mqtt_SessionRef_t sessionRef,   ///< [IN] Session
    const char* topicPatternPtr     ///< [IN] Topic pattern
)
{
    mqtt_Session* s = le_ref_Lookup(SessionRefMap, sessionRef);
    if (s == NULL)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return LE_FAULT;
    }
    if (s->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
        return LE_FAULT;
    }

    // sessionRef is an opaque safe reference and must never be dereferenced; the paho client
    // handle lives in the session object that was looked up from it.
    const int unsubscribeResult = MQTTClient_unsubscribe(s->client, topicPatternPtr);
    le_result_t result = LE_OK;
    if (unsubscribeResult != MQTTCLIENT_SUCCESS)
    {
        LE_WARN("Unsubscribe failed with error code (%d)", unsubscribeResult);
        result = LE_FAULT;
    }
    return result;
}
//--------------------------------------------------------------------------------------------------
/**
* Set the connection lost handler for the session.
*
* @return
* A handle which allows the connection lost handler to be removed.
*
* @note
* Only one handler may be registered
*/
//--------------------------------------------------------------------------------------------------
mqtt_ConnectionLostHandlerRef_t mqtt_AddConnectionLostHandler
(
    mqtt_SessionRef_t sessionRef,               ///< [IN] Session to attach the handler to
    mqtt_ConnectionLostHandlerFunc_t handler,   ///< [IN] Connection lost handler
    void* contextPtr                            ///< [IN] Context passed back to the handler
)
{
    mqtt_Session* sessionPtr = le_ref_Lookup(SessionRefMap, sessionRef);
    if (!sessionPtr)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return NULL;
    }
    if (sessionPtr->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
        return NULL;
    }
    if (sessionPtr->connectionLostHandler)
    {
        LE_KILL_CLIENT("A registered connection lost handler is present; only 1 allowed");
        return NULL;
    }

    // The handler reference maps back to the session itself, so removal can find it again.
    mqtt_ConnectionLostHandlerRef_t newHandlerRef =
        le_ref_CreateRef(MessageHandlerRefMap, sessionPtr);

    sessionPtr->connectionLostHandlerRef = newHandlerRef;
    sessionPtr->connectionLostHandlerContextPtr = contextPtr;
    sessionPtr->connectionLostHandler = handler;

    return newHandlerRef;
}
//--------------------------------------------------------------------------------------------------
/**
* Deregister the connection lost handler for the session.
*/
//--------------------------------------------------------------------------------------------------
void mqtt_RemoveConnectionLostHandler
(
    mqtt_ConnectionLostHandlerRef_t handlerRef  ///< [IN] Connection lost handler
)
{
    mqtt_Session* s = le_ref_Lookup(MessageHandlerRefMap, handlerRef);
    if (s == NULL)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return;
    }
    if (s->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
        return;
    }

    // MessageHandlerRefMap also holds the message arrived handler references. Make sure the
    // supplied reference really is this session's connection lost handler; otherwise the wrong
    // reference would be deleted and the session would be left with stale handler state.
    if (s->connectionLostHandlerRef != handlerRef)
    {
        LE_KILL_CLIENT("Reference is not the connection lost handler of the session");
        return;
    }

    s->connectionLostHandler = NULL;
    s->connectionLostHandlerContextPtr = NULL;
    le_ref_DeleteRef(MessageHandlerRefMap, handlerRef);
    s->connectionLostHandlerRef = NULL;
}
//--------------------------------------------------------------------------------------------------
/**
* Set the message arrived handler for the session.
*
* @return
* A handle which can be used to deregister the message arrived handler.
*
* @note
* Only one message arrived handler may be registered for a session.
*/
//--------------------------------------------------------------------------------------------------
mqtt_MessageArrivedHandlerRef_t mqtt_AddMessageArrivedHandler
(
    mqtt_SessionRef_t sessionRef,               ///< [IN] Session to attach the handler to
    mqtt_MessageArrivedHandlerFunc_t handler,   ///< [IN] Message arrived handler
    void* contextPtr                            ///< [IN] Context passed back to the handler
)
{
    mqtt_Session* sessionPtr = le_ref_Lookup(SessionRefMap, sessionRef);
    if (!sessionPtr)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return NULL;
    }
    if (sessionPtr->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
        return NULL;
    }
    if (sessionPtr->messageArrivedHandler)
    {
        LE_KILL_CLIENT("A registered message arrived handler is present; only 1 allowed");
        return NULL;
    }

    // The handler reference maps back to the session itself, so removal can find it again.
    mqtt_MessageArrivedHandlerRef_t newHandlerRef =
        le_ref_CreateRef(MessageHandlerRefMap, sessionPtr);

    sessionPtr->messageArrivedHandlerRef = newHandlerRef;
    sessionPtr->messageArrivedHandlerContextPtr = contextPtr;
    sessionPtr->messageArrivedHandler = handler;

    return newHandlerRef;
}
//--------------------------------------------------------------------------------------------------
/**
* Deregister the message arrived handler for the session.
*/
//--------------------------------------------------------------------------------------------------
void mqtt_RemoveMessageArrivedHandler
(
    mqtt_MessageArrivedHandlerRef_t handlerRef  ///< [IN] Message arrived handler
)
{
    mqtt_Session* s = le_ref_Lookup(MessageHandlerRefMap, handlerRef);
    if (s == NULL)
    {
        LE_KILL_CLIENT("Session doesn't exist");
        return;
    }
    if (s->clientSession != mqtt_GetClientSessionRef())
    {
        LE_KILL_CLIENT("Session doesn't belong to this client");
        return;
    }

    // MessageHandlerRefMap also holds the connection lost handler references. Make sure the
    // supplied reference really is this session's message arrived handler; otherwise the wrong
    // reference would be deleted and the session would be left with stale handler state.
    if (s->messageArrivedHandlerRef != handlerRef)
    {
        LE_KILL_CLIENT("Reference is not the message arrived handler of the session");
        return;
    }

    s->messageArrivedHandler = NULL;
    s->messageArrivedHandlerContextPtr = NULL;
    le_ref_DeleteRef(MessageHandlerRefMap, handlerRef);
    s->messageArrivedHandlerRef = NULL;
}
//--------------------------------------------------------------------------------------------------
/**
* Gets the correct quality of service integer value as defined by the MQTT specification from the
* enum type.
*
* @return
* The QoS value as defined by the MQTT specification
*/
//--------------------------------------------------------------------------------------------------
static int QosEnumToValue
(
    mqtt_Qos_t qos  ///< [IN] The QoS enum value to convert.
)
{
    switch (qos)
    {
        case MQTT_QOS0_TRANSMIT_ONCE:
            return 0;

        case MQTT_QOS1_RECEIVE_AT_LEAST_ONCE:
            return 1;

        case MQTT_QOS2_RECEIVE_EXACTLY_ONCE:
            return 2;

        default:
            break;
    }

    // Unknown enum value: the client is killed and QoS 0 is handed back to the caller.
    LE_KILL_CLIENT("Invalid QoS setting (%d)", qos);
    return 0;
}
//--------------------------------------------------------------------------------------------------
/**
* This is the connection lost callback function that is supplied to the paho library. The
* function generates an event rather than calling the client supplied callback because this
* function will be called on a non-Legato thread.
*/
//--------------------------------------------------------------------------------------------------
static void ConnectionLostHandler
(
    void* contextPtr,   ///< [IN] Callback context registered with paho for the session
    char* causePtr      ///< [IN] Not used here
)
{
    (void)causePtr;

    // The event payload is the context pointer value itself, copied by le_event_Report().
    void* sessionContext = contextPtr;
    le_event_Report(ConnectionLostThreadEventId, &sessionContext, sizeof(sessionContext));
}
//--------------------------------------------------------------------------------------------------
/**
* The event handler for the connection lost event that is generated by ConnectionLostHandler.
* This function calls the handler supplied by the client.
*/
//--------------------------------------------------------------------------------------------------
static void ConnectionLostEventHandler
(
void* reportPtr
)
{
mqtt_Session* s = le_ref_Lookup(SessionRefMap, reportPtr);
if (s == NULL)
{
LE_KILL_CLIENT("Session doesn't exist");
return;
}
if (s->connectionLostHandler != NULL)
{
s->connectionLostHandler(s->connectionLostHandlerContextPtr);
}
else
{
LE_WARN("Connection was lost, but no handler is registered to receive the notification");
}
}
//--------------------------------------------------------------------------------------------------
/**
* This is the message arrived callback function that is supplied to the paho library. The
* function generates an event rather than calling the client supplied callback because this
* function will be called on a non-Legato thread.
*/
//--------------------------------------------------------------------------------------------------
static int MessageArrivedHandler
(
    void* contextPtr,               ///< [IN] Callback context: the session safe reference
    char* topicNamePtr,             ///< [IN] Topic of the message (owned by paho)
    int topicLen,                   ///< [IN] 0 for a plain C string topic, otherwise its length
    MQTTClient_message* messagePtr  ///< [IN] The received message (owned by paho)
)
{
    mqtt_Session* s = le_ref_Lookup(SessionRefMap, contextPtr);
    if (s == NULL)
    {
        LE_WARN("Session doesn't exist");
    }
    else
    {
        mqtt_Message* storedMsgPtr = le_mem_ForceAlloc(MessagePoolRef);
        LE_ASSERT(storedMsgPtr);
        memset(storedMsgPtr, 0, sizeof(*storedMsgPtr));
        LE_DEBUG("MessageArrivedHandler called for topic=%s. Storing session=0x%p", topicNamePtr, contextPtr);
        storedMsgPtr->sessionRef = contextPtr;

        // paho passes topicLen == 0 when the topic is an ordinary C string, in which case
        // strlen() can be trusted (the terminator is included in the stored length). A non-zero
        // topicLen means that the topic contains embedded nulls.
        storedMsgPtr->topicLength =
            (topicLen == 0) ? (strlen(topicNamePtr) + 1) : (size_t)topicLen;
        storedMsgPtr->payloadLength =
            (messagePtr->payloadlen > 0) ? (size_t)messagePtr->payloadlen : 0;

        // The real lengths are kept so that the event handler can still report oversized
        // messages, but never more than one pool block is written.
        size_t topicCopyLength = storedMsgPtr->topicLength;
        if (topicCopyLength > (size_t)MQTT_MAX_TOPIC_LENGTH)
        {
            topicCopyLength = (size_t)MQTT_MAX_TOPIC_LENGTH;
        }
        size_t payloadCopyLength = storedMsgPtr->payloadLength;
        if (payloadCopyLength > (size_t)MQTT_MAX_PAYLOAD_LENGTH)
        {
            payloadCopyLength = (size_t)MQTT_MAX_PAYLOAD_LENGTH;
        }

        storedMsgPtr->topicPtr = le_mem_ForceAlloc(TopicPoolRef);
        LE_ASSERT(storedMsgPtr->topicPtr);
        memset(storedMsgPtr->topicPtr, 0, MQTT_MAX_TOPIC_LENGTH);
        memcpy(storedMsgPtr->topicPtr, topicNamePtr, topicCopyLength);

        storedMsgPtr->payloadPtr = le_mem_ForceAlloc(PayloadPoolRef);
        LE_ASSERT(storedMsgPtr->payloadPtr);
        memset(storedMsgPtr->payloadPtr, 0, MQTT_MAX_PAYLOAD_LENGTH);
        if (payloadCopyLength > 0)
        {
            memcpy(storedMsgPtr->payloadPtr, messagePtr->payload, payloadCopyLength);
        }

        le_event_Report(ReceiveThreadEventId, &storedMsgPtr, sizeof(mqtt_Message*));
    }

    // Returning true tells paho that the message has been handled, which makes this callback
    // responsible for freeing the message and the topic storage.
    MQTTClient_freeMessage(&messagePtr);
    MQTTClient_free(topicNamePtr);
    return true;
}
//--------------------------------------------------------------------------------------------------
/**
* The event handler for the message arrived event that is generated by MessageArrivedHandler.
* This function calls the handler supplied by the client.
*/
//--------------------------------------------------------------------------------------------------
static void MessageReceivedEventHandler
(
    void* reportPtr     ///< [IN] Event payload: points at the mqtt_Message* that
                        ///  MessageArrivedHandler reported
)
{
    mqtt_Message* storedMsgPtr = *((mqtt_Message**)reportPtr);
    mqtt_Session* s = le_ref_Lookup(SessionRefMap, storedMsgPtr->sessionRef);

    if (s == NULL)
    {
        LE_WARN("Session lookup failed for session=0x%p", storedMsgPtr->sessionRef);
    }
    else if (s->messageArrivedHandler == NULL)
    {
        LE_WARN(
            "Message has arrived, but no handler is registered to receive the notification");
    }
    else if (storedMsgPtr->topicLength <= MQTT_MAX_TOPIC_LENGTH &&
             storedMsgPtr->payloadLength <= MQTT_MAX_PAYLOAD_LENGTH)
    {
        s->messageArrivedHandler(
            storedMsgPtr->topicPtr,
            storedMsgPtr->payloadPtr,
            storedMsgPtr->payloadLength,
            s->messageArrivedHandlerContextPtr);
    }
    else
    {
        LE_WARN(
            "Message arrived from broker, but it is too large to deliver using Legato IPC - "
            "topicLength=%zu, payloadLength=%zu",
            storedMsgPtr->topicLength,
            storedMsgPtr->payloadLength);
    }

    // The queued copy is owned by this handler and has to be returned to its pools on every
    // path, including the one where the session no longer exists.
    le_mem_Release(storedMsgPtr->topicPtr);
    le_mem_Release(storedMsgPtr->payloadPtr);
    le_mem_Release(storedMsgPtr);
}
//--------------------------------------------------------------------------------------------------
/**
* Destroy all owned sessions
*/
//--------------------------------------------------------------------------------------------------
static void DestroyAllOwnedSessions
(
    le_msg_SessionRef_t clientSessionRef,   ///< [IN] Legato client session that was closed
    void* contextPtr                        ///< [IN] Not used
)
{
    (void)contextPtr;

    le_ref_IterRef_t iter = le_ref_GetIterator(SessionRefMap);

    for (le_result_t status = le_ref_NextNode(iter); status == LE_OK; )
    {
        mqtt_Session* sessionPtr = le_ref_GetValue(iter);
        LE_ASSERT(sessionPtr != NULL);
        LE_ASSERT(sessionPtr->sessionRef != NULL);

        // Advance the iterator before deletion to prevent invalidation
        status = le_ref_NextNode(iter);

        if (sessionPtr->clientSession == clientSessionRef)
        {
            le_mem_Release(sessionPtr);
        }
    }
}
//--------------------------------------------------------------------------------------------------
/**
* Initialize MQTT Client service
*/
//--------------------------------------------------------------------------------------------------
COMPONENT_INIT
{
    const char mqttBrokerURI[] = "tcp://192.168.2.3:1883";

    // Global paho initialisation has to happen before any client is created or connected.
    MQTTClient_init_options initOptions = MQTTClient_init_options_initializer;
    initOptions.do_openssl_init = 1;
    MQTTClient_global_init(&initOptions);

    // The client id of the built-in session is derived from the device IMEI.
    LE_ASSERT_OK(le_info_GetImei(DeviceIMEI, NUM_ARRAY_MEMBERS(DeviceIMEI)));
    char clientId[32];
    snprintf(clientId, sizeof(clientId), "%s-pub", DeviceIMEI);

    // Create memory pools
    MQTTSessionPoolRef = le_mem_CreatePool("MQTT session pool", sizeof(mqtt_Session));
    le_mem_ExpandPool(MQTTSessionPoolRef, MQTT_SESSION_MAX);
    le_mem_SetDestructor(MQTTSessionPoolRef, mqttSessionDestructor);
    // The credential copies are stored null terminated, so reserve one extra byte for the
    // terminator on top of the maximum length.
    UsernamePoolRef = le_mem_CreatePool("MQTT username pool", MQTT_MAX_USERNAME_LENGTH + 1);
    PasswordPoolRef = le_mem_CreatePool("MQTT password pool", MQTT_MAX_PASSWORD_LENGTH + 1);
    MessagePoolRef = le_mem_CreatePool("MQTT message pool", sizeof(mqtt_Message));
    TopicPoolRef = le_mem_CreatePool("MQTT topic pool", MQTT_MAX_TOPIC_LENGTH);
    PayloadPoolRef = le_mem_CreatePool("MQTT payload pool", MQTT_MAX_PAYLOAD_LENGTH);

    // MessageHandlerRefMap is created with size (MQTT_SESSION_MAX * 2) since each MQTT session
    // may have 2 handlers, i.e. 1 message arrived handler and 1 connection lost handler
    SessionRefMap = le_ref_CreateMap("MQTT sessions", MQTT_SESSION_MAX);
    MessageHandlerRefMap = le_ref_CreateMap("MQTT message handlers", MQTT_SESSION_MAX * 2);

    // Events that carry paho callbacks over to the Legato event loop.
    ReceiveThreadEventId = le_event_CreateId(
        "MqttClient receive notification", sizeof(mqtt_Message*));
    le_event_AddHandler(
        "MqttClient receive notification", ReceiveThreadEventId, MessageReceivedEventHandler);
    // The connection lost report carries the callback context pointer (a void*).
    ConnectionLostThreadEventId = le_event_CreateId(
        "MqttClient connection lost notification", sizeof(void*));
    le_event_AddHandler(
        "MqttClient connection lost notification",
        ConnectionLostThreadEventId,
        ConnectionLostEventHandler);

    // Sessions owned by a client are destroyed when that client closes its IPC session.
    le_msg_AddServiceCloseHandler(mqtt_GetServiceRef(), DestroyAllOwnedSessions, NULL);

    // Built-in session: create it, then configure and connect it only if creation succeeded.
    res = mqtt_CreateSession(mqttBrokerURI, clientId, &MQTTSession);
    if (res != LE_OK)
    {
        LE_INFO ("tutto male :( ");
        // Without a valid session there is nothing to configure or connect.
        return;
    }
    LE_INFO("tutto ok");

    const uint16_t keepAliveInSeconds = 60;
    const bool cleanSession = true;
    const uint16_t connectTimeout = 20;
    const uint16_t retryInterval = 10;
    // No credentials are used for the built-in session.
    mqtt_SetConnectOptions(
        MQTTSession,
        keepAliveInSeconds,
        cleanSession,
        NULL,
        NULL,
        0,
        connectTimeout,
        retryInterval);
    mqtt_AddConnectionLostHandler(MQTTSession, &OnConnectionLost, NULL);
    mqtt_AddMessageArrivedHandler(MQTTSession, &OnMessageArrived, NULL);

    res = mqtt_Connect(MQTTSession);
    if (res != LE_OK)
    {
        LE_ERROR("Connection failed! error %d", res);
    }
    else
    {
        LE_INFO("Connected to server '%s'", mqttBrokerURI);
    }
}