zoukankan      html  css  js  c++  java
  • MQTT再学习 -- MQTT 客户端源码分析

    MQTT 源码分析,搜索了一下发现网络上讲的很少,多是逍遥子的那几篇。

    参看:逍遥子_mosquitto源码分析系列

    参看:MQTT libmosquitto源码分析

    参看:Mosquitto学习笔记

    一、目录结构
    首先我们还是来看一下 mosquitto-1.4.14 的源码目录结构

     

    我们主要关注 client、lib、src 这三个目录。其中 src 和 lib 目录下主要放置 mosquitto 的实现代码以及部分底层与网络相关的操作,client 目录主要为两个客户端程序的实现源码。

    我们主要就是来看看,这两个客户端的实现源码。

    二、SUB 客户端源码

    首先我们先看 sub_client.c 
    我们从 main 函数开始。

    查看结构体:
    结构体 struct mosq_config 主要为 MQTT 的配置信息
    struct mosq_config {
    char *id;
    char *id_prefix;
    int protocol_version;
    int keepalive;
    char *host;
    int port;
    int qos;
    bool retain;
    int pub_mode; /* pub */
    char *file_input; /* pub */
    char *message; /* pub */
    long msglen; /* pub */
    char *topic; /* pub */
    char *bind_address;
    #ifdef WITH_SRV
    bool use_srv;
    #endif
    bool debug;
    bool quiet;
    unsigned int max_inflight;
    char *username;
    char *password;
    char *will_topic;
    char *will_payload;
    long will_payloadlen;
    int will_qos;
    bool will_retain;
    #ifdef WITH_TLS
    char *cafile;
    char *capath;
    char *certfile;
    char *keyfile;
    char *ciphers;
    bool insecure;
    char *tls_version;
    # ifdef WITH_TLS_PSK
    char *psk;
    char *psk_identity;
    # endif
    #endif
    bool clean_session; /* sub */
    char **topics; /* sub */
    int topic_count; /* sub */
    bool no_retain; /* sub */
    char **filter_outs; /* sub */
    int filter_out_count; /* sub */
    bool verbose; /* sub */
    bool eol; /* sub */
    int msg_count; /* sub */
    #ifdef WITH_SOCKS
    char *socks5_host;
    int socks5_port;
    char *socks5_username;
    char *socks5_password;
    #endif
    };

    结构体 struct mosquito 主要用于保存一个客户端连接的所有信息,例如用户名、密码、用户ID、向该客户端发送的消息等
    struct mosquitto {
    mosq_sock_t sock;
    #ifndef WITH_BROKER
    mosq_sock_t sockpairR, sockpairW;
    #endif
    #if defined(__GLIBC__) && defined(WITH_ADNS)
    struct gaicb *adns; /* For getaddrinfo_a */
    #endif
    enum _mosquitto_protocol protocol;
    char *address;
    char *id;
    char *username;
    char *password;
    uint16_t keepalive;
    uint16_t last_mid;
    enum mosquitto_client_state state;
    time_t last_msg_in;
    time_t next_msg_out;
    time_t ping_t;
    struct _mosquitto_packet in_packet;
    struct _mosquitto_packet *current_out_packet;
    struct _mosquitto_packet *out_packet;
    struct mosquitto_message *will;
    #ifdef WITH_TLS
    SSL *ssl;
    SSL_CTX *ssl_ctx;
    char *tls_cafile;
    char *tls_capath;
    char *tls_certfile;
    char *tls_keyfile;
    int (*tls_pw_callback)(char *buf, int size, int rwflag, void *userdata);
    char *tls_version;
    char *tls_ciphers;
    char *tls_psk;
    char *tls_psk_identity;
    int tls_cert_reqs;
    bool tls_insecure;
    #endif
    bool want_write;
    bool want_connect;
    #if defined(WITH_THREADING) && !defined(WITH_BROKER)
    pthread_mutex_t callback_mutex;
    pthread_mutex_t log_callback_mutex;
    pthread_mutex_t msgtime_mutex;
    pthread_mutex_t out_packet_mutex;
    pthread_mutex_t current_out_packet_mutex;
    pthread_mutex_t state_mutex;
    pthread_mutex_t in_message_mutex;
    pthread_mutex_t out_message_mutex;
    pthread_mutex_t mid_mutex;
    pthread_t thread_id;
    #endif
    bool clean_session;
    #ifdef WITH_BROKER
    bool is_dropping;
    bool is_bridge;
    struct _mqtt3_bridge *bridge;
    struct mosquitto_client_msg *msgs;
    struct mosquitto_client_msg *last_msg;
    int msg_count;
    int msg_count12;
    struct _mosquitto_acl_user *acl_list;
    struct _mqtt3_listener *listener;
    time_t disconnect_t;
    struct _mosquitto_packet *out_packet_last;
    struct _mosquitto_subhier **subs;
    int sub_count;
    int pollfd_index;
    # ifdef WITH_WEBSOCKETS
    # if defined(LWS_LIBRARY_VERSION_NUMBER)
    struct lws *wsi;
    # else
    struct libwebsocket_context *ws_context;
    struct libwebsocket *wsi;
    # endif
    # endif
    bool ws_want_write;
    #else
    # ifdef WITH_SOCKS
    char *socks5_host;
    int socks5_port;
    char *socks5_username;
    char *socks5_password;
    # endif
    void *userdata;
    bool in_callback;
    unsigned int message_retry;
    time_t last_retry_check;
    struct mosquitto_message_all *in_messages;
    struct mosquitto_message_all *in_messages_last;
    struct mosquitto_message_all *out_messages;
    struct mosquitto_message_all *out_messages_last;
    void (*on_connect)(struct mosquitto *, void *userdata, int rc);
    void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
    void (*on_publish)(struct mosquitto *, void *userdata, int mid);
    void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
    void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);
    void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
    void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
    //void (*on_error)();
    char *host;
    int port;
    int in_queue_len;
    int out_queue_len;
    char *bind_address;
    unsigned int reconnect_delay;
    unsigned int reconnect_delay_max;
    bool reconnect_exponential_backoff;
    char threaded;
    struct _mosquitto_packet *out_packet_last;
    int inflight_messages;
    int max_inflight_messages;
    # ifdef WITH_SRV
    ares_channel achan;
    # endif
    #endif

    #ifdef WITH_BROKER
    UT_hash_handle hh_id;
    UT_hash_handle hh_sock;
    struct mosquitto *for_free_next;
    #endif
    };


    client_config_load 客户端配置负载
    第二个参数,可选择选择是 PUB 还是 SUB

    然后看到 init_config 函数


    可以看到一些初始化配置
    void init_config(struct mosq_config *cfg)
    {
    memset(cfg, 0, sizeof(*cfg));
    cfg->port = 1883;
    cfg->max_inflight = 20;
    cfg->keepalive = 60;
    cfg->clean_session = true;
    cfg->eol = true;
    cfg->protocol_version = MQTT_PROTOCOL_V31;
    }

    mosquitto_lib_init 初始化  (重点)

    int mosquitto_lib_init(void)
    {
    #ifdef WIN32
    srand(GetTickCount());
    #else
    struct timeval tv;

    gettimeofday(&tv, NULL);
    srand(tv.tv_sec*1000 + tv.tv_usec/1000);
    #endif

    _mosquitto_net_init();

    return MOSQ_ERR_SUCCESS;
    }
    这里有个时间戳函数 gettimeofday,参看:C语言再学习 -- 时间函数
    所在文件 mosquitto-1.4.14/lib/mosquitto.c 所以说需要链接动态库 libmosquitto.so.1


    client_id_generate 生成客户端 ID 
    其实就是我们讲MQTT服务器的时候,订阅主题然后在服务器上多出的那一行信息。
    里面的 mosqsub|2431-ubuntu 就是客户端 ID。这个函数就是干这个。
    1502159601: New client connected from 127.0.0.1 as mosqsub|2431-ubuntu (c1, k60)

    mosquitto_new 新建一个 mosq。(重点)

    看了一下这个函数里面就是一些初始化的东西
    然后可以看到它也是在 lib 目录下定义的。所以说需要链接动态库 libmosquitto.so.1。其他不用改。


    client_ipts_set 各种设置。懒得看...


    一些调试信息
    以及订阅回调设置 mosquitto_subscribe_callback_set (重要)

    连接回调设置,和信息回调设置(重点)

    这两个函数都是在lib目录下定义的。里面都是有互斥锁的。

    client_connect 客户端连接

    int client_connect(struct mosquitto *mosq, struct mosq_config *cfg)
    {
    char err[1024];
    int rc;

    #ifdef WITH_SRV
    if(cfg->use_srv){
    rc = mosquitto_connect_srv(mosq, cfg->host, cfg->keepalive, cfg->bind_address);
    }else{
    rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);
    }
    #else
    rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);
    #endif
    if(rc>0){
    if(!cfg->quiet){
    if(rc == MOSQ_ERR_ERRNO){
    #ifndef WIN32
    strerror_r(errno, err, 1024);
    #else
    FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL);
    #endif
    fprintf(stderr, "Error: %s ", err);
    }else{
    fprintf(stderr, "Unable to connect (%s). ", mosquitto_strerror(rc));
    }
    }
    mosquitto_lib_cleanup();
    return rc;
    }
    return MOSQ_ERR_SUCCESS;
    }
    可以看到里面又有几个重要函数
    mosquitto_connect_srv
    mosquitto_connect_bind --> _mosquitto_connect_init

    然后不断回环,里面的参数自己看。

    最后是 mosq 的销毁和库的关闭。(重点)


    到此结束!!
    三、PUB 客户端源码
    接下来来看 pub_client.c 有一些相同部分我就不再重复了。

    client_config_load 客户端配置负载
    第二个参数,可选择选择是 PUB 还是 SUB


    一些配置信息的比较


    mosquitto_lib_init 初始化  (重点)

     

    client_id_generate 生成客户端 ID 

    mosquitto_new 新建一个 mosq。(重点)


    调试信息这里面就没有了订阅回调设置 mosquitto_subscribe_callback_set


    然后这里看这里,是有区别的。(重点)

    connect、disconnect、publish.  这些回调设置


    client_ipts_set 各种设置。懒得看...

    client_connect 客户端连接


    回环开始、结束


    最重要的来了,do while循环里的发布内容 (重点)

    这里的 qos 就是消息发布服务质量级别。


    然后还有 retain 用于区分新老订阅者
    RETAIN标志位只用于 PUBLISH 消息,当服务器收到某个主题的 PUBLISH 消息时,如果RETAIN标志位为1,则表示服务在将该消息发送给所有的已订阅该主题的订阅者后(发送前服务器将RETAIN标志置为0),还需保持这条消息,当有新增的订阅者时,再将这条消息发给新增的订阅者;如果RETAIN标志位为0,则不保持消息,也不用发给新增的订阅者。
    目的:
    1.将RETAIN标志位置为1,可使新的订阅者收到之前保持的或上一个确定有效的消息。
    2.区分新订阅者(RETAIN标志为1)和老订阅者(RETAIN标志为0)

    源码中这两个参数的设置都是 0


    最后是 mosq 的销毁和库的关闭。(重点)


    ---------------------
    作者:聚优致成
    来源:CSDN
    原文:https://blog.csdn.net/qq_29350001/article/details/77161537
    版权声明:本文为博主原创文章,转载请附上博文链接!

  • 相关阅读:
    消息中间件——RabbitMQ(一)Windows/Linux环境搭建(完整版)
    数据结构与算法(一):带你了解时间复杂度和空间复杂度到底是什么?
    作为一技术人员,面试前都需要做哪些准备?
    消息中间件——RocketMQ(一) 环境搭建(完整版)
    从“数学归纳法”到理解“递归算法”!
    深入浅出了解“装箱与拆箱”
    PMP备考指南之第二章:项目运作环境
    PMP备考指南之第一章:引论
    PMP备考指南之相关事项介绍
    7月新的开始
  • 原文地址:https://www.cnblogs.com/jjg0519/p/9969359.html
Copyright © 2011-2022 走看看