zoukankan      html  css  js  c++  java
  • 分布式缓存系统 Memcached 状态机之网络数据读取与解析

    整个状态机的基本流程如下图所示,后续分析将按该流程来进行。

    接上节分解,主线程将接收的连接socket分发给了某工作线程,然后工作线程从任务队列中取出该连接socket的CQ_ITEM,开始处理该连接的所有业务逻辑。这个过程也就是上图中的第一个状态conn_listening。 而工作线程首先进入的状态就是conn_new_cmd,即为这个新的连接做一些准备工作,如清理该连接conn结构的读缓冲区等。

    准备状态conn_new_cmd具体分析如下:

    {
      <span style="font-size:18px;">case conn_new_cmd://为新连接准备:各种清理重置工作
                /* Only process nreqs at a time to avoid starving other
                  connections */
                --nreqs;//记录每个libevent实例处理的最大事件数,通过初始启动参数配置
                if (nreqs >= 0) {//还可以处理请求
                    reset_cmd_handler(c);//缩小缓冲区,转为解析读缓冲区数据的状态,然后转为等待读取网络数据包状态
                } else {//拒绝请求
                    pthread_mutex_lock(&c->thread->stats.mutex);
                    c->thread->stats.conn_yields++;
                    pthread_mutex_unlock(&c->thread->stats.mutex);
                    if (c->rbytes > 0) {//读缓冲区中有数据了,即表明已经读入了数据,因此不再通知读事件
                        /* We have already read in data into the input buffer,
                          so libevent will most likely not signal read events
                          on the socket (unless more data is available. As a
                          hack we should just put in a request to write data,
                          because that should be possible ;-)
                        */
                        if (!update_event(c, EV_WRITE | EV_PERSIST)) {//更新event为写事件,并重新注册到event_bvase
                            if (settings.verbose > 0)
                                fprintf(stderr, "Couldn't update event ");
                            conn_set_state(c, conn_closing);//关闭连接
                            break;
                        }
                    }
                    stop = true;
                }
                break;</span>

    }

    其中整理缓冲区函数reset_cmd_handler函数:首先调用函数conn_shrink缩小该conn的各种缓冲区,然后进入解析状态,解析读缓冲区中未解析的字节,进而转为等待读数据状态(当读缓冲区中没有数据处理时,即进入等待状态)。

    具体分析如下:

    static void reset_cmd_handler(conn *c) {
        c->cmd = -1;
        c->substate = bin_no_state;
        if(c->item != NULL) {
            item_remove(c->item);//删除item
            c->item = NULL;
        }
        conn_shrink(c);//整理缓冲区
        if (c->rbytes > 0) {//缓冲区还有字节未解析
            conn_set_state(c, conn_parse_cmd);//转换为解析状态
        } else {//缓冲区没有数据
            conn_set_state(c, conn_waiting);//转为 等待读取一个数据包 状态 ,状态机没有数据要处理,就进入等待状态  
        }
    }

    其中调用函数conn_shrink,来缩小各缓冲区:

    static void conn_shrink(conn *c) {
        assert(c != NULL);

        if (IS_UDP(c->transport))//如果是UDP协议,不牵涉缓冲区管理
            return;
     //读缓冲区空间大小>READ_BUFFER_HIGHWAT && 还没解析的数据小于 DATA_BUFFER_SIZE 
        if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
            char *newbuf;

            if (c->rcurr != c->rbuf)//如果已经解析了部分数据
                memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);//把读缓冲区中的未解析数据向前移动,已覆盖掉已解析的内容

      //重新分配DATA_BUFFER_SIZE大小的空间作为读缓冲区
            newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);

            if (newbuf) {
                c->rbuf = newbuf;
                c->rsize = DATA_BUFFER_SIZE;
            }
            /* TODO check other branch... */
            c->rcurr = c->rbuf;//以解析数据被覆盖,因此剩下的全部未解析
        }

     ////需要写出(发往客户端)的item的数量
        if (c->isize > ITEM_LIST_HIGHWAT) {
            item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
            if (newbuf) {
                c->ilist = newbuf;
                c->isize = ITEM_LIST_INITIAL;
            }
        /* TODO check error condition? */
        }
     //待发送的消息个数,memcached发送消息是通过sendmsg批量发送
        if (c->msgsize > MSG_LIST_HIGHWAT) {
            struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
            if (newbuf) {
                c->msglist = newbuf;
                c->msgsize = MSG_LIST_INITIAL;
            }
        /* TODO check error condition? */
        }
     //一次性顺序写多个item??
        if (c->iovsize > IOV_LIST_HIGHWAT) {
            struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
            if (newbuf) {
                c->iov = newbuf;
                c->iovsize = IOV_LIST_INITIAL;
            }
        /* TODO check return value */
        }
    }

    由准备状态conn_new_cmd,如果读缓冲区还有未解析数据,则进入解析状态conn_parse_cmd,按协议解析读取到的网络数据。如果没有待处理数据,则进入等待状态conn_waiting。

    memcached采用二进制协议和文本协议两种网络协议,解析时,根据具体的协议解析,然后进入具体命令状态,执行相应具体的操作如:SET GET等待。

    解析状态conn_parse_cmd:

    //解析读缓冲区中的数据
            case conn_parse_cmd :
       //如果缓冲区中有完整的命令行,则读取之,否则继续转为等待状态
                if (try_read_command(c) == 0) {//缓冲区中没有一条完成的命令行,则需要更多的数据,因此继续等待客户端发来数据
                    /* wee need more data! */
                    conn_set_state(c, conn_waiting);//继续进入等待状态
                }

                break;

    解析缓冲区中的一条完整命令:

    //解析缓冲区数据
    static int try_read_command(conn *c) {
        assert(c != NULL);
        assert(c->rcurr <= (c->rbuf + c->rsize));
        assert(c->rbytes > 0);

        if (c->protocol == negotiating_prot || c->transport == udp_transport)  {
            if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
                c->protocol = binary_prot;//二进制协议
            } else {
                c->protocol = ascii_prot;//文本协议
            }

            if (settings.verbose > 1) {
                fprintf(stderr, "%d: Client using the %s protocol ", c->sfd,
                        prot_text(c->protocol));
            }
        }
     //采用二进制协议
        if (c->protocol == binary_prot) {
          //如果二进制协议读取的数据小于二进制协议头部长度,则需要继续读取数据
            if (c->rbytes < sizeof(c->binary_header)) {
                /* need more data! */
                return 0;
            } else {
    #ifdef NEED_ALIGN
       //则按8字节对齐,提高CPU读取的效率
                if (((long)(c->rcurr)) % 8 != 0) {
                  //调整缓冲区
                    memmove(c->rbuf, c->rcurr, c->rbytes);
                    c->rcurr = c->rbuf;
                    if (settings.verbose > 1) {
                        fprintf(stderr, "%d: Realign input buffer ", c->sfd);
                    }
                }
    #endif
                protocol_binary_request_header* req;//二进制协议头部
                req = (protocol_binary_request_header*)c->rcurr;

              //...
      //....
      
            //解析二进制协议数据,根据解析结果进行具体操作,如GET SET等
                dispatch_bin_command(c);

                c->rbytes -= sizeof(c->binary_header);//更新已读取到的字节数
                c->rcurr += sizeof(c->binary_header);//更新缓冲区的路标信息
            }
        } else {//文本协议
     
            //...
     //....
      
      //根据文本协议解析结果,执行具体操作如SET GET。
            process_command(c, c->rcurr);

          
        }

        return 1;
    }

    二进制协议处理函数:dispatch_bin_command。根据二进制协议解析的结果,处理具体的(比如get,set等)操作(进入相应的操作命令状态)。文本协议操作类似。 
    具体的命令如SET,GET,DELETE等操作放到后面讲解。

    /根据二进制协议解析的结果,处理具体的(比如get,set等)操作
    static void dispatch_bin_command(conn *c) {
      //...
      //...
      
        switch (c->cmd) {
        case PROTOCOL_BINARY_CMD_SETQ: //SET命令
            c->cmd = PROTOCOL_BINARY_CMD_SET;
            break;
        case PROTOCOL_BINARY_CMD_ADDQ: //ADD命令
            c->cmd = PROTOCOL_BINARY_CMD_ADD;
            break;
        //...
        case PROTOCOL_BINARY_CMD_DELETEQ:  //DELETE命令
            c->cmd = PROTOCOL_BINARY_CMD_DELETE;
            break;
      //...
      //...
        }

        switch (c->cmd) {
          
        //...
        //....
          case PROTOCOL_BINARY_CMD_GETQ:  /* FALLTHROUGH */
            case PROTOCOL_BINARY_CMD_GET:  /* FALLTHROUGH */
            case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
            case PROTOCOL_BINARY_CMD_GETK:
                if (extlen == 0 && bodylen == keylen && keylen > 0) {
                    bin_read_key(c, bin_reading_get_key, 0);  //在该函数中: conn_set_state(c, conn_nread),进入读状态,读取指定数目的数据
                } else {
                    protocol_error = 1;
                }
                break;
          
        if (protocol_error)
            handle_binary_protocol_error(c);
     }
    }

    解析完完成后,就该进入具体的命令操作了,如SET  GET  等待。具体就后续分解

    case bin_read_set_value:  
            complete_update_bin(c);//执行Update操作  
            break;  
    case bin_reading_get_key:  
            process_bin_get(c);//执行get操作  
            break;

    当缓冲区中没有可解析的数据时,则进入等待状态。

    等待状态conn_waiting:

    {
    //进入等待状态
            case conn_waiting:
       //更新libevent中对该连接socket注册的事件为读事件,再重新注册。以等待客户端发数据到读缓冲区
                if (!update_event(c, EV_READ | EV_PERSIST)) {//注册为永久事件,直到下次更新该event事件
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event ");
                    conn_set_state(c, conn_closing);
                    break;
                }

                conn_set_state(c, conn_read);//转为读状态
                stop = true;
                break;
    }

    其中函数update_event:注意,每次转为读状态,或写状态时,都要更新该连接socket在该工作线程libevent实例中注册的事件event,然后再从新注册回libevent。

    且每次都注册为EV_PERSIST持久事件,直到下次更新该event。

    具体更新过程如下:

    //更新event,再重新注册到event_base中
    static bool update_event(conn *c, const int new_flags) {
        assert(c != NULL);

        struct event_base *base = c->event.ev_base;
        if (c->ev_flags == new_flags)
            return true;
        if (event_del(&c->event) == -1) return false;
        event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
        event_base_set(base, &c->event);
        c->ev_flags = new_flags;
        if (event_add(&c->event, 0) == -1) return false;
        return true;
    }

    切换conn状态的函数:

    //切换状态:将conn的状态设为state
    static void conn_set_state(conn *c, enum conn_states state) {
        assert(c != NULL);
        assert(state >= conn_listening && state < conn_max_state);//检验状态合法性

        if (state != c->state) {
            if (settings.verbose > 2) {
                fprintf(stderr, "%d: going from %s to %s ",
                        c->sfd, state_text(c->state),
                        state_text(state));
            }

            if (state == conn_write || state == conn_mwrite) {
                MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
            }
            c->state = state;//设置为新的状态
        }
    }

    当连接socket读事件就绪时,就进入读状态,读取网络数据,存入读缓冲区。 
    读状态conn_read:

    case conn_read:  
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);//判断采用UDP协议还是TCP协议  
      
            switch (res)  
            {  
            case READ_NO_DATA_RECEIVED://未读取到数据  
                conn_set_state(c, conn_waiting);//继续等待  
                break;  
            case READ_DATA_RECEIVED://读取数据  
                conn_set_state(c, conn_parse_cmd);//开始解析数据  
                break;  
            case READ_ERROR://读取发生错误  
                conn_set_state(c, conn_closing);//关闭连接  
                break;  
            case READ_MEMORY_ERROR: //申请内存空间错误,继续尝试  
                break;  
            }  
            break; 

    若采用TCP协议,从网络读取数据,其中调用函数read():

    static enum try_read_result try_read_network(conn *c) {
        enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
        int res;
        int num_allocs = 0;//记录从新分配缓冲区空间的次数,每次空间增倍
        assert(c != NULL);

     //如果原缓冲区中有部分数据已解析,则用未解析数据覆盖以解析部分
        if (c->rcurr != c->rbuf) {
            if (c->rbytes != 0) /* otherwise there's nothing to copy */
                memmove(c->rbuf, c->rcurr, c->rbytes);
            c->rcurr = c->rbuf;
        }

        while (1) {//循环读取数据
            if (c->rbytes >= c->rsize) {
                if (num_allocs == 4) {//如果分配了四次,缓冲区空间还是不够,则返回
                    return gotdata;
                }
                ++num_allocs;
                char *new_rbuf = realloc(c->rbuf, c->rsize * 2);//重分配2倍空间
                if (!new_rbuf) {//分配空间失败,则进入关闭连接状态
                    STATS_LOCK();
                    stats.malloc_fails++;//全局状态
                    STATS_UNLOCK();
                    if (settings.verbose > 0) {
                        fprintf(stderr, "Couldn't realloc input buffer ");
                    }
                    c->rbytes = 0; /* ignore what we read */
                    out_of_memory(c, "SERVER_ERROR out of memory reading request");
                    c->write_and_go = conn_closing;
                    return READ_MEMORY_ERROR;
                }
                c->rcurr = c->rbuf = new_rbuf;
                c->rsize *= 2;
            }

            int avail = c->rsize - c->rbytes;//可用空间大小=总空间- 未解析空间
      //执行网络读取,这个是非阻塞的读
            res = read(c->sfd, c->rbuf + c->rbytes, avail);//从套接字中读取数据,存入读缓冲区中,存放在原来未解析数据的后面
            if (res > 0) {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.bytes_read += res;//更新线程状态
                pthread_mutex_unlock(&c->thread->stats.mutex);
                gotdata = READ_DATA_RECEIVED; //已读取到数据
                c->rbytes += res;
                if (res == avail) {//最多读取到avail个,如果已经读到了,则可以尝试继续读取 
                    continue;
                } else {
                    break;
                }
            }
            if (res == 0) {//表示已经断开网络连接了  
                return READ_ERROR;
            }
            if (res == -1) {//因为是非阻塞的,所以会返回下面的两个错误码
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break;
                }//如果返回为负数,且不是上面两个数,则表示发生了其他错误,返回READ_ERROR  
                return READ_ERROR;
            }
        }
        return gotdata;//返回读取结果
    }

    采用UDP是数据报的形式时,每次读取到的都是一个完整的数据报形式。

    函数try_read_udp:

    //UDP读取网络数据  
    static enum try_read_result try_read_udp(conn *c)  
    {  
    int res;  
      
    assert(c != NULL);  
      
    c->request_addr_size = sizeof(c->request_addr);  
    res = recvfrom(c->sfd, c->rbuf, c->rsize, 0, &c->request_addr,  
            &c->request_addr_size);//执行UDP的网络读取  
    if (res > 8)//UDP数据包大小大于8,已经有可能是业务数据包  
    {  
        unsigned char *buf = (unsigned char *) c->rbuf;  
        pthread_mutex_lock(&c->thread->stats.mutex);  
        c->thread->stats.bytes_read += res;//更新每个线程的统计数据  
        pthread_mutex_unlock(&c->thread->stats.mutex);  
      
        /* Beginning of UDP packet is the request ID; save it. */  
        c->request_id = buf[0] * 256 + buf[1];//UDP为了防止丢包,增加了确认字段  
      
        /* If this is a multi-packet request, drop it. */  
        if (buf[4] != 0 || buf[5] != 1)//一些业务的特征信息判断  
        {  
            out_string(c, "SERVER_ERROR multi-packet request not supported");  
            return READ_NO_DATA_RECEIVED;  
        }  
      
        /* Don't care about any of the rest of the header. */  
        res -= 8;  
        memmove(c->rbuf, c->rbuf + 8, res);//调整缓冲区  
      
        c->rbytes = res;//更新信息  
        c->rcurr = c->rbuf;  
        return READ_DATA_RECEIVED;  
    }  
    return READ_NO_DATA_RECEIVED;  

    到此状态机中的主要状态就分析得差不多了,剩下的其他状态主要是一系列具体命令操作,如SET 、GET、  DELETE等,这些正是根据对客户端数据解析的结果所进入的状态,后面将继续分析这些命令的执行过程。

  • 相关阅读:
    Oracle创建表空间用户等
    centos7安装neo4j
    Linux 介绍和命令超详细
    Go 变量声明
    Manjaro 安装 & 配置
    Go Golang安装及环境变量配置
    python3集合与常用操作
    正则以及re库的使用
    Requests 库安装
    数据库-数据库管理系统-数据库系统
  • 原文地址:https://www.cnblogs.com/duanxz/p/5138100.html
Copyright © 2011-2022 走看看