zoukankan      html  css  js  c++  java
  • 分布式缓存系统 Memcached 状态机之socket连接与派发

    上节已经分析到了主线程中监听socket注册事件和工作线程中连接socket注册事件的回调函数都是event_handler,且event_handler的核心部分都是一个有限状态机:drive_machine。因此接下来将对该状态机具体的业务处理进行深入的剖析。

    memcached将每个socket都封装为一个conn结构体,该结构体包含了比如socket的文件描述符sfd、注册事件event、连接状态结构体conn_states,等等诸多信息字段,其中的状态结构:conn_states中包含了该socket的各种状态。 而状态机drive_machine正是通过该状态结构来判断该socket当前所处的具体状态,从而进行业务逻辑处理的。

    其中连接状态结构体如下:

    //socket的可能状态组成的结构体
    enum conn_states {
        conn_listening,  //监听状态/**< the socket which listens for connections */
        conn_new_cmd,    //为下一个连接做准备/**< Prepare connection for next command */
        conn_waiting,    //等待读取一个数据包/**< waiting for a readable socket */
        conn_read,      //读取网络数据/**< reading in a command line */
        conn_parse_cmd,  //解析缓冲区数据/**< try to parse a command from the input buffer */
        conn_write,      //简单的回复数据/**< writing out a simple response */
        conn_nread,      //读取固定字节的网络数据/**< reading in a fixed number of bytes */
        conn_swallow,    //处理不需要的写缓冲区的数据/**< swallowing unnecessary bytes w/o storing */
        conn_closing,    //关闭连接/**< closing this connection */
        conn_mwrite,    //顺序写入多个item数据  /**< writing out many items sequentially */
        conn_closed,    //连接已关闭/**< connection is closed */
        conn_max_state  //最大状态,断言使用/**< Max state value (used for assertion) */
    };

    接下来看下drive_machine的概貌吧,其中主要就是一个while循环以处理各状态的业务逻辑:

    //监听套接字和 连接套接字 事件回调函数的核心部分:
    //有限状态机:根据套接字的状态conn_sattes执行对应的操作
    static void drive_machine(conn *c) {
        bool stop = false;
        int sfd;
        socklen_t addrlen;
        struct sockaddr_storage addr;
        int nreqs = settings.reqs_per_event;
        int res;
        const char *str;

        assert(c != NULL);
     //因为状态间存在转化或跳变等,因此需要循环,直到确定stop为止
        while (!stop) {

      //对套接字的各种状态,进行对应业务处理
            switch(c->state) {
            case conn_listening://监听状态
                addrlen = sizeof(addr);

       //
       //
       //
        //主线程进入状态机之后执行accept操作,这个操作也是非阻塞的。
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
    #endif
              //连接失败
                if (sfd == -1) {
       //
       //
                  
                }
       //连接成功,则将连接socket设为非阻塞
                if (!use_accept4) {
                    if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
                        perror("setting O_NONBLOCK");
                        close(sfd);
                        break;
                    }
                }

       //如果超过最大连接数(根据全局状态结构的记录判断),则需要关闭连接
                if (settings.maxconns_fast &&
                    stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                    //
        //
                } else {//如果没有超载,则直接分发(UDP,不需要建立连接,直接分发)工作线程
                    dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                        DATA_BUFFER_SIZE, tcp_transport);
                }

                stop = true;
                break;

       
            case conn_waiting:

            case conn_read:
                
            case conn_parse_cmd :
                
            case conn_nread:
      //以及其他各种状态
                
      return;
      }
    }

    本小节要着重分析的是第一个状态 conn_listening:

    该状态是主线程监听socket的业务处理:监听套接字,接受,并将得到的连接socket分发给选中的某个工作线程。

     switch(c->state) {
            case conn_listening://监听状态
                addrlen = sizeof(addr);
    #ifdef HAVE_ACCEPT4
                if (use_accept4) {
                    sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
                } else {
                    sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
                }
    #else
      //主线程进入状态机之后执行accept操作,这个操作也是非阻塞的。
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
    #endif
                if (sfd == -1) {
                    if (use_accept4 && errno == ENOSYS) {
                        use_accept4 = 0;
                        continue;
                    }
                    perror(use_accept4 ? "accept4()" : "accept()");
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        /* these are transient, so don't log anything */
                        stop = true;
                    } else if (errno == EMFILE) {//连接超载
                        if (settings.verbose > 0)
                            fprintf(stderr, "Too many open connections ");
                        accept_new_conns(false);
                        stop = true;
                    } else {
                        perror("accept()");
                        stop = true;
                    }
                    break;
                }
       //连接成功,则将连接socket设为非阻塞
                if (!use_accept4) {
                    if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
                        perror("setting O_NONBLOCK");
                        close(sfd);
                        break;
                    }
                }

       //如果超过设置的同时在线最大连接数(默认为1024)(根据全局状态结构的记录判断),则需要关闭连接
                if (settings.maxconns_fast &&
                    stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                    str = "ERROR Too many open connections ";
                    res = write(sfd, str, strlen(str));
                    close(sfd);
                    STATS_LOCK();
                    stats.rejected_conns++;
                    STATS_UNLOCK();
                } else {//如果没有超载,则直接分发(UDP,不需要建立连接,直接分发)工作线程
                    dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                        DATA_BUFFER_SIZE, tcp_transport);
                }

                stop = true;
                break;
            }

    其中工作线程的选择采用轮询(round-robin)方式。连接socket的派发函��是dispath_conn_new:

    //主线程在监听套接字的回调函数中,当有新连接到来时, 调用该函数将接受到的新连接socket分发给工作线程
    //注意:由于UDP不需要建立连接,所以直接分发给Worker线程
    void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                          int read_buffer_size, enum network_transport transport) {
        CQ_ITEM *item = cqi_new();//从CQ_ITEM资源池中取得一个空闲ITEM
        char buf[1];
        if (item == NULL) {
            close(sfd);
            /* given that malloc failed this may also fail, but let's try */
            fprintf(stderr, "Failed to allocate memory for connection object ");
            return ;
        }

        int tid = (last_thread + 1) % settings.num_threads;//通过round-robin算法选择一个线程

        LIBEVENT_THREAD *thread = threads + tid;//缓存这次选中的线程

        last_thread = tid;//更新最近一次选中的线程编号

     //设置CQ_ITEM的各字段
        item->sfd = sfd;//sfd是连接socket
        item->init_state = init_state;
        item->event_flags = event_flags;
        item->read_buffer_size = read_buffer_size;
        item->transport = transport;

     //主线程将item投递到选中的工作线程的ITEM连接队列中
        cq_push(thread->new_conn_queue, item);

        MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
        buf[0] = 'c';
      //管道通知:在Worker线程的notify_send_fd写入字符c,表示有连接    
        if (write(thread->notify_send_fd, buf, 1) != 1) {
            perror("Writing to thread notify pipe");
        }
    }

    可以看到,在该派发函数中首先从CQ_ITEM资源池(空闲链表)中提取一个ITEM,并设置为该连接socket的各字段信息,然后以采用轮询方式选择一个工作线程,再将该ITEM放入该工作线程的连接任务队列CQ中,最后通过通知管道的写端,写入通知信息。    接下来就是前面已经分析过的工作线程来负责处理该连接socket的所有业务了。

  • 相关阅读:
    matlab2016b -ubuntu 1604 -install- and -trouble -shooting--finally-all is ok!!
    cvpr2017-code-etc
    汇率换算自然语言理解功能JAVA DEMO
    聚焦新相亲时代:女孩在京有五六套房哭着想嫁富2代
    cvpr2017年的所有论文下载
    公司危机、下岗困局、不受重视,程序员该如何面对职场挫折?
    利用CH341A编程器刷新BIOS,恢复BIOS,妈妈再也不用担心BIOS刷坏了
    垃圾人定律和垃圾人生存方式定律
    90后女孩的杀身之祸----悲剧酿成--放弃所有的虚构的故事后,你终会发现,真实平淡的现实才是最美好的。
    仓央嘉措比较著名的诗
  • 原文地址:https://www.cnblogs.com/duanxz/p/5138099.html
Copyright © 2011-2022 走看看