zoukankan      html  css  js  c++  java
  • MQTT 客户端源码分析

    参看:逍遥子_mosquitto源码分析系列

    参看:MQTT libmosquitto源码分析

    参看:Mosquitto学习笔记

    一、目录结构

    首先我们还是来看一下 mosquitto-1.4.14 的源码目录结构

    我们主要关注 client、lib、src 这三个目录。其中 src 和 lib 目录下主要放置 mosquitto 的实现代码以及部分底层与网络相关的操作,client 目录主要为两个客户端程序的实现源码。

    我们主要就是来看看,这两个客户端的实现源码。

    二、SUB 客户端源码

    首先我们先看 sub_client.c 

    我们从 main 函数开始。
    查看结构体:
    结构体 struct mosq_config 主要为 MQTT 的配置信息
    [cpp] view plain copy
     
    1. struct mosq_config {  
    2.     char *id;  
    3.     char *id_prefix;  
    4.     int protocol_version;  
    5.     int keepalive;  
    6.     char *host;  
    7.     int port;  
    8.     int qos;  
    9.     bool retain;  
    10.     int pub_mode; /* pub */  
    11.     char *file_input; /* pub */  
    12.     char *message; /* pub */  
    13.     long msglen; /* pub */  
    14.     char *topic; /* pub */  
    15.     char *bind_address;  
    16. #ifdef WITH_SRV  
    17.     bool use_srv;  
    18. #endif  
    19.     bool debug;  
    20.     bool quiet;  
    21.     unsigned int max_inflight;  
    22.     char *username;  
    23.     char *password;  
    24. char *will_topic;  
    25.     char *will_payload;  
    26.     long will_payloadlen;  
    27.     int will_qos;  
    28.     bool will_retain;  
    29. #ifdef WITH_TLS  
    30.     char *cafile;  
    31.     char *capath;  
    32.     char *certfile;  
    33.     char *keyfile;  
    34.     char *ciphers;  
    35.     bool insecure;  
    36.     char *tls_version;  
    37. #  ifdef WITH_TLS_PSK  
    38.     char *psk;  
    39.     char *psk_identity;  
    40. #  endif  
    41. #endif  
    42.     bool clean_session; /* sub */  
    43.     char **topics; /* sub */  
    44.     int topic_count; /* sub */  
    45.     bool no_retain; /* sub */  
    46.     char **filter_outs; /* sub */  
    47.     int filter_out_count; /* sub */  
    48. bool verbose; /* sub */  
    49.     bool eol; /* sub */  
    50.     int msg_count; /* sub */  
    51. #ifdef WITH_SOCKS  
    52.     char *socks5_host;  
    53.     int socks5_port;  
    54.     char *socks5_username;  
    55.     char *socks5_password;  
    56. #endif  
    57. };  
     
    结构体 struct mosquito 主要用于保存一个客户端连接的所有信息,例如用户名、密码、用户ID、向该客户端发送的消息等
    [cpp] view plain copy
     
    1. struct mosquitto {  
    2.     mosq_sock_t sock;  
    3. #ifndef WITH_BROKER  
    4.     mosq_sock_t sockpairR, sockpairW;  
    5. #endif  
    6. #if defined(__GLIBC__) && defined(WITH_ADNS)  
    7.     struct gaicb *adns; /* For getaddrinfo_a */  
    8. #endif  
    9.     enum _mosquitto_protocol protocol;  
    10.     char *address;  
    11.     char *id;  
    12.     char *username;  
    13.     char *password;  
    14.     uint16_t keepalive;  
    15.     uint16_t last_mid;  
    16.     enum mosquitto_client_state state;  
    17.     time_t last_msg_in;  
    18.     time_t next_msg_out;  
    19.     time_t ping_t;  
    20.     struct _mosquitto_packet in_packet;  
    21.     struct _mosquitto_packet *current_out_packet;  
    22.     struct _mosquitto_packet *out_packet;  
    23.     struct mosquitto_message *will;  
    24. #ifdef WITH_TLS  
    25.     SSL *ssl;  
    26.     SSL_CTX *ssl_ctx;  
    27.     char *tls_cafile;  
    28.     char *tls_capath;  
    29.     char *tls_certfile;  
    30.     char *tls_keyfile;  
    31.     int (*tls_pw_callback)(char *buf, int size, int rwflag, void *userdata);  
    32.     char *tls_version;  
    33.     char *tls_ciphers;  
    34.     char *tls_psk;  
    35.     char *tls_psk_identity;  
    36.     int tls_cert_reqs;  
    37.     bool tls_insecure;  
    38. #endif  
    39.     bool want_write;  
    40.     bool want_connect;  
    41. #if defined(WITH_THREADING) && !defined(WITH_BROKER)  
    42.     pthread_mutex_t callback_mutex;  
    43.     pthread_mutex_t log_callback_mutex;  
    44.     pthread_mutex_t msgtime_mutex;  
    45.     pthread_mutex_t out_packet_mutex;  
    46.     pthread_mutex_t current_out_packet_mutex;  
    47.     pthread_mutex_t state_mutex;  
    48.     pthread_mutex_t in_message_mutex;  
    49.     pthread_mutex_t out_message_mutex;  
    50.     pthread_mutex_t mid_mutex;  
    51.     pthread_t thread_id;  
    52. #endif  
    53.     bool clean_session;  
    54. #ifdef WITH_BROKER  
    55.     bool is_dropping;  
    56.     bool is_bridge;  
    57.     struct _mqtt3_bridge *bridge;  
    58.     struct mosquitto_client_msg *msgs;  
    59.     struct mosquitto_client_msg *last_msg;  
    60.     int msg_count;  
    61.     int msg_count12;  
    62.     struct _mosquitto_acl_user *acl_list;  
    63.     struct _mqtt3_listener *listener;  
    64.     time_t disconnect_t;  
    65.     struct _mosquitto_packet *out_packet_last;  
    66.     struct _mosquitto_subhier **subs;  
    67.     int sub_count;  
    68.     int pollfd_index;  
    69. #  ifdef WITH_WEBSOCKETS  
    70. #    if defined(LWS_LIBRARY_VERSION_NUMBER)  
    71.     struct lws *wsi;  
    72. #    else  
    73.     struct libwebsocket_context *ws_context;  
    74.     struct libwebsocket *wsi;  
    75. #    endif  
    76. #  endif  
    77.     bool ws_want_write;  
    78. #else  
    79. #  ifdef WITH_SOCKS  
    80.     char *socks5_host;  
    81.     int socks5_port;  
    82.     char *socks5_username;  
    83.     char *socks5_password;  
    84. #  endif  
    85.     void *userdata;  
    86.     bool in_callback;  
    87.     unsigned int message_retry;  
    88.     time_t last_retry_check;  
    89.     struct mosquitto_message_all *in_messages;  
    90.     struct mosquitto_message_all *in_messages_last;  
    91.     struct mosquitto_message_all *out_messages;  
    92.     struct mosquitto_message_all *out_messages_last;  
    93.     void (*on_connect)(struct mosquitto *, void *userdata, int rc);  
    94.     void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);  
    95.     void (*on_publish)(struct mosquitto *, void *userdata, int mid);  
    96.     void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);  
    97.     void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);  
    98.     void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);  
    99.     void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);  
    100.     //void (*on_error)();  
    101.     char *host;  
    102.     int port;  
    103.     int in_queue_len;  
    104.     int out_queue_len;  
    105.     char *bind_address;  
    106.     unsigned int reconnect_delay;  
    107.     unsigned int reconnect_delay_max;  
    108.     bool reconnect_exponential_backoff;  
    109.     char threaded;  
    110.     struct _mosquitto_packet *out_packet_last;  
    111.     int inflight_messages;  
    112.     int max_inflight_messages;  
    113. #  ifdef WITH_SRV  
    114.     ares_channel achan;  
    115. #  endif  
    116. #endif  
    117.   
    118. #ifdef WITH_BROKER  
    119.     UT_hash_handle hh_id;  
    120.     UT_hash_handle hh_sock;  
    121.     struct mosquitto *for_free_next;  
    122. #endif  
    123. };  
     

    client_config_load 客户端配置负载
    第二个参数,可选择选择是 PUB 还是 SUB
     
    然后看到 init_config 函数


    可以看到一些初始化配置
    [cpp] view plain copy
     
    1. void init_config(struct mosq_config *cfg)  
    2. {  
    3.     memset(cfg, 0, sizeof(*cfg));  
    4.     cfg->port = 1883;  
    5.     cfg->max_inflight = 20;   
    6.     cfg->keepalive = 60;   
    7.     cfg->clean_session = true;  
    8.     cfg->eol = true;  
    9.     cfg->protocol_version = MQTT_PROTOCOL_V31;  
    10. }  
     
    mosquitto_lib_init 初始化  (重点)

    [html] view plain copy
     
    1. int mosquitto_lib_init(void)  
    2. {  
    3. #ifdef WIN32          
    4.     srand(GetTickCount());  
    5. #else                 
    6.     struct timeval tv;  
    7.   
    8.     gettimeofday(&tv, NULL);  
    9.     srand(tv.tv_sec*1000 + tv.tv_usec/1000);  
    10. #endif    
    11.   
    12.     _mosquitto_net_init();  
    13.   
    14.     return MOSQ_ERR_SUCCESS;  
    15. }  
    这里有个时间戳函数 gettimeofday,参看:C语言再学习 -- 时间函数
    所在文件 mosquitto-1.4.14/lib/mosquitto.c 所以说需要链接动态库 libmosquitto.so.1
     

    client_id_generate 生成客户端 ID 
    其实就是我们讲MQTT服务器的时候,订阅主题然后在服务器上多出的那一行信息。
    里面的 mosqsub|2431-ubuntu 就是客户端 ID。这个函数就是干这个。
    [cpp] view plain copy
     
    1. 1502159601: New client connected from 127.0.0.1 as mosqsub|2431-ubuntu (c1, k60)    
     
    mosquitto_new 新建一个 mosq。(重点)

    看了一下这个函数里面就是一些初始化的东西
    然后可以看到它也是在 lib 目录下定义的。所以说需要链接动态库 libmosquitto.so.1。其他不用改。
     
    client_ipts_set 各种设置。懒得看...
     
    一些调试信息
    以及订阅回调设置 mosquitto_subscribe_callback_set (重要)
     
    连接回调设置,和信息回调设置(重点)
    这两个函数都是在lib目录下定义的。里面都是有互斥锁的。
     
    client_connect 客户端连接
    int client_connect(struct mosquitto *mosq, struct mosq_config *cfg)
    [cpp] view plain copy
     
    1. {  
    2.     char err[1024];  
    3.     int rc;  
    4.   
    5. #ifdef WITH_SRV  
    6.     if(cfg->use_srv){  
    7.         rc = mosquitto_connect_srv(mosq, cfg->host, cfg->keepalive, cfg->bind_address);  
    8.     }else{  
    9.         rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);  
    10.     }  
    11. #else  
    12.     rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);  
    13. #endif  
    14.     if(rc>0){  
    15.         if(!cfg->quiet){  
    16.             if(rc == MOSQ_ERR_ERRNO){  
    17. #ifndef WIN32  
    18.                 strerror_r(errno, err, 1024);  
    19. #else  
    20.                 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL);  
    21. #endif  
    22.                 fprintf(stderr, "Error: %s ", err);  
    23.             }else{  
    24.                 fprintf(stderr, "Unable to connect (%s). ", mosquitto_strerror(rc));  
    25.             }  
    26.         }  
    27.         mosquitto_lib_cleanup();  
    28.         return rc;  
    29.     }  
    30.     return MOSQ_ERR_SUCCESS;  
    31. }  
    可以看到里面又有几个重要函数
    mosquitto_connect_srv
    mosquitto_connect_bind --> _mosquitto_connect_init
     
    然后不断回环,里面的参数自己看。
     
    最后是 mosq 的销毁和库的关闭。(重点)
     
    到此结束!!

    三、PUB 客户端源码

    接下来来看 pub_client.c 有一些相同部分我就不再重复了。
     
    client_config_load 客户端配置负载
    第二个参数,可选择选择是 PUB 还是 SUB


    一些配置信息的比较
     
    mosquitto_lib_init 初始化  (重点)

     

    client_id_generate 生成客户端 ID 
     
    mosquitto_new 新建一个 mosq。(重点)

     
    调试信息这里面就没有了订阅回调设置 mosquitto_subscribe_callback_set
     
    然后这里看这里,是有区别的。(重点)
    connect、disconnect、publish.  这些回调设置
     
    client_ipts_set 各种设置。懒得看...
     
    client_connect 客户端连接
     
    回环开始、结束
     
    最重要的来了,do while循环里的发布内容 (重点)
    这里的 qos 就是消息发布服务质量级别。
     
    然后还有 retain 用于区分新老订阅者
    RETAIN标志位只用于 PUBLISH 消息,当服务器收到某个主题的 PUBLISH 消息时,如果RETAIN标志位为1,则表示服务在将该消息发送给所有的已订阅该主题的订阅者后(发送前服务器将RETAIN标志置为0),还需保持这条消息,当有新增的订阅者时,再将这条消息发给新增的订阅者;如果RETAIN标志位为0,则不保持消息,也不用发给新增的订阅者。
    目的:
    1.将RETAIN标志位置为1,可使新的订阅者收到之前保持的或上一个确定有效的消息。
    2.区分新订阅者(RETAIN标志为1)和老订阅者(RETAIN标志为0)
     
    源码中这两个参数的设置都是 0
     
    最后是 mosq 的销毁和库的关闭。(重点)
  • 相关阅读:
    Luogu P1090 合并果子(优先队列 || priority_queue)
    Luogu P1012 拼数
    hibernate5.2的基本配置
    [bzoj1210][HNOI2004]邮递员【插头dp】
    [bzoj3470]Freda’s Walk【概率与期望dp】
    [bzoj4851][Jsoi2016]位运算【矩阵乘法】【状压dp】
    [bzoj4852][Jsoi2016]炸弹攻击【随机化】
    [bzoj4853][Jsoi2016]飞机调度【最短路】【网络流】
    [bzoj4850][Jsoi2016]灯塔【暴力】
    [bzoj4919][Lydsy1706月赛]大根堆【dp】【启发式合并】【stl】
  • 原文地址:https://www.cnblogs.com/jiangzhaowei/p/8459211.html
Copyright © 2011-2022 走看看