zoukankan      html  css  js  c++  java
  • memcached源码剖析5:并发模型

    网络连接建立与分发

    前面分析了worker线程的初始化,以及主线程创建socket并监听的过程。本节会分析连接如何建立与分发。

    初始状态

    A,可以摸清楚master线程的大致逻辑:

    1)初始化各个worker线程

    2)执行socket,bind,listen...主线程进行监听

    3)一旦有新的连接建立,则调用event_handler

    B,woker线程被创建之后的逻辑:

    1)监听管道recv端的fd,一旦有数据过来,则调用thread_libevent_process

    注意,worker线程其实也是利用event_base_loop将自己进行阻塞。主线程阻塞在监听的fd上,而worker线程则阻塞在监听管道recv端的fd上。回忆一下前文,memcached_thread_init函数中建立了管道,用于master线程和worker线程间的通信。只有master线程接受了新的请求之后,才会利用管道告知worker线程,而worker线程只有等管道有数据传输来的时候,才会被唤醒。

    用图展现初始状态下的master和worker线程:

    图中假设主线程socket函数返回的fd是26。可以看到master线程以及4条worker线程,都因为没有任何事件而处于阻塞状态。

    另外,前文提到一个很重要的结构体conn,conns数组由conn指针组成。memcahed的每个连接都对应有一个conn实例,可以根据fd在conns数组里找到。由于master线程的套接口是26,所以conns[26]的指向的conn就表示master线程正监听的socket以及一些附加信息。

    marster线程接收与分发

    我们来模拟一下有连接过来时,memcached内部的执行。前文提到,一旦有连接过来,则master线程会被触发执行event_handler函数。

    void event_handler(const int fd, const short which, void *arg) {
        conn *c;
    
        c = (conn *)arg;
        assert(c != NULL);
    
        c->which = which;
    
        /* sanity */
        if (fd != c->sfd) {
            if (settings.verbose > 0)
                fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!
    ");
            conn_close(c);
            return;
        }
    
        // 主要逻辑全部封装在drive_machine中
        drive_machine(c);
    
        /* wait for next event */
        return;
    }

    event_handler中传入的conn,就是conns[26]。

    drive_machine

    在event_handler里会继续将这个conn传递给drive_machine。

    static void drive_machine(conn *c) {
        bool stop = false;
        int sfd;
        socklen_t addrlen;
        struct sockaddr_storage addr;
        int nreqs = settings.reqs_per_event;
        int res;
        const char *str;
    #ifdef HAVE_ACCEPT4
        static int  use_accept4 = 1;
    #else
        static int  use_accept4 = 0;
    #endif
    
        assert(c != NULL);
    
        // 一个大的while循环,维护了一个状态机,根据conn的当前状态做出处理,跳到下一状态
        while (!stop) {
    
            switch(c->state) {
            
            // 初始状态为conn_listening
            case conn_listening:
                addrlen = sizeof(addr);
                
                // accept连接,会产生一个新的fd,该连接之后的读写均通过新fd完成
    #ifdef HAVE_ACCEPT4
                if (use_accept4) {
                    sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
                } else {
                    sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
                }
    #else
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
    #endif
                // accept失败
                if (sfd == -1) {
                    // 如果是accept4未被实现,则换用accept继续尝试接受连接
                    if (use_accept4 && errno == ENOSYS) {
                        use_accept4 = 0;
                        continue;
                    }
                    perror(use_accept4 ? "accept4()" : "accept()");
                    // 如果连接队列已经没有未处理的连接,则终止循环
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        /* these are transient, so don't log anything */
                        stop = true;
                    }
                    // 连接打满,accept_new_conns(false)会终止event继续触发
                    else if (errno == EMFILE) {
                        if (settings.verbose > 0)
                            fprintf(stderr, "Too many open connections
    ");
                        accept_new_conns(false);
                        stop = true;
                    } else {
                        perror("accept()");
                        stop = true;
                    }
                    break;
                }
                
                // 设置新的套接字为非阻塞
                if (!use_accept4) {
                    if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
                        perror("setting O_NONBLOCK");
                        close(sfd);
                        break;
                    }
                }
    
                if (settings.maxconns_fast &&
                    stats_state.curr_conns + stats_state.reserved_fds >= settings.maxconns - 1) {
                    str = "ERROR Too many open connections
    ";
                    res = write(sfd, str, strlen(str));
                    close(sfd);
                    STATS_LOCK();
                    stats.rejected_conns++;
                    STATS_UNLOCK();
                } else {
                    // !!!分发并通知worker线程有一个新的连接
                    dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                         DATA_BUFFER_SIZE, c->transport);
                }
    
                stop = true;
                break;
    
            case conn_waiting:
                ...
    
            case conn_read:
                ...
    
            case conn_parse_cmd :
                ...
    
            case conn_new_cmd:
                ...
    
            case conn_nread:
                ...
    
            case conn_swallow:
                ...
    
            case conn_write:
                ...
    
            case conn_mwrite:
                ...
    
            case conn_closing:
                ...
    
            case conn_closed:
                ...
    
            case conn_watch:
                ...
    
            case conn_max_state:
                ...
            }
        }
    
        return;
    }

    前文曾提到drive_machine是一个大的状态机。上面的代码只保留了对conn_listening的处理,因为master线程接受新连接时,就是这个状态。

    代码里调用accept4或者accept函数产生新的fd,为27。accept之后,主要就是调用dispatch_conn_new来对连接做分发,并且通知worker线程。

    dispatch_conn_new

    我们来看dispatch_conn_new的实现:

    void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                           int read_buffer_size, enum network_transport transport) {
        
        // CQ_ITEM用于封装新连接的一些信息
        CQ_ITEM *item = cqi_new();
        char buf[1];
        if (item == NULL) {
            close(sfd);
            /* given that malloc failed this may also fail, but let's try */
            fprintf(stderr, "Failed to allocate memory for connection object
    ");
            return ;
        }
    
        // 挑选worker线程,采用轮循机制
        int tid = (last_thread + 1) % settings.num_threads;
    
        LIBEVENT_THREAD *thread = threads + tid;
    
        last_thread = tid;
    
        // 设置item
        item->sfd = sfd;
        item->init_state = init_state;
        item->event_flags = event_flags;
        item->read_buffer_size = read_buffer_size;
        item->transport = transport;
    
        // 将item放入线程的new_conn_queue队列
        cq_push(thread->new_conn_queue, item);
    
        MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
        
        // 通过管道,写入一字节的c,用来达到通知子线程的目的
        buf[0] = 'c';
        if (write(thread->notify_send_fd, buf, 1) != 1) {
            perror("Writing to thread notify pipe");
        }
    }

    可以很明显的看到,worker线程的分发是采用轮循机制的,每次选出来的都是threads数组中的下一个。

    CQ_ITEM与conn_queue队列

    CQ_ITEM结构体用于封装新accept的连接的一些相关信息,每个线程内部都维护着一个CQ_ITEM队列。当主线程通过管道写入字符c之后,子线程会被通知到,有一个新的连接来了。于是,子线程随后立即从CQ_ITEM队列中取出CQ_ITEM,并对这个新的连接设置监听事件等等。子线程具体的实现,后面会分析到,这里先看下CQ_ITEM:

    typedef struct conn_queue_item CQ_ITEM;
    struct conn_queue_item {
        int               sfd;                      // accept产生的新fd
        enum conn_states  init_state;               // 子线程拿到新的连接之后,连接对应的状态
        int               event_flags;              // 子线程对新连接设置的事件
        int               read_buffer_size;         // 一般2M
        enum network_transport     transport;       // tcp or udp
        conn *c;
        CQ_ITEM          *next;
    };

    好,至此我们已经看完了主线程所做的工作。

    只要不断的有新连接进来,主线程就会不断调用event_handler,并在drive_machine状态机中accept & dispatch连接。至于接下来,接收客户端发来的命令并做出响应等等,都是在worker线程里完成。

    这张图画出了子线程中的CQ_ITEM队列,以及主线程通过管道告知子线程。

    注意worker1线的连接,fd分别是27,31,35...因为中间其他fd对应的连接会分别被worker2,worker3,worker4处理。

    另外借用一张其他博客的图,也挺清楚的:

    worker线程设置监听

    本小节开始看worker线程获取通知之后,所做的一些处理。前文说到,marster线程会向管道写入一个字符'c',用来告知worker线程有新的连接了。于是worker线程监听管道的事件被触发,worker线程会进入thread_libevent_process函数:

    static void thread_libevent_process(int fd, short which, void *arg) {
        LIBEVENT_THREAD *me = arg;
        CQ_ITEM *item;
        char buf[1];
        unsigned int timeout_fd;
    
        // 从管道里读取一个字节
        if (read(fd, buf, 1) != 1) {
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read from libevent pipe
    ");
            return;
        }
    
        switch (buf[0]) {
        
        // 字符c
        case 'c':
            // item是从new_conn_queue队列中取出的一个CQ_ITEM对象
            item = cq_pop(me->new_conn_queue);
    
            if (NULL != item) {
                // 调用conn_new来创建conn,并且设置监听事件
                conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport,
                                   me->base);
                if (c == NULL) {
                    if (IS_UDP(item->transport)) {
                        fprintf(stderr, "Can't listen for events on UDP socket
    ");
                        exit(1);
                    } else {
                        if (settings.verbose > 0) {
                            fprintf(stderr, "Can't listen for events on fd %d
    ",
                                item->sfd);
                        }
                        close(item->sfd);
                    }
                } else {
                    c->thread = me;
                }
                
                // 释放item
                cqi_free(item);
            }
            break;
        case 'r':
            ...
            
        case 'p':
            ...
            
        case 't':
            ...
        }
    }

    注意conn_new函数,conn_new在前文出现过(server_socket函数中用conn_new创建了conns[26])。本例中,conn_new执行完之后,会新创建一个conn对象,并且将conns[27]指向它,从此以后,线程worker1就利用该conn来与客户端交互。

    值得一提的是,新的conn中,state被置为conn_new_cmd,表明该连接已经建立,等待接受client发送的命令而主线程所使用的conns[26],state永远为conn_listening,表明主线程一直在等待新的连接

    在conn_new中,设置事件的语句为:

    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);

    可见,子线程的事件被触发之后,也是调用event_handler函数,和主线程的事件触发的函数一样。

    回忆下event_handler中的状态机,worker线程的conn初始状态为conn_new_cmd,所以一旦worker线程接受到client的命令,便会进入drive_machine中的case conn_new_cmd分支。当然这涉及到后续具体的命令处理,已经不在本文的探讨范畴了。

    最后来看一下worker线程thread_libevent_process处理完毕之后的状态:

  • 相关阅读:
    快速排序
    冒泡排序
    选择排序
    合并排序
    插入排序
    跟我一起阅读Java源代码之HashMap(三)
    跟我一起阅读Java源代码之HashMap(二)
    跟我一起阅读Java源代码之HashMap(一)
    Apache2.2 + tomcat7 服务器集群配置
    Spring+Hibernate实现动态SessionFactory切换(改进版)
  • 原文地址:https://www.cnblogs.com/driftcloudy/p/5882136.html
Copyright © 2011-2022 走看看