zoukankan      html  css  js  c++  java
  • MQTT 开源代理mosquitto的网络层封装相当sucks

    最近学习MQTT协议,选择了当前比较流行的MQTT Broker “mosquitto”,但是在阅读代码过程中发现其网络底层库封装的相当差劲。

    在读取MQTT固定报头(命令字节以及变长的"剩余长度"字段)时,它基本上每次只读取一个byte并逐个判断;考虑到系统调用read的开销很高,这样做相当浪费,也难怪它难以胜任高并发服务器的场景。 

     

    当然mosquitto需要优化的地方还很多:

    1. 使用poll而不是epoll (可能是出于跨平台考虑,在linux下可以用epoll替换),另外就是刚才提到的逐 byte 读取网络数据的问题

    2. 订阅树的管理上,对于大量的请求断开或者重连,效率比较低

    3. 空闲空间管理机制优化和数据包发送方式的修改

    4. 内存管理上malloc new 没有使用mem pool机制,在大并发情况下,内存管理容易出现问题

    5. 锁遍地飞,如果采用reactor_ 

    但是从另一个方面讲,mosquitto作为开源的实现,思路上还是比较清晰,为mqtt服务器开发提供了比较完备的参考,这也就是它的价值所在了。

     

    #ifdef WITH_BROKER

    int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)

    #else

    int _mosquitto_packet_read(struct mosquitto *mosq)

    #endif

    {

        uint8_t byte;

        ssize_t read_length;

        int rc = 0;

        

        if(!mosq) return MOSQ_ERR_INVAL;

        if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;

        if(mosq->state == mosq_cs_connect_pending){

            return MOSQ_ERR_SUCCESS;

        }

        

        /* This gets called if pselect() indicates that there is network data

         * available - ie. at least one byte.  What we do depends on what data we

         * already have.

         * If we've not got a command, attempt to read one and save it. This should

         * always work because it's only a single byte.

         * Then try to read the remaining length. This may fail because it is may

         * be more than one byte - will need to save data pending next read if it

         * does fail.

         * Then try to read the remaining payload, where 'payload' here means the

         * combined variable header and actual payload. This is the most likely to

         * fail due to longer length, so save current data and current position.

         * After all data is read, send to _mosquitto_handle_packet() to deal with.

         * Finally, free the memory and reset everything to starting conditions.

         */

        if(!mosq->in_packet.command){

            read_length = _mosquitto_net_read(mosq, &byte, 1);

            if(read_length == 1){

                mosq->in_packet.command = byte;

    #ifdef WITH_BROKER

    #  ifdef WITH_SYS_TREE

                g_bytes_received++;

    #  endif

                /* Clients must send CONNECT as their first command. */

                if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;

    #endif

            }else{

                if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */

    #ifdef WIN32

                errno = WSAGetLastError();

    #endif

                if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){

                    return MOSQ_ERR_SUCCESS;

                }else{

                    switch(errno){

                        case COMPAT_ECONNRESET:

                            return MOSQ_ERR_CONN_LOST;

                        default:

                            return MOSQ_ERR_ERRNO;

                    }

                }

            }

        }

        /* remaining_count is the number of bytes that the remaining_length

         * parameter occupied in this incoming packet. We don't use it here as such

         * (it is used when allocating an outgoing packet), but we must be able to

         * determine whether all of the remaining_length parameter has been read.

         * remaining_count has three states here:

         *   0 means that we haven't read any remaining_length bytes

         *   <0 means we have read some remaining_length bytes but haven't finished

         *   >0 means we have finished reading the remaining_length bytes.

         */

        if(mosq->in_packet.remaining_count <= 0){

            do{

                read_length = _mosquitto_net_read(mosq, &byte, 1);

                if(read_length == 1){

                    mosq->in_packet.remaining_count--;

                    /* Max 4 bytes length for remaining length as defined by protocol.

                     * Anything more likely means a broken/malicious client.

                     */

                    if(mosq->in_packet.remaining_count < -4) return MOSQ_ERR_PROTOCOL;

                    

    #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)

                    g_bytes_received++;

    #endif

                    mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;

                    mosq->in_packet.remaining_mult *= 128;

                }else{

                    if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */

    #ifdef WIN32

                    errno = WSAGetLastError();

    #endif

                    if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){

                        return MOSQ_ERR_SUCCESS;

                    }else{

                        switch(errno){

                            case COMPAT_ECONNRESET:

                                return MOSQ_ERR_CONN_LOST;

                            default:

                                return MOSQ_ERR_ERRNO;

                        }

                    }

                }

            }while((byte & 128) != 0);

            /* We have finished reading remaining_length, so make remaining_count

             * positive. */

            mosq->in_packet.remaining_count *= -1;

            

            if(mosq->in_packet.remaining_length > 0){

                mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));

                if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;

                mosq->in_packet.to_process = mosq->in_packet.remaining_length;

            }

        }

        while(mosq->in_packet.to_process>0){

            read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);

            if(read_length > 0){

    #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)

                g_bytes_received += read_length;

    #endif

                mosq->in_packet.to_process -= read_length;

                mosq->in_packet.pos += read_length;

            }else{

    #ifdef WIN32

                errno = WSAGetLastError();

    #endif

                if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){

                    if(mosq->in_packet.to_process > 1000){

                        /* Update last_msg_in time if more than 1000 bytes left to

                         * receive. Helps when receiving large messages.

                         * This is an arbitrary limit, but with some consideration.

                         * If a client can't send 1000 bytes in a second it

                         * probably shouldn't be using a 1 second keep alive. */

                        pthread_mutex_lock(&mosq->msgtime_mutex);

                        mosq->last_msg_in = mosquitto_time();

                        pthread_mutex_unlock(&mosq->msgtime_mutex);

                    }

                    return MOSQ_ERR_SUCCESS;

                }else{

                    switch(errno){

                        case COMPAT_ECONNRESET:

                            return MOSQ_ERR_CONN_LOST;

                        default:

                            return MOSQ_ERR_ERRNO;

                    }

                }

            }

        }

        

        /* All data for this packet is read. */

        mosq->in_packet.pos = 0;

    #ifdef WITH_BROKER

    #  ifdef WITH_SYS_TREE

        g_msgs_received++;

        if(((mosq->in_packet.command)&0xF5) == PUBLISH){

            g_pub_msgs_received++;

        }

    #  endif

        rc = mqtt3_packet_handle(db, mosq);

    #else

        rc = _mosquitto_packet_handle(mosq);

    #endif

        

        /* Free data and reset values */

        _mosquitto_packet_cleanup(&mosq->in_packet);

        

        pthread_mutex_lock(&mosq->msgtime_mutex);

        mosq->last_msg_in = mosquitto_time();

        pthread_mutex_unlock(&mosq->msgtime_mutex);

        return rc;

    }

  • 相关阅读:
    [国家集训队]数颜色 / 维护队列
    【模板】二逼平衡树(线段树+平衡树)
    jenkins实现接口自动化持续集成(python+pytest+ Allure+git)
    Locust快速上手指南
    缓解多分类的样本不均衡问题
    PlayStation@4功能介绍及测试应用
    APP专项测试-弱网测试
    游戏自动化测试-局内战斗
    Windows下JMeter分布式压测环境搭建
    基于simhash的文本去重原理
  • 原文地址:https://www.cnblogs.com/davad/p/4585412.html
Copyright © 2011-2022 走看看