TLS 要点如下:
1、每个 IoT 设备应该有一对独有的公钥/私钥
2、对 SERVER 的认证通过 SERVER 的根证书("root certificate")完成
SSL 证书生成过程:
# 1. Generate a 2048-bit RSA private key for the device.
$ openssl genrsa -out deviceCertOne.key 2048
# 2. Create a certificate signing request (CSR) from that key.
$ openssl req -new -key deviceCertOne.key -out deviceCertOne.csr
# 3. Sign the CSR with the sample CA (note: the option is -CAkey, options are case-sensitive);
#    the resulting device certificate is valid for 365 days and uses a SHA-256 digest.
$ openssl x509 -req -in deviceCertOne.csr -CA sampleCACertificateOne.pem -CAkey sampleCACertificateOne.key -CAcreateserial -out deviceCertOne.crt -days 365 -sha256
数据区分配:
程序架构:
关键数据封装:
/** Protocol definitions */ typedef enum e_iot_service_protocol { IOT_SERVICE_PROTO_MQTT, ///< MQTT protocol } iot_service_protocol_t; /** IoT Service configuration structure */ typedef struct st_iot_service_cfg { const char *p_name; iot_service_protocol_t protocol; mqtt_client_instance_t *p_mqtt_client; } iot_service_cfg_t; /** IoT Service control structure */ typedef struct st_iot_service_ctrl_t { mqtt_client_instance_t * p_mqtt_client; TX_SEMAPHORE prop_sem; TX_MUTEX prop_mutex; int debug; } iot_service_ctrl_t; static iot_service_ctrl_t baidu_iot_ctrl; mqtt_client_instance_t g_mqtt_client = { .p_cfg = &g_mqtt_cfg, .p_ctrl = &g_mqtt_ctrl, .p_api = &nx_mqtt_api, }; static iot_service_cfg_t baidu_iot_cfg = { .p_name = "BAIDU", .protocol = IOT_SERVICE_PROTO_MQTT, .p_mqtt_client = &g_mqtt_client, }; static iot_service_instance_t baidu_iot = { .p_cfg = &baidu_iot_cfg, .p_ctrl = &baidu_iot_ctrl, .p_api = &baidu_iot_api, }; static ssp_err_t BAIDU_Open(iot_service_ctrl_t * p_ctrl, iot_service_cfg_t const * const p_cfg) { ssp_err_t status; mqtt_client_api_t *p_api; mqtt_client_ctrl_t *p_mqtt_ctrl; p_ctrl->p_mqtt_client = p_cfg->p_mqtt_client; p_api = p_ctrl->p_mqtt_client->p_api; p_mqtt_ctrl = p_ctrl->p_mqtt_client->p_ctrl; p_mqtt_ctrl->p_iot_service_ctrl = (void *)p_ctrl; p_mqtt_ctrl->notify = baidu_callback; p_ctrl->prop_sem = g_baidu_semaphore; p_ctrl->prop_mutex = g_baidu_mutex; } void turbine_main(ULONG data) { ULONG i = 0, freq_ms = CLOUD_UPDATE_FREQ_MS; ULONG ticks; struct sensors old, new; iot_service_instance_t *p_iots; const iot_service_api_t *p_api; ssp_err_t status; UINT retry_count = SECT_UPDATE_RETRY_COUNT; p_iots = (iot_service_instance_t *)data; p_api = p_iots->p_api; /* Send a pointer to the IoT Service instance via the message * queue. This will be picked up by the MQTT Notifier Thread */ tx_queue_send(&g_msg_queue, &p_iots, TX_WAIT_FOREVER); tx_thread_resume(&mqtt_callback_thread); status = p_api->open(p_iots->p_ctrl, p_iots->p_cfg); }
MQTT相关API:
/* MQTT client API name mappings: each nxd_mqtt_client_* name expands to the
 * matching _nxde_mqtt_client_* symbol. */
#define nxd_mqtt_client_create _nxde_mqtt_client_create
#define nxd_mqtt_client_login_set _nxde_mqtt_client_login_set
#define nxd_mqtt_client_will_message_set _nxde_mqtt_client_will_message_set
#define nxd_mqtt_client_delete _nxde_mqtt_client_delete
#define nxd_mqtt_client_connect _nxde_mqtt_client_connect
#define nxd_mqtt_client_secure_connect _nxde_mqtt_client_secure_connect
#define nxd_mqtt_client_publish _nxde_mqtt_client_publish
#define nxd_mqtt_client_subscribe _nxde_mqtt_client_subscribe
#define nxd_mqtt_client_unsubscribe _nxde_mqtt_client_unsubscribe
#define nxd_mqtt_client_disconnect _nxde_mqtt_client_disconnect
#define nxd_mqtt_client_receive_notify_set _nxde_mqtt_client_receive_notify_set
#define nxd_mqtt_client_message_get _nxde_mqtt_client_message_get
#define nxd_mqtt_client_disconnect_notify_set _nxde_mqtt_client_disconnect_notify_set
// The Client ID is the identifier that uniquely identifies a client to the server; it must be
// unique between client and server, and it is key to handling message IDs for QoS levels 1 and 2.
// The device's Unique ID is normally used; callback that configures the Client ID: mqtt_client_id_callback()
// Device Unique ID: g_fmi.p_api->uniqueIdGet(&uid)
// Getting MQTT data from the server: nxd_mqtt_client_message_get
// Callback chain for MQTT events that control the device from the server side:
//   nxd_mqtt_client_receive_notify_set->NX_MQTT_notify_callback->aws_callback->AWS_PropertyGet
// Device-initiated MQTT data reporting: turbine_main->update_iot_service->
// Semaphore used to synchronise MQTT events: g_mqtt_notify_sem
// MQTT AWS address and authentication:
//   g_mqtt_cfg.p_root_ca = (uint8_t *) aws_ca_cert_der;
//   #define AWS_SERVER_ADDRESS "iotmoonraker.us-west-2.prod.iot.us-west-2.amazonaws.com"
综上:MQTT要注意两个回调,一个是配置获取Client ID的回调,如上:
另一个为配置获取订阅消息的回调,如下:
/* Register NX_MQTT_notify_callback as the receive-notify function of the secure client. */
status = nxd_mqtt_client_receive_notify_set(p_ctrl->p_secure_client, NX_MQTT_notify_callback);
if (status != 0)
{
    /* Registration failed: unsubscribe from the topic, log the status code and abort. */
    nxd_mqtt_client_unsubscribe(p_ctrl->p_secure_client, (CHAR *)p_name, strlen(p_name));
    NX_MQTT_log_msg(p_ctrl, "Could not set notify function (0x%02x) ", status);
    return SSP_ERR_ABORTED;
}
回调:通过信号量通知处理线程,可以接收MQTT服务器推送的消息(即客户端订阅的消息)
/**
 * Receive-notify function for the MQTT client.
 *
 * Adds the reported message count to the num_messages counter of the global
 * MQTT client's control block and signals g_mqtt_notify_sem.
 *
 * @param p_client            MQTT client that raised the notification (not used here).
 * @param number_of_messages  Count added to the pending-message counter.
 */
static void NX_MQTT_notify_callback(NXD_MQTT_CLIENT * p_client, UINT number_of_messages)
{
    mqtt_client_ctrl_t * const p_mqtt_ctrl = g_mqtt_client.p_ctrl;

    /* Account for the new messages first, then wake whoever waits on the semaphore. */
    p_mqtt_ctrl->num_messages += number_of_messages;
    tx_semaphore_put(&g_mqtt_notify_sem);
}
MQTT接收事件处理线程:
void mqtt_callback_thread_entry(void) { UINT tstatus; mqtt_client_ctrl_t *p_ctrl; mqtt_client_instance_t *p_mqtt_client; iot_service_instance_t *p_iots; /* Get the IoT cloud provider via the message queue */ tx_queue_receive(&g_msg_queue, &p_iots, TX_WAIT_FOREVER); while (1) { /* This thread will be woken up by the MQTT notify function when * a message is received on the subscribed topic. */ tstatus = tx_semaphore_get(&g_mqtt_notify_sem, TX_WAIT_FOREVER); if (tstatus != TX_SUCCESS) continue; /* Get the selected IoT service and MQTT client and call the notify callback */ p_mqtt_client = p_iots->p_ctrl->p_mqtt_client; p_ctrl = p_mqtt_client->p_ctrl; while (p_ctrl->num_messages > 0) { p_mqtt_client->p_api->messageGet(p_ctrl); if (p_ctrl->notify) p_ctrl->notify(p_iots->p_ctrl); p_ctrl->num_messages--; } } }
static ssp_err_t NX_MQTT_Subscribe(mqtt_client_ctrl_t * p_ctrl, char const * p_name, mqtt_client_qos_t qos) { UINT status; status = nxd_mqtt_client_subscribe(p_ctrl->p_secure_client, (CHAR *)p_name, strlen(p_name), qos); if(status) { NX_MQTT_log_msg(p_ctrl, "Could not subscribe to MQTT topic %s (0x%02x) ", p_name, status); return SSP_ERR_ABORTED; } status = nxd_mqtt_client_receive_notify_set(p_ctrl->p_secure_client, NX_MQTT_notify_callback); if(status) { nxd_mqtt_client_unsubscribe(p_ctrl->p_secure_client, (CHAR *)p_name, strlen(p_name)); NX_MQTT_log_msg(p_ctrl, "Could not set notify function (0x%02x) ", status); return SSP_ERR_ABORTED; } return SSP_SUCCESS; } static void NX_MQTT_notify_callback(NXD_MQTT_CLIENT * p_client, UINT number_of_messages) { mqtt_client_ctrl_t *p_ctrl; SSP_PARAMETER_NOT_USED (p_client); p_ctrl = g_mqtt_client.p_ctrl; p_ctrl->num_messages += number_of_messages; tx_semaphore_put(&g_mqtt_notify_sem); } void mqtt_callback_thread_entry(void) { UINT tstatus; mqtt_client_ctrl_t *p_ctrl; mqtt_client_instance_t *p_mqtt_client; iot_service_instance_t *p_iots; /* Get the IoT cloud provider via the message queue */ tx_queue_receive(&g_msg_queue, &p_iots, TX_WAIT_FOREVER); while (1) { /* This thread will be woken up by the MQTT notify function when * a message is received on the subscribed topic. 
*/ tstatus = tx_semaphore_get(&g_mqtt_notify_sem, TX_WAIT_FOREVER); if (tstatus != TX_SUCCESS) continue; /* Get the selected IoT service and MQTT client and call the notify callback */ p_mqtt_client = p_iots->p_ctrl->p_mqtt_client; p_ctrl = p_mqtt_client->p_ctrl; while (p_ctrl->num_messages > 0) { p_mqtt_client->p_api->messageGet(p_ctrl); if (p_ctrl->notify) p_ctrl->notify(p_iots->p_ctrl); p_ctrl->num_messages--; } } } static void aws_callback(void *p_context) { iot_service_ctrl_t *p_ctrl = (iot_service_ctrl_t *)p_context; tx_semaphore_put(&p_ctrl->prop_sem); } static ssp_err_t AWS_Open(iot_service_ctrl_t * p_ctrl, iot_service_cfg_t const * const p_cfg) { ssp_err_t status; mqtt_client_api_t *p_api; mqtt_client_ctrl_t *p_mqtt_ctrl; p_ctrl->p_mqtt_client = p_cfg->p_mqtt_client; p_api = p_ctrl->p_mqtt_client->p_api; p_mqtt_ctrl = p_ctrl->p_mqtt_client->p_ctrl; p_mqtt_ctrl->p_iot_service_ctrl = (void *)p_ctrl; p_mqtt_ctrl->notify = aws_callback; p_ctrl->prop_sem = g_aws_semaphore; p_ctrl->prop_mutex = g_aws_mutex; } static ssp_err_t AWS_PropertyGet(iot_service_ctrl_t const * p_ctrl, char const * p_name[], char *p_value[], uint32_t valLen, uint32_t num) { ssp_err_t status; UINT tstatus; mqtt_client_api_t *p_api; mqtt_client_ctrl_t *p_mqtt_ctrl; UCHAR *msg, *buf, *q; unsigned int i; p_api = p_ctrl->p_mqtt_client->p_api; p_mqtt_ctrl = p_ctrl->p_mqtt_client->p_ctrl; msg = p_mqtt_ctrl->msg; status = tx_mutex_get((TX_MUTEX *)&p_ctrl->prop_mutex, TX_WAIT_FOREVER); //snprintf(p_mqtt_ctrl->msg, sizeof(p_mqtt_ctrl->msg), ""); status = p_api->publish(p_mqtt_ctrl, aws_topics[AWS_IOT_TOPIC_GET], NULL, 0, MQTT_QOS_0); if (status != SSP_SUCCESS) { tx_mutex_put((TX_MUTEX *)&p_ctrl->prop_mutex); return status; } tstatus = tx_semaphore_get((TX_SEMAPHORE *)&p_ctrl->prop_sem, 500); if (tstatus != TX_SUCCESS) { tx_mutex_put((TX_MUTEX *)&p_ctrl->prop_mutex); return SSP_ERR_ABORTED; } /* Check if get was accepted */ status = SSP_ERR_INTERNAL; if (strcmp((char *)p_mqtt_ctrl->topic, 
aws_topics[AWS_IOT_TOPIC_GET_ACCEPTED]) == 0) { /* Get values */ msg = p_mqtt_ctrl->msg; for (i = 0; i < num; i++) { memset(p_value[i], 0, valLen); buf = (UCHAR *)strstr((const char *)msg, "desired"); if (buf == NULL) break; buf += strlen("desired"); buf = (UCHAR *)strstr((const char *)buf, p_name[i]); if (buf == NULL) continue; buf += strlen(p_name[i]); buf = (UCHAR *)strstr((const char *)buf, ":"); if (buf == NULL) continue; buf += 1; buf = (UCHAR *)strstr((const char *)buf, """); buf += 1; q = (UCHAR *)strstr((const char *)buf, """); if (q == NULL) continue; strncpy(p_value[i], (const char *)buf, MIN(valLen, (size_t)(q - buf))); status = SSP_SUCCESS; } } tx_mutex_put((TX_MUTEX *)&p_ctrl->prop_mutex); return status; }