  • Basic mosquitto usage

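    1. Overview

    Mosquitto is an open-source message broker that implements the MQTT v3.1 messaging protocol. It provides a lightweight publish/subscribe messaging model that makes short-message communication between devices simple, for example between the now widespread low-power sensors, mobile phones, embedded computers, and microcontrollers. A typical use case is the remote monitoring and home automation system that Andy Stanford-Clark (one of the creators of the MQTT protocol) built in his own home and presented in a talk at OggCamp, where he also explained the MQTT protocol in detail.

    Official site: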

    http://www.mosquitto.org/  

    http://www.mosquitto.org/man/  

    http://www.mosquitto.org/api/

    https://mosquitto.org/man/mosquitto-8.html  man page for the mosquitto server (broker), with usage notes

    2. Functions

    mosquitto structures:

    struct mosquitto;
    struct mosquitto_message{
        int mid;
        char *topic;
        void *payload;
        int payloadlen;
        int qos;
        bool retain;
    };

    mosquitto supports the publish and subscribe messaging model:

    /* 
     * Function: mosquitto_publish
     *
     * Publish a message on a given topic.
     * 
     * Parameters:
     *  mosq -       a valid mosquitto instance.
     *  mid -        pointer to an int. If not NULL, the function will set this
     *               to the message id of this particular message. This can be then
     *               used with the publish callback to determine when the message
     *               has been sent.
     *               Note that although the MQTT protocol doesn't use message ids
     *               for messages with QoS=0, libmosquitto assigns them message ids
     *               so they can be tracked with this parameter.
     *  topic -      null terminated string of the topic to publish to.
     *  payloadlen - the size of the payload (bytes). Valid values are between 0 and
     *               268,435,455.
     *  payload -    pointer to the data to send. If payloadlen > 0 this must be a
     *               valid memory location.
     *  qos -        integer value 0, 1 or 2 indicating the Quality of Service to be
     *               used for the message.
     *  retain -     set to true to make the message retained.
     *
     * Returns:
     *  MOSQ_ERR_SUCCESS -      on success.
     *  MOSQ_ERR_INVAL -        if the input parameters were invalid.
     *  MOSQ_ERR_NOMEM -        if an out of memory condition occurred.
     *  MOSQ_ERR_NO_CONN -      if the client isn't connected to a broker.
     *  MOSQ_ERR_PROTOCOL -     if there is a protocol error communicating with the
     *                          broker.
     *  MOSQ_ERR_PAYLOAD_SIZE - if payloadlen is too large.
     *
     * See Also: 
     *  <mosquitto_max_inflight_messages_set>
     */
    libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain);
    /*
     * Function: mosquitto_subscribe
     *
     * Subscribe to a topic.
     *
     * Parameters:
     *  mosq - a valid mosquitto instance.
     *  mid -  a pointer to an int. If not NULL, the function will set this to
     *         the message id of this particular message. This can be then used
     *         with the subscribe callback to determine when the message has been
     *         sent.
     *  sub -  the subscription pattern.
     *  qos -  the requested Quality of Service for this subscription.
     *
     * Returns:
     *  MOSQ_ERR_SUCCESS - on success.
     *  MOSQ_ERR_INVAL -   if the input parameters were invalid.
     *  MOSQ_ERR_NOMEM -   if an out of memory condition occurred.
     *  MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
     */
    libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos);
    
    /*
     * Function: mosquitto_unsubscribe
     *
     * Unsubscribe from a topic.
     *
     * Parameters:
     *  mosq - a valid mosquitto instance.
     *  mid -  a pointer to an int. If not NULL, the function will set this to
     *         the message id of this particular message. This can be then used
     *         with the unsubscribe callback to determine when the message has been
     *         sent.
     *  sub -  the unsubscription pattern.
     *
     * Returns:
     *  MOSQ_ERR_SUCCESS - on success.
     *  MOSQ_ERR_INVAL -   if the input parameters were invalid.
     *  MOSQ_ERR_NOMEM -   if an out of memory condition occurred.
     *  MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
     */
    libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub);
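
    The payloadlen ceiling of 268,435,455 bytes mentioned in the mosquitto_publish comment is 2^28 - 1. It comes from the MQTT "Remaining Length" field, which uses at most four bytes, each carrying seven bits of length plus a continuation bit. A minimal sketch of that encoding, for illustration only; mqtt_encode_remaining_length and MQTT_MAX_REMAINING_LENGTH are hypothetical names, not part of libmosquitto:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Largest value the MQTT "Remaining Length" field can carry: 4 bytes x 7 bits. */
#define MQTT_MAX_REMAINING_LENGTH 268435455u

/*
 * Encode `length` with the MQTT variable-length scheme into `out`,
 * which must have room for at least 4 bytes.
 * Returns the number of bytes written (1..4), or 0 if `length` is too large.
 * Hypothetical helper for illustration, not a libmosquitto function.
 */
size_t mqtt_encode_remaining_length(uint32_t length, uint8_t *out)
{
    size_t n = 0;

    assert(out != NULL);
    if (length > MQTT_MAX_REMAINING_LENGTH) {
        return 0;
    }
    do {
        uint8_t byte = (uint8_t)(length % 128);
        length /= 128;
        if (length > 0) {
            byte |= 0x80; /* continuation bit: more bytes follow */
        }
        out[n++] = byte;
    } while (length > 0);
    return n;
}
```

    For example, a length of 128 is encoded as the two bytes 0x80 0x01, and 268,435,455 as 0xFF 0xFF 0xFF 0x7F.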

    A typical usage flow looks like this:

    mosquitto_lib_init();
    mosq=mosquitto_new();
    
    mosquitto_connect_callback_set();     /* in this callback: mosquitto_subscribe() */
    mosquitto_disconnect_callback_set();
    mosquitto_message_callback_set();     /* receive and parse messages, then publish with mosquitto_publish() */
    
    mosquitto_username_pw_set(mosq, "user", "pw");
    mosquitto_connect();
    mosquitto_loop(mosq, timeout, 1);     /* must be called repeatedly in a loop; to run forever use mosquitto_loop_forever(mosq, timeout, 1) instead */
    
    mosquitto_publish();
    
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    3. Application

    Typical setup flow:

    0. Install the dependencies

    apt-get install openssl libssl-dev  uuid-dev

    1. Build and install
    make 
    make install
    Cross-compiling:
    CC=arm-linux-gnueabihf-gcc CXX=arm-linux-gnueabihf-g++ make WITH_SRV=no  WITH_UUID=no WITH_TLS=no WITH_DOCS=no WITH_WEBSOCKETS=no
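    2. Create the mosquitto user
    By default mosquitto runs as the mosquitto user.
    groupadd mosquitto
    useradd -g mosquitto mosquitto
    3. Edit the configuration file
    Adjust /etc/mosquitto/mosquitto.conf as needed.
    It usually works without changes: the broker is reachable on all of the host's IP addresses, both locally and from other machines. (This describes the 1.x releases; since mosquitto 2.0 a broker with no listener configured accepts only local connections.)
    4. Start the broker
    mosquitto -c /etc/mosquitto/mosquitto.conf -d
    5. Test
    mosquitto_sub -t wang/ming
    mosquitto_pub -t wang/ming -m "hello"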
    mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang"
    mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang"  -u admin -P 48e848df75c08d4c0ba75bee
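    The password has to be configured on the server; the client needs no changes and can publish directly with the username and password.

    A reference configuration file: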

     # Port to use for the default listener.
    #port 1883
    port 1883
    
     # Websockets support is currently disabled by default at compile time.
     # Certificate based TLS may be used with websockets, except that
     # only the cafile, certfile, keyfile and ciphers options are supported.
    #protocol mqtt
    listener 9001
    protocol websockets
    listener 8883
    protocol websockets
    
     #cafile
     #capath
     
    certfile /etc/cert/server.crt
    keyfile /etc/cert/server.key
    
     # For example, setting "secure-" here would mean a client "secure-
     # client" could connect but another with clientid "mqtt" couldn't.
    #clientid_prefixes
    clientid_prefixes ABC
     
     # Boolean value that determines whether clients that connect 
     # without providing a username are allowed to connect. If set to 
     # false then a password file should be created (see the 
     # password_file option) to control authenticated client access. 
     # Defaults to true.
    #allow_anonymous true
    allow_anonymous false
     
     # See the TLS client require_certificate and use_identity_as_username options
     # for alternative authentication options.
    #password_file
    password_file /etc/mosquitto/pwfile
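
    With allow_anonymous false, a client that does not present valid credentials is refused when it connects, and the refusal reaches the client as a non-zero CONNACK return code (the result argument of the connect callback in the programs further down). The sketch below maps the return codes defined by MQTT v3.1 to text; connack_description is a hypothetical helper written for illustration. libmosquitto itself offers mosquitto_connack_string() for the same purpose.

```c
#include <assert.h>
#include <stddef.h>

/* Describe an MQTT v3.1 CONNACK return code (0 means the connection was accepted). */
const char *connack_description(int code)
{
    static const char *const text[] = {
        "connection accepted",
        "refused: unacceptable protocol version",
        "refused: identifier rejected",
        "refused: server unavailable",
        "refused: bad user name or password",
        "refused: not authorized",
    };

    assert(code >= 0); /* the return code is a single unsigned byte on the wire */
    if ((size_t)code >= sizeof(text) / sizeof(text[0])) {
        return "reserved return code";
    }
    return text[code];
}
```

    Inside a connect callback this could be used as fprintf(stderr, "Connect failed: %s\n", connack_description(result)); to show why the broker rejected the client.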

    mosquitto is a server (broker), so the test clients can be run against it directly.

    An example from the man page:

    #include <stdio.h>
    #include <mosquitto.h>
    
    void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
    {
        if(message->payloadlen){
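            /* payload is raw bytes (void *), so print it with an explicit length */
            printf("%s %.*s\n", message->topic, message->payloadlen, (const char *)message->payload);
        }else{
            printf("%s (null)\n", message->topic);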
        }
        fflush(stdout);
    }
    
    void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
    {
        int i;
        if(!result){
            /* Subscribe to broker information topics on successful connect. */
            mosquitto_subscribe(mosq, NULL, "$SYS/#", 2);
        }else{
            fprintf(stderr, "Connect failed\n");
        }
    }
    
    void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
    {
        int i;
    
        printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
        for(i=1; i<qos_count; i++){
            printf(", %d", granted_qos[i]);
        }
        printf("\n");
    }
    
    void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
    {
        /* Print all log messages regardless of level. */
        printf("%s\n", str);
    }
    
    int main(int argc, char *argv[])
    {
        int i;
        char *host = "localhost";
        int port = 1883;
        int keepalive = 60;
        bool clean_session = true;
        struct mosquitto *mosq = NULL;
    
        mosquitto_lib_init();
        mosq = mosquitto_new(NULL, clean_session, NULL);
        if(!mosq){
            fprintf(stderr, "Error: Out of memory.\n");
            return 1;
        }
        mosquitto_log_callback_set(mosq, my_log_callback);
        mosquitto_connect_callback_set(mosq, my_connect_callback);
        mosquitto_message_callback_set(mosq, my_message_callback);
        mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
    
        if(mosquitto_connect(mosq, host, port, keepalive)){
            fprintf(stderr, "Unable to connect.\n");
            return 1;
        }
    
        mosquitto_loop_forever(mosq, -1, 1);
    
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return 0;
    }
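
    The example subscribes to "$SYS/#". In a subscription pattern, '#' matches all remaining topic levels and '+' matches exactly one level, so this single subscription covers every topic under $SYS. The matching rule can be sketched as below. This is a simplified illustration that assumes well-formed patterns; topic_matches is a hypothetical name, and in real code libmosquitto's mosquitto_topic_matches_sub() is the function to use.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Return true if `topic` matches the subscription pattern `sub`.
 * '+' matches exactly one level, '#' matches all remaining levels.
 * Simplified sketch: patterns are assumed to be well-formed.
 */
bool topic_matches(const char *sub, const char *topic)
{
    assert(sub != NULL && topic != NULL);

    /* Topics beginning with '$' (e.g. $SYS/...) must be subscribed to explicitly. */
    if (topic[0] == '$' && sub[0] != '$') {
        return false;
    }
    while (*sub != '\0') {
        if (*sub == '#') {
            return true;                /* multi-level wildcard takes the rest */
        }
        if (*sub == '+') {
            while (*topic != '\0' && *topic != '/') {
                topic++;                /* skip one whole topic level */
            }
            sub++;
        } else if (*sub == *topic) {
            sub++;
            topic++;
        } else {
            /* a pattern ending in "/#" also matches the parent topic itself */
            return *topic == '\0' && strcmp(sub, "/#") == 0;
        }
    }
    return *topic == '\0';
}
```

    With this sketch, "$SYS/#" matches "$SYS/broker/uptime", "wang/+" matches "wang/ming" but not "wang/qin/hua", and a bare "#" does not match topics under $SYS.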

    Self-test program:

    #include <stdio.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <mosquitto.h>
    
    char peerid[] = "wangq";
    char host[] = "127.0.0.1";
    int port = 1883;
    int keepalive = 60;
    bool clean_session = true;
    struct mosquitto *mosq = NULL;
    pthread_t pmosid = 0;
    
    
    void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
    {
        if(message->payloadlen){
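            /* payload is raw bytes (void *), so print it with an explicit length */
            printf("====>recv:%s %.*s\n", message->topic, message->payloadlen, (const char *)message->payload);
        }else{
            printf("%s (null)\n", message->topic);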
        }
    
        mosquitto_publish(mosq, NULL, "wang/result", sizeof("loveresult"), "loveresult", 2, false);
        sleep(2);
    
        fflush(stdout);
    }
    
    void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
    {
        if(!result){
            /* Subscribe to broker information topics on successful connect. */
            //mosquitto_subscribe(mosq, NULL, "$SYS/broker/uptime", 2);
            mosquitto_subscribe(mosq, NULL, "wang/hua", 1);
        }else{
            fprintf(stderr, "Connect failed\n");
        }
    }
    
    void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
    {
        int i;
    
        printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
        for(i=1; i<qos_count; i++){
            printf(", %d", granted_qos[i]);
        }
        printf("\n");
    }
    
    void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
    {
        /* Print all log messages regardless of level. */
        printf("====>log:%s\n", str);
    }
    
    void mos_init()
    {
        mosquitto_lib_init();
        mosq = mosquitto_new(peerid, clean_session, NULL);
        if(!mosq){
            fprintf(stderr, "Error: Out of memory.\n");
            exit(-1);
        }
        mosquitto_log_callback_set(mosq, my_log_callback);
        mosquitto_connect_callback_set(mosq, my_connect_callback);
        mosquitto_message_callback_set(mosq, my_message_callback);
        mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
        mosquitto_will_set(mosq,"xiao/ming", sizeof("livewill"), "livewill", 2, false);
        mosquitto_threaded_set(mosq, 1);
    }
    
    void * pthread_mos(void *arg)
    {
        int toserver = -1;
        int timeout = 0;
        
        while(toserver){
            toserver = mosquitto_connect(mosq, host, port, keepalive);
            if(toserver){
                timeout++;
                fprintf(stderr, "Unable to connect server [%d] times.\n", timeout);
                if(timeout > 3){
                    fprintf(stderr, "Unable to connect server, exit.\n");
                    pthread_exit(NULL);
                }
                sleep(10);
            }
        }
    
        mosquitto_loop_forever(mosq, -1, 1);
    
        mosquitto_disconnect(mosq);
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
    
        pthread_exit(NULL);
    }
    
    int main(int argc, char *argv[])
    {
        int ret = 0;
    
        mos_init();
    
        ret = pthread_create(&pmosid, NULL, pthread_mos, NULL);
        if(ret != 0){
            printf("create pthread mos error.\n");
            exit(-1);
        }
        pthread_detach(pmosid);
    
        while(1){
            mosquitto_publish(mosq, NULL, "wang/ming", sizeof("love"), "love", 2, false);
            sleep(2);
        }
    
    /*    
        mosquitto_loop_forever(mosq, -1, 1);
    
        while(1){
            mosquitto_publish(mosq, NULL, "wang/qin/hua", sizeof("love"), "love", 2, false);
            sleep(2);
        }
    */
        return 0;
    }
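
    The self-test passes sizeof("love") as payloadlen. For a string literal, sizeof counts the terminating NUL, so each message carries 5 bytes rather than 4, which is what "(5 bytes)" in the client log reports. A subscriber that treats the payload as text then receives a trailing zero byte. If that is not wanted, the text length from strlen can be passed instead. A minimal sketch, where text_payload_len is a hypothetical helper and not a libmosquitto function:

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>
#include <string.h>

/*
 * Length to pass as payloadlen for a text payload, without the trailing NUL.
 * Returns -1 if the string is too long to fit in an int.
 * Hypothetical helper for illustration.
 */
int text_payload_len(const char *text)
{
    size_t len;

    assert(text != NULL);
    len = strlen(text);
    return len > (size_t)INT_MAX ? -1 : (int)len;
}
```

    A call such as mosquitto_publish(mosq, NULL, "wang/ming", text_payload_len("love"), "love", 2, false) would then send 4 bytes.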

    Output:

    #./a.out
    ====>log:Client wangq sending CONNECT
    ====>log:Client wangq received CONNACK (0)
    ====>log:Client wangq sending SUBSCRIBE (Mid: 2, Topic: wang/hua, QoS: 1)
    ====>log:Client wangq sending PUBLISH (d1, q2, r0, m1, 'wang/ming', ... (5 bytes))
    ====>log:Client wangq received SUBACK
    Subscribed (mid: 2): 1
    ====>log:Client wangq received PUBREC (Mid: 1)
    ====>log:Client wangq sending PUBREL (Mid: 1)
    ====>log:Client wangq received PUBCOMP (Mid: 1)
    ====>log:Client wangq sending PUBLISH (d0, q2, r0, m3, 'wang/ming', ... (5 bytes))
    ====>log:Client wangq received PUBREC (Mid: 3)
    ====>log:Client wangq sending PUBREL (Mid: 3)
    ====>log:Client wangq received PUBCOMP (Mid: 3)
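
    In the log lines above, the tuple after PUBLISH reads d = DUP flag, q = QoS, r = RETAIN flag, m = message id, and each QoS 2 publish is completed by the PUBREC, PUBREL, PUBCOMP exchange. The DUP, QoS and RETAIN values travel in the first byte of the PUBLISH packet. A minimal sketch of how that byte is laid out according to the MQTT specification; mqtt_publish_header_byte is a hypothetical helper, not a libmosquitto function:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define MQTT_PACKET_PUBLISH 3u  /* control packet type of PUBLISH */

/*
 * Build the first byte of a PUBLISH fixed header:
 * bits 7-4 packet type, bit 3 DUP, bits 2-1 QoS, bit 0 RETAIN.
 */
uint8_t mqtt_publish_header_byte(bool dup, int qos, bool retain)
{
    assert(qos >= 0 && qos <= 2);
    return (uint8_t)((MQTT_PACKET_PUBLISH << 4) |
                     ((dup ? 1u : 0u) << 3) |
                     ((unsigned)qos << 1) |
                     (retain ? 1u : 0u));
}
```

    For the second PUBLISH line (d0, q2, r0) the byte is 0x34; for the first one (d1, q2, r0) it is 0x3C.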

    Subscriber:

    #mosquitto_sub -t wang/ming
    love
    love

    Subscriber on the will topic (the will message is delivered when the client disconnects unexpectedly):

    #mosquitto_sub -t xiao/ming 
    livewill


  • Original article: https://www.cnblogs.com/embedded-linux/p/9386169.html