zoukankan      html  css  js  c++  java
  • mosquitto简单应用

    1. 简述

    一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。一个典型的应用案例就是 Andy Stanford-ClarkMosquitto(MQTT协议创始人之一)在家中实现的远程监控和自动化,并在 OggCamp 的演讲上,对MQTT协议进行详细阐述。

    官网:

    http://www.mosquitto.org/  

    http://www.mosquitto.org/man/  

    http://www.mosquitto.org/api/

    https://mosquitto.org/man/mosquitto-8.html IBM的一个server(broker)示例用法说明

    2. 函数

    mosquitto结构体:

    struct mosquitto;
    struct mosquitto_message{
        int mid;
        char *topic;
        void *payload;
        int payloadlen;
        int qos;
        bool retain;
    };

     mosquitto支持推送和订阅消息模式:

    /* 
     * Function: mosquitto_publish
     *
     * Publish a message on a given topic.
     * 
     * Parameters:
     *  mosq -       a valid mosquitto instance.
     *  mid -        pointer to an int. If not NULL, the function will set this
     *               to the message id of this particular message. This can be then
     *               used with the publish callback to determine when the message
     *               has been sent.
     *               Note that although the MQTT protocol doesn't use message ids
     *               for messages with QoS=0, libmosquitto assigns them message ids
     *               so they can be tracked with this parameter.
     *  topic -      null terminated string of the topic to publish to.
     *  payloadlen - the size of the payload (bytes). Valid values are between 0 and
     *               268,435,455.
     *  payload -    pointer to the data to send. If payloadlen > 0 this must be a
     *               valid memory location.
     *  qos -        integer value 0, 1 or 2 indicating the Quality of Service to be
     *               used for the message.
     *  retain -     set to true to make the message retained.
     *
     * Returns:
     *  MOSQ_ERR_SUCCESS -      on success.
     *  MOSQ_ERR_INVAL -        if the input parameters were invalid.
     *  MOSQ_ERR_NOMEM -        if an out of memory condition occurred.
     *  MOSQ_ERR_NO_CONN -      if the client isn't connected to a broker.
     *  MOSQ_ERR_PROTOCOL -     if there is a protocol error communicating with the
     *                          broker.
     *  MOSQ_ERR_PAYLOAD_SIZE - if payloadlen is too large.
     *
     * See Also: 
     *  <mosquitto_max_inflight_messages_set>
     */
    libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain);
    /*
     * Function: mosquitto_subscribe
     *
     * Subscribe to a topic.
     *
     * Parameters:
     *  mosq - a valid mosquitto instance.
     *  mid -  a pointer to an int. If not NULL, the function will set this to
     *         the message id of this particular message. This can be then used
     *         with the subscribe callback to determine when the message has been
     *         sent.
     *  sub -  the subscription pattern.
     *  qos -  the requested Quality of Service for this subscription.
     *
     * Returns:
     *  MOSQ_ERR_SUCCESS - on success.
     *  MOSQ_ERR_INVAL -   if the input parameters were invalid.
     *  MOSQ_ERR_NOMEM -   if an out of memory condition occurred.
     *  MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
     */
    libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos);
    
    /*
     * Function: mosquitto_unsubscribe
     *
     * Unsubscribe from a topic.
     *
     * Parameters:
     *  mosq - a valid mosquitto instance.
     *  mid -  a pointer to an int. If not NULL, the function will set this to
     *         the message id of this particular message. This can be then used
     *         with the unsubscribe callback to determine when the message has been
     *         sent.
     *  sub -  the unsubscription pattern.
     *
     * Returns:
     *  MOSQ_ERR_SUCCESS - on success.
     *  MOSQ_ERR_INVAL -   if the input parameters were invalid.
     *  MOSQ_ERR_NOMEM -   if an out of memory condition occurred.
     *  MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
     */
    libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub);

    一般使用流程如下:

    mosquitto_lib_init();
    mosq=mosquitto_new();
    
    mosquitto_connect_callback_set();  ----mosquitto_subscribe();
    mosquitto_disconnect_callback_set();
    mosquitto_message_callback_set();  ----接收解析消息 并推送mosquitto_publish()
    
    mosquitto_username_pw_set(mosq, "user", "pw");
    mosquitto_connect();
    mosquitto_loop(mosq, timeout, 1);  // 需要不断循环判断, 也可根据需要永远运行:mosquitto_loop_forever(mosq, timeout, 1)
    
    mosquitto_publish();
    
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    3. 应用

    一般使用流程:

    0. 依赖库安装

    apt-get install openssl libssl-dev  uuid-dev

    1. 编译安装
    make 
    make install
    交叉编译:
    CC=arm-linux-gnueabihf-gcc CXX=arm-linux-gnueabihf-g++ make WITH_SRV=no  WITH_UUID=no WITH_TLS=no WITH_DOCS=no WITH_WEBSOCKETS=no
    2. 创建mosquitto用户 mosquitto默认以mosquitto用户启动 groupadd mosquitto useradd -g mosquitto mosquitto 3. 配置文件修改 根据需求修改配置文件/etc/mosquitto/mosquitto.conf
    一般不修改直接可用,本机所有IP都可达,外部访问本机IP可达。
    4. 启动 mosquitto -c /etc/mosquitto/mosquitto.conf -d 5. 测试 mosquitto_sub -t wang/ming mosquitto_pub -m "hello"
    mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang"
    mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang"  -u admin -P 48e848df75c08d4c0ba75bee
    服务器需要配置密码,客户端不用修改可直接采用密码上报

    一个可参考配置文件如下:

     # Port to use for the default listener.
    #port 1883
    port 1883
    
     # Websockets support is currently disabled by default at compile time.
     # Certificate based TLS may be used with websockets, except that
     # only the cafile, certfile, keyfile and ciphers options are supported.
    #protocol mqtt
    listener 9001
    protocol websockets
    listener 8883
    protocol websockets
    
     #cafile
     #capath
     
    certfile /etc/cert/server.crt
    keyfile /etc/cert/server.key
    
     # For example, setting "secure-" here would mean a client "secure-
     # client" could connect but another with clientid "mqtt" couldn't.
    #clientid_prefixes
    clientid_prefixes ABC
     
     # Boolean value that determines whether clients that connect 
     # without providing a username are allowed to connect. If set to 
     # false then a password file should be created (see the 
     # password_file option) to control authenticated client access. 
     # Defaults to true.
    #allow_anonymous true
    allow_anonymous false
     
     # See the TLS client require_certificate and use_identity_as_username options
     # for alternative authentication options.
    #password_file
    password_file /etc/mosquitto/pwfile

    mosquitto是一个server broker,可直接运行测试客户端。

    manpage上一个示例:

    #include <stdio.h>
    #include <mosquitto.h>
    
    void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
    {
        if(message->payloadlen){
            printf("%s %s
    ", message->topic, message->payload);
        }else{
            printf("%s (null)
    ", message->topic);
        }
        fflush(stdout);
    }
    
    void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
    {
        int i;
        if(!result){
            /* Subscribe to broker information topics on successful connect. */
            mosquitto_subscribe(mosq, NULL, "$SYS/#", 2);
        }else{
            fprintf(stderr, "Connect failed
    ");
        }
    }
    
    void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
    {
        int i;
    
        printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
        for(i=1; i<qos_count; i++){
            printf(", %d", granted_qos[i]);
        }
        printf("
    ");
    }
    
    void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
    {
        /* Pring all log messages regardless of level. */
        printf("%s
    ", str);
    }
    
    int main(int argc, char *argv[])
    {
        int i;
        char *host = "localhost";
        int port = 1883;
        int keepalive = 60;
        bool clean_session = true;
        struct mosquitto *mosq = NULL;
    
        mosquitto_lib_init();
        mosq = mosquitto_new(NULL, clean_session, NULL);
        if(!mosq){
            fprintf(stderr, "Error: Out of memory.
    ");
            return 1;
        }
        mosquitto_log_callback_set(mosq, my_log_callback);
        mosquitto_connect_callback_set(mosq, my_connect_callback);
        mosquitto_message_callback_set(mosq, my_message_callback);
        mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
    
        if(mosquitto_connect(mosq, host, port, keepalive)){
            fprintf(stderr, "Unable to connect.
    ");
            return 1;
        }
    
        mosquitto_loop_forever(mosq, -1, 1);
    
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return 0;
    }

     自测程序

    #include <stdio.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <mosquitto.h>
    
    char peerid[] = "wangq";
    char host[] = "127.0.0.1";
    int port = 1883;
    int keepalive = 60;
    bool clean_session = true;
    struct mosquitto *mosq = NULL;
    pthread_t pmosid = 0;
    
    
    void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
    {
        if(message->payloadlen){
            printf("====>recv:%s %s
    ", message->topic, message->payload);
        }else{
            printf("%s (null)
    ", message->topic);
        }
    
        mosquitto_publish(mosq, NULL, "wang/result", sizeof("loveresult"), "loveresult", 2, false);
        sleep(2);
    
        fflush(stdout);
    }
    
    void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
    {
        if(!result){
            /* Subscribe to broker information topics on successful connect. */
            //mosquitto_subscribe(mosq, NULL, "$SYS/broker/uptime", 2);
            mosquitto_subscribe(mosq, NULL, "wang/hua", 1);
        }else{
            fprintf(stderr, "Connect failed
    ");
        }
    }
    
    void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
    {
        int i;
    
        printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
        for(i=1; i<qos_count; i++){
            printf(", %d", granted_qos[i]);
        }
        printf("
    ");
    }
    
    void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
    {
        /* Pring all log messages regardless of level. */
        printf("====>log:%s
    ", str);
    }
    
    void mos_init()
    {
        mosquitto_lib_init();
        mosq = mosquitto_new(peerid, clean_session, NULL);
        if(!mosq){
            fprintf(stderr, "Error: Out of memory.
    ");
            exit(-1);
        }
        mosquitto_log_callback_set(mosq, my_log_callback);
        mosquitto_connect_callback_set(mosq, my_connect_callback);
        mosquitto_message_callback_set(mosq, my_message_callback);
        mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
        mosquitto_will_set(mosq,"xiao/ming", sizeof("livewill"), "livewill", 2, false);
        mosquitto_threaded_set(mosq, 1);
    }
    
    void * pthread_mos(void *arg)
    {
        int toserver = -1;
        int timeout = 0;
        
        while(toserver){
            toserver = mosquitto_connect(mosq, host, port, keepalive);
            if(toserver){
                timeout++;
                fprintf(stderr, "Unable to connect server [%d] times.
    ", timeout);
                if(timeout > 3){
                    fprintf(stderr, "Unable to connect server, exit.
    " );
                    pthread_exit(NULL);
                }
                sleep(10);
            }
        }
    
        mosquitto_loop_forever(mosq, -1, 1);
    
        mosquitto_disconnect(mosq);
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
    
        pthread_exit(NULL);
    }
    
    int main(int argc, char *argv[])
    {
        int ret = 0;
    
        mos_init();
    
        ret = pthread_create(&pmosid, NULL, pthread_mos, NULL);
        if(ret != 0){
            printf("create pthread mos error.
    ");
            exit(-1);
        }
        pthread_detach(pmosid);
    
        while(1){
            mosquitto_publish(mosq, NULL, "wang/ming", sizeof("love"), "love", 2, false);
            sleep(2);
        }
    
    /*    
        mosquitto_loop_forever(mosq, -1, 1);
    
        while(1){
            mosquitto_publish(mosq, NULL, "wang/qin/hua", sizeof("love"), "love", 2, false);
            sleep(2);
        }
    */
        return 0;
    }

     运行结果:

    #./a.out
    ====>log:Client wangq sending CONNECT
    ====>log:Client wangq received CONNACK (0)
    ====>log:Client wangq sending SUBSCRIBE (Mid: 2, Topic: wang/hua, QoS: 1)
    ====>log:Client wangq sending PUBLISH (d1, q2, r0, m1, 'wang/ming', ... (5 bytes))
    ====>log:Client wangq received SUBACK
    Subscribed (mid: 2): 1
    ====>log:Client wangq received PUBREC (Mid: 1)
    ====>log:Client wangq sending PUBREL (Mid: 1)
    ====>log:Client wangq received PUBCOMP (Mid: 1)
    ====>log:Client wangq sending PUBLISH (d0, q2, r0, m3, 'wang/ming', ... (5 bytes))
    ====>log:Client wangq received PUBREC (Mid: 3)
    ====>log:Client wangq sending PUBREL (Mid: 3)
    ====>log:Client wangq received PUBCOMP (Mid: 3)

    订阅:

    #mosquitto_sub -t wang/ming
    love
    love

    will订阅:

    #mosquitto_sub -t xiao/ming 
    livewill

    参考:

    1. MQTT协议规范 阿里云物理网

    2. Mosquitto安装及使用

    3. MQTT协议及EMQ应用

  • 相关阅读:
    173. Binary Search Tree Iterator
    199. Binary Tree Right Side View
    230. Kth Smallest Element in a BST
    236. Lowest Common Ancestor of a Binary Tree
    337. House Robber III
    449. Serialize and Deserialize BST
    508. Most Frequent Subtree Sum
    513. Find Bottom Left Tree Value
    129. Sum Root to Leaf Numbers
    652. Find Duplicate Subtrees
  • 原文地址:https://www.cnblogs.com/embedded-linux/p/9386169.html
Copyright © 2011-2022 走看看