zoukankan      html  css  js  c++  java
  • MQTT事件回调流程

    TLS 如下强调:

      1、每个IOT设备应该有一对独有的公钥/私钥

      2、SERVER的认证通过SERVER的"root certificate"

      

     SSL产生过程:

    $ openssl genrsa -out deviceCertOne.key 2048  
    $ openssl req -new -key deviceCertOne.key -out deviceCertOne.csr  
    $ openssl x509 -req -in deviceCertOne.csr -CA sampleCACertificateOne.pem -Cakey sampleCACertificateOne.key -CAcreateserial -out deviceCertOne.crt -days 365 -sha256  

    数据区分配:

      

      

      程序架构:

      

    关键数据封装:

    /** Protocol definitions */
    typedef enum e_iot_service_protocol {
        IOT_SERVICE_PROTO_MQTT,         ///< MQTT protocol
    } iot_service_protocol_t;
    
    /** IoT Service configuration structure  */
    typedef struct st_iot_service_cfg {
        const char              *p_name;
        iot_service_protocol_t  protocol;
        mqtt_client_instance_t  *p_mqtt_client;
    } iot_service_cfg_t;
    
    /** IoT Service control structure */
    typedef struct st_iot_service_ctrl_t {
        mqtt_client_instance_t  * p_mqtt_client;
        TX_SEMAPHORE         prop_sem;
        TX_MUTEX                prop_mutex;
        int                     debug;
    } iot_service_ctrl_t;
    
    static iot_service_ctrl_t baidu_iot_ctrl;
    
    mqtt_client_instance_t g_mqtt_client = {
         .p_cfg     = &g_mqtt_cfg,
         .p_ctrl    = &g_mqtt_ctrl,
         .p_api     = &nx_mqtt_api,
    };
    
    static iot_service_cfg_t baidu_iot_cfg = {
         .p_name        = "BAIDU",
         .protocol      = IOT_SERVICE_PROTO_MQTT,
         .p_mqtt_client = &g_mqtt_client,
    };
    
    static iot_service_instance_t baidu_iot = {
         .p_cfg         = &baidu_iot_cfg,
         .p_ctrl        = &baidu_iot_ctrl,
         .p_api         = &baidu_iot_api,
    };    
    
    static ssp_err_t BAIDU_Open(iot_service_ctrl_t * p_ctrl, iot_service_cfg_t const * const p_cfg)
    {
        ssp_err_t status;
        mqtt_client_api_t *p_api;
        mqtt_client_ctrl_t *p_mqtt_ctrl;
    
        p_ctrl->p_mqtt_client = p_cfg->p_mqtt_client;
        p_api = p_ctrl->p_mqtt_client->p_api;
        p_mqtt_ctrl = p_ctrl->p_mqtt_client->p_ctrl;
        p_mqtt_ctrl->p_iot_service_ctrl = (void *)p_ctrl;
        p_mqtt_ctrl->notify = baidu_callback;
    
        p_ctrl->prop_sem = g_baidu_semaphore;
        p_ctrl->prop_mutex = g_baidu_mutex;
    }    
    
    void turbine_main(ULONG data)
    {
        ULONG i = 0, freq_ms = CLOUD_UPDATE_FREQ_MS;
        ULONG ticks;
        struct sensors old, new;
        iot_service_instance_t *p_iots;
        const iot_service_api_t *p_api;
        ssp_err_t status;
        UINT retry_count = SECT_UPDATE_RETRY_COUNT;
    
        p_iots = (iot_service_instance_t *)data;
        p_api = p_iots->p_api;
    
        /* Send a pointer to the IoT Service instance via the message
         * queue. This will be picked up by the MQTT Notifier Thread
         */
        tx_queue_send(&g_msg_queue, &p_iots, TX_WAIT_FOREVER);
        tx_thread_resume(&mqtt_callback_thread);
    
        status = p_api->open(p_iots->p_ctrl, p_iots->p_cfg);
    }

    MQTT相关API:

    #define nxd_mqtt_client_create                _nxde_mqtt_client_create
    #define nxd_mqtt_client_login_set             _nxde_mqtt_client_login_set
    #define nxd_mqtt_client_will_message_set      _nxde_mqtt_client_will_message_set
    #define nxd_mqtt_client_delete                _nxde_mqtt_client_delete
    #define nxd_mqtt_client_connect               _nxde_mqtt_client_connect
    #define nxd_mqtt_client_secure_connect        _nxde_mqtt_client_secure_connect
    #define nxd_mqtt_client_publish               _nxde_mqtt_client_publish
    #define nxd_mqtt_client_subscribe             _nxde_mqtt_client_subscribe
    #define nxd_mqtt_client_unsubscribe           _nxde_mqtt_client_unsubscribe
    #define nxd_mqtt_client_disconnect            _nxde_mqtt_client_disconnect
    #define nxd_mqtt_client_receive_notify_set    _nxde_mqtt_client_receive_notify_set
    #define nxd_mqtt_client_message_get           _nxde_mqtt_client_message_get
    #define nxd_mqtt_client_disconnect_notify_set _nxde_mqtt_client_disconnect_notify_set
    
    //Client Id是客户端到服务器唯一的标识,必须是在客户端到服务器唯一的ID,是处理QoS级别1和2消息ID的关键
    //一般使用设备的Unique ID,配置ClientID回调:
    mqtt_client_id_callback()
    //设备Unique ID g_fmi.p_api->uniqueIdGet(&uid)//SERVER获取MQTT数据: nxd_mqtt_client_message_get //MQTT反控设备事件回调: nxd_mqtt_client_receive_notify_set->NX_MQTT_notify_callback->aws_callback->AWS_PropertyGet //MQTT主动上报数据 turbine_main->update_iot_service-> //信号量用于MQTT事件同步: g_mqtt_notify_sem //MQTT AWS地址&认证 g_mqtt_cfg.p_root_ca = (uint8_t *) aws_ca_cert_der; #define AWS_SERVER_ADDRESS "iotmoonraker.us-west-2.prod.iot.us-west-2.amazonaws.com"

    综上:MQTT要注意两个回调,一个是配置获取Client ID的回调,如上:

    另一个为配置获取订阅消息的回调,如下:

    status = nxd_mqtt_client_receive_notify_set(p_ctrl->p_secure_client, NX_MQTT_notify_callback);
    if(status) 
    {
        nxd_mqtt_client_unsubscribe(p_ctrl->p_secure_client, (CHAR *)p_name, strlen(p_name));
        NX_MQTT_log_msg(p_ctrl, "Could not set notify function (0x%02x)
    ", status);
        return SSP_ERR_ABORTED;
    }

    回调:信号量的方式通知可以接受MQTT服务器推送的消息,即客户端订阅的消息

    static void NX_MQTT_notify_callback(NXD_MQTT_CLIENT * p_client, UINT number_of_messages)
    {
        mqtt_client_ctrl_t *p_ctrl;
        p_ctrl = g_mqtt_client.p_ctrl;
        p_ctrl->num_messages += number_of_messages;
        tx_semaphore_put(&g_mqtt_notify_sem);
    }

    MQTT接收事件处理线程:

    void mqtt_callback_thread_entry(void) {
        UINT tstatus;
        mqtt_client_ctrl_t *p_ctrl;
        mqtt_client_instance_t *p_mqtt_client;
        iot_service_instance_t *p_iots;
    
        /* Get the IoT cloud provider via the message queue */
        tx_queue_receive(&g_msg_queue, &p_iots, TX_WAIT_FOREVER);
    
        while (1) {
            /* This thread will be woken up by the MQTT notify function when
             * a message is received on the subscribed topic.
             */
            tstatus = tx_semaphore_get(&g_mqtt_notify_sem, TX_WAIT_FOREVER);
            if (tstatus != TX_SUCCESS)
                continue;
    
            /* Get the selected IoT service and MQTT client and call the notify callback */
            p_mqtt_client = p_iots->p_ctrl->p_mqtt_client;
            p_ctrl = p_mqtt_client->p_ctrl;
            while (p_ctrl->num_messages > 0) {
                p_mqtt_client->p_api->messageGet(p_ctrl);
                if (p_ctrl->notify)
                    p_ctrl->notify(p_iots->p_ctrl);
                p_ctrl->num_messages--;
            }
        }
    }
    static ssp_err_t NX_MQTT_Subscribe(mqtt_client_ctrl_t * p_ctrl, char const * p_name,
                                        mqtt_client_qos_t qos)
    {
        UINT status;
    
        status = nxd_mqtt_client_subscribe(p_ctrl->p_secure_client, (CHAR *)p_name, strlen(p_name), qos);
        if(status) {
            NX_MQTT_log_msg(p_ctrl, "Could not subscribe to MQTT topic %s (0x%02x)
    ", p_name, status);
            return SSP_ERR_ABORTED;
        }
    
        status = nxd_mqtt_client_receive_notify_set(p_ctrl->p_secure_client, NX_MQTT_notify_callback);
        if(status) {
            nxd_mqtt_client_unsubscribe(p_ctrl->p_secure_client, (CHAR *)p_name, strlen(p_name));
            NX_MQTT_log_msg(p_ctrl, "Could not set notify function (0x%02x)
    ", status);
            return SSP_ERR_ABORTED;
        }
    
        return SSP_SUCCESS;
    }
    
    
    static void NX_MQTT_notify_callback(NXD_MQTT_CLIENT * p_client, UINT number_of_messages)
    {
        mqtt_client_ctrl_t *p_ctrl;
    
        SSP_PARAMETER_NOT_USED (p_client);
    
        p_ctrl = g_mqtt_client.p_ctrl;
        p_ctrl->num_messages += number_of_messages;
        tx_semaphore_put(&g_mqtt_notify_sem);
    }
    
    
    void mqtt_callback_thread_entry(void) {
        UINT tstatus;
        mqtt_client_ctrl_t *p_ctrl;
        mqtt_client_instance_t *p_mqtt_client;
        iot_service_instance_t *p_iots;
    
        /* Get the IoT cloud provider via the message queue */
        tx_queue_receive(&g_msg_queue, &p_iots, TX_WAIT_FOREVER);
    
        while (1) {
            /* This thread will be woken up by the MQTT notify function when
             * a message is received on the subscribed topic.
             */
            tstatus = tx_semaphore_get(&g_mqtt_notify_sem, TX_WAIT_FOREVER);
            if (tstatus != TX_SUCCESS)
                continue;
    
            /* Get the selected IoT service and MQTT client and call the notify callback */
            p_mqtt_client = p_iots->p_ctrl->p_mqtt_client;
            p_ctrl = p_mqtt_client->p_ctrl;
            while (p_ctrl->num_messages > 0) {
                p_mqtt_client->p_api->messageGet(p_ctrl);
                if (p_ctrl->notify)
                    p_ctrl->notify(p_iots->p_ctrl);
                p_ctrl->num_messages--;
            }
        }
    }
    
    static void aws_callback(void *p_context)
    {
        iot_service_ctrl_t *p_ctrl = (iot_service_ctrl_t *)p_context;
        tx_semaphore_put(&p_ctrl->prop_sem);
    }
    
    static ssp_err_t AWS_Open(iot_service_ctrl_t * p_ctrl, iot_service_cfg_t const * const p_cfg)
    {
        ssp_err_t status;
        mqtt_client_api_t *p_api;
        mqtt_client_ctrl_t *p_mqtt_ctrl;
    
        p_ctrl->p_mqtt_client = p_cfg->p_mqtt_client;
        p_api = p_ctrl->p_mqtt_client->p_api;
        p_mqtt_ctrl = p_ctrl->p_mqtt_client->p_ctrl;
        p_mqtt_ctrl->p_iot_service_ctrl = (void *)p_ctrl;
        p_mqtt_ctrl->notify = aws_callback;
    
        p_ctrl->prop_sem = g_aws_semaphore;
        p_ctrl->prop_mutex = g_aws_mutex;
    }
    
    static ssp_err_t AWS_PropertyGet(iot_service_ctrl_t const * p_ctrl, char const * p_name[], char *p_value[], uint32_t valLen, uint32_t num)
    {
        ssp_err_t status;
        UINT tstatus;
        mqtt_client_api_t *p_api;
        mqtt_client_ctrl_t *p_mqtt_ctrl;
        UCHAR *msg, *buf, *q;
        unsigned int i;
    
        p_api = p_ctrl->p_mqtt_client->p_api;
        p_mqtt_ctrl = p_ctrl->p_mqtt_client->p_ctrl;
        msg = p_mqtt_ctrl->msg;
    
        status = tx_mutex_get((TX_MUTEX *)&p_ctrl->prop_mutex, TX_WAIT_FOREVER);
    
        //snprintf(p_mqtt_ctrl->msg, sizeof(p_mqtt_ctrl->msg), "");
        status = p_api->publish(p_mqtt_ctrl, aws_topics[AWS_IOT_TOPIC_GET], NULL, 0, MQTT_QOS_0);
        if (status != SSP_SUCCESS) {
            tx_mutex_put((TX_MUTEX *)&p_ctrl->prop_mutex);
            return status;
        }
    
        tstatus = tx_semaphore_get((TX_SEMAPHORE *)&p_ctrl->prop_sem, 500);
        if (tstatus != TX_SUCCESS)
        {
            tx_mutex_put((TX_MUTEX *)&p_ctrl->prop_mutex);
            return SSP_ERR_ABORTED;
        }
        /* Check if get was accepted */
        status = SSP_ERR_INTERNAL;
        if (strcmp((char *)p_mqtt_ctrl->topic, aws_topics[AWS_IOT_TOPIC_GET_ACCEPTED]) == 0) {
            /* Get values */
            msg = p_mqtt_ctrl->msg;
    
            for (i = 0; i < num; i++) {
                memset(p_value[i], 0, valLen);
                buf = (UCHAR *)strstr((const char *)msg, "desired");
                if (buf == NULL)
                    break;
                buf += strlen("desired");
                buf = (UCHAR *)strstr((const char *)buf, p_name[i]);
                if (buf == NULL)
                    continue;
                buf += strlen(p_name[i]);
                buf = (UCHAR *)strstr((const char *)buf, ":");
                if (buf == NULL)
                    continue;
                buf += 1;
                buf = (UCHAR *)strstr((const char *)buf, """);
                buf += 1;
                q = (UCHAR *)strstr((const char *)buf, """);
                if (q == NULL)
                    continue;
                strncpy(p_value[i], (const char *)buf, MIN(valLen, (size_t)(q - buf)));
                status = SSP_SUCCESS;
            }
        }
        tx_mutex_put((TX_MUTEX *)&p_ctrl->prop_mutex);
        return status;
    }
  • 相关阅读:
    27. Remove Element
    列表变成字典
    1. Two Sum
    CVPR2019:What and How Well You Performed? A Multitask Learning Approach to Action Quality Assessment
    959. Regions Cut By Slashes
    118. Pascal's Triangle
    loj3117 IOI2017 接线 wiring 题解
    题解 NOI2019 序列
    题解 省选联考2020 组合数问题
    题解 Educational Codeforces Round 90 (Rated for Div. 2) (CF1373)
  • 原文地址:https://www.cnblogs.com/jiangzhaowei/p/8991531.html
Copyright © 2011-2022 走看看