  • memcached threading model

    The memcached threading model is shown in the figure below.

    [Figure: memcached threading model]

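    memcached's threads fall into two kinds. The first handle basic event processing (such as updating the current time and accepting connection requests) and memory management. The second handle network read/write requests; these are the worker threads. Only the worker threads are discussed here.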

    Before the analysis, here are the relevant data structures and variable definitions:

    1. Connection queue (thread.c)

    /* An item in the connection queue. */
    typedef struct conn_queue_item CQ_ITEM;
    struct conn_queue_item {
        int               sfd;                // socket fd of the connection
        enum conn_states   init_state;
        int               event_flags;        // event flags for the connection's network events (a libevent concept)
        int               read_buffer_size;
        enum network_transport     transport;    // transport: local_transport (Unix sockets), tcp_transport or udp_transport
        CQ_ITEM          *next;                // next item, in either a connection queue or the free list
    };
    
    /* A connection queue. */
    typedef struct conn_queue CQ;                // connection queue: in fact a singly linked list of CQ_ITEMs
    struct conn_queue {
        CQ_ITEM *head;
        CQ_ITEM *tail;
        pthread_mutex_t lock;                    // protects the queue
    };
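
    The cq_init, cq_push and cq_pop helpers used later are not listed in this article. Below is a minimal sketch of a mutex-protected FIFO over the CQ_ITEM list. It assumes a trimmed-down CQ_ITEM that keeps only sfd and next, and the function bodies are illustrative, not memcached's exact code.

```c
#include <pthread.h>
#include <stdlib.h>

/* Trimmed-down queue item: only the fields this sketch needs. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int      sfd;   /* socket fd of the new connection */
    CQ_ITEM *next;  /* next item in the queue */
};

typedef struct conn_queue {
    CQ_ITEM        *head;
    CQ_ITEM        *tail;
    pthread_mutex_t lock;  /* protects head and tail */
} CQ;

/* Initialize an empty queue. */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* Append an item at the tail (done by the dispatcher thread). */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* Remove and return the head item, or NULL if the queue is empty
 * (done by the worker thread that owns the queue). */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;
    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}
```

    The dispatcher only pushes at the tail and the owning worker only pops from the head, so each lock is held for just a few pointer updates.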

    2. Thread information (memcached.h)

    typedef struct {
        pthread_t thread_id;        /* unique ID of this thread */
        struct event_base *base;    /* libevent handle this thread uses */
        struct event notify_event;  /* listen event for notify pipe */
        int notify_receive_fd;      /* receiving end of notify pipe */
        int notify_send_fd;         /* sending end of notify pipe */
        struct thread_stats stats;  /* Stats generated by this thread */
        struct conn_queue *new_conn_queue; /* queue of new connections to handle */
        cache_t *suffix_cache;      /* suffix cache */
    } LIBEVENT_THREAD; // describes a worker thread
    
    typedef struct {
        pthread_t thread_id;        /* unique ID of this thread */
        struct event_base *base;    /* libevent handle this thread uses */
    } LIBEVENT_DISPATCHER_THREAD; // describes the dispatcher thread

    3. Variable definitions (thread.c)

    /* Locks for cache LRU operations */
    pthread_mutex_t lru_locks[POWER_LARGEST];                // locks for the LRU lists
    
    /* Connection lock around accepting new connections */
    pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;    // lock used when accepting new connections
    
    /* Lock for global stats */
    static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; // lock for collecting statistics
    
    /* Lock to cause worker threads to hang up after being woken */
    static pthread_mutex_t worker_hang_lock;            // used to pause worker threads (explained below)
    
    /* Free list of CQ_ITEM structs */
    static CQ_ITEM *cqi_freelist;                    // free list of CQ_ITEMs
    static pthread_mutex_t cqi_freelist_lock;        // lock protecting the free list
    
    static pthread_mutex_t *item_locks;                // array of locks guarding access to the hash table
    /* size of the item lock hash table */
    static uint32_t item_lock_count;                // length of the array above
    unsigned int item_lock_hashpower;                // log2 of that length, i.e. item_lock_count = 2**item_lock_hashpower
    #define hashsize(n) ((unsigned long int)1<<(n))    // 2 ** n
    #define hashmask(n) (hashsize(n)-1)                // 2 ** n - 1
    
    static LIBEVENT_DISPATCHER_THREAD dispatcher_thread; // describes the dispatcher thread
    
    /*
     * Each libevent instance has a wakeup pipe, which other threads
     * can use to signal that they've put a new connection on its queue.
     */
    static LIBEVENT_THREAD *threads; // worker thread descriptors
    
    /*
     * Number of worker threads that have finished setting themselves up.
     */
    static int init_count = 0;                // number of threads that have started (or restarted)
    static pthread_mutex_t init_lock;        // lock and condition variable used when starting or restarting threads
    static pthread_cond_t init_cond;
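
    The lookup that actually uses item_locks is not shown in this article. This sketch assumes the common approach of keeping the low item_lock_hashpower bits of a key's hash value via hashmask; the helper lock_index is hypothetical. Because the lock table is smaller than the hash table, two keys in the same hash bucket (same low hashpower bits) also share the low lock bits, so a single lock always covers a whole chain.

```c
#include <stdint.h>

/* Same macros as in thread.c. */
#define hashsize(n) ((unsigned long int)1 << (n))
#define hashmask(n) (hashsize(n) - 1)

/* Hypothetical helper: map a hash value to one of 2**power locks
 * by keeping only its low `power` bits. */
static uint32_t lock_index(uint32_t hv, unsigned int power) {
    return (uint32_t)(hv & hashmask(power));
}
```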

    Let's start the analysis from the main function:

    int main(int argc, char* argv[]) { // in memcached.c
        // ...
        /* start up worker threads if MT mode */
        memcached_thread_init(settings.num_threads, main_base);
        // ...
    }

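    Here settings.num_threads is the number of worker threads. It defaults to 4, can be set with the -t option, and should usually not exceed the number of processor cores on the machine.
    main_base is a pointer to an event_base structure, used for network event handling (libevent).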
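
    One way to follow the advice about core counts is to cap the requested -t value at the number of online processors. The helper clamp_num_threads below is hypothetical and not part of memcached. It assumes sysconf(_SC_NPROCESSORS_ONLN), which Linux, macOS and the BSDs provide but POSIX does not require.

```c
#include <unistd.h>

/* Hypothetical helper: cap a requested worker count at the number of
 * online processors; keep the request if the count can't be queried. */
static int clamp_num_threads(int requested) {
    long ncpu = sysconf(_SC_NPROCESSORS_ONLN);
    if (requested < 1)
        requested = 1;
    if (ncpu > 0 && requested > ncpu)
        return (int)ncpu;
    return requested;
}
```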

    Here is the definition of memcached_thread_init:

    /*
     * Initializes the thread subsystem, creating various worker threads.
     *
     * nthreads  Number of worker event handler threads to spawn
     * main_base Event base for main thread
     */
    void memcached_thread_init(int nthreads, struct event_base *main_base) {
        int         i;            // loop counter
        int         power;        // log2 of the item lock table size; the actual size is 2**power.
                                  // The item lock table is an array of the locks used to access the hash table.
    
        // Initialize the locks and condition variables
        for (i = 0; i < POWER_LARGEST; i++) {
            pthread_mutex_init(&lru_locks[i], NULL); 
        }
        pthread_mutex_init(&worker_hang_lock, NULL);
    
        pthread_mutex_init(&init_lock, NULL);
        pthread_cond_init(&init_cond, NULL);
    
        
        pthread_mutex_init(&cqi_freelist_lock, NULL);
        cqi_freelist = NULL; // 初始化 cqi_freelist 
    
        /* Want a wide lock table, but don't waste memory */
        if (nthreads < 3) { // 0 1 2
            power = 10;
        } else if (nthreads < 4) { // 3
            power = 11;
        } else if (nthreads < 5) { // 4
            power = 12;
        } else {
            /* 8192 buckets, and central locks don't scale much past 5 threads */
            power = 13;
        }
    
        // For thread safety, the chain (bucket) being accessed in the hash table must be locked.
        // To maximize concurrency without wasting memory, use as many locks as possible, but the
        // lock count must be strictly smaller than the number of hash table slots (power < hashpower).
        if (power >= hashpower) {
            fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
            fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
            fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
            exit(1);
        }
    
        item_lock_count = hashsize(power); // 2 ** power
        item_lock_hashpower = power;
    
        // Initialize item_locks
        item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
        if (! item_locks) {
            perror("Can't allocate item locks");
            exit(1);
        }
        for (i = 0; i < item_lock_count; i++) {
            pthread_mutex_init(&item_locks[i], NULL);
        }
    
        // Allocate the threads array
        threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
        if (! threads) {
            perror("Can't allocate thread descriptors");
            exit(1);
        }
    
        // Initialize dispatcher_thread
        dispatcher_thread.base = main_base; // the dispatcher thread is the main thread
        dispatcher_thread.thread_id = pthread_self();
    
        // Create the pipes used for inter-thread notification
        for (i = 0; i < nthreads; i++) {
            int fds[2];
            if (pipe(fds)) { 
                perror("Can't create notify pipe");
                exit(1);
            }
    
            threads[i].notify_receive_fd = fds[0];
            threads[i].notify_send_fd = fds[1];
    
            setup_thread(&threads[i]); // initialize the remaining fields of threads[i]
            /* Reserve three fds for the libevent base, and two for the pipe */
            stats.reserved_fds += 5;
        }
    
        /* Create threads after we've done all the libevent setup. */
        for (i = 0; i < nthreads; i++) { // create the threads with pthread_create
            create_worker(worker_libevent, &threads[i]);
        }
    
        /* Wait for all the threads to set themselves up before returning. */
        pthread_mutex_lock(&init_lock);
        wait_for_thread_registration(nthreads); // wait on the condition variable until every thread is set up
        pthread_mutex_unlock(&init_lock);
    }
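
    To see which -t values pass the hashpower check, the lock-table sizing in memcached_thread_init() can be pulled out into two small functions. The names item_lock_power and lock_table_fits are hypothetical; the thresholds are copied from the code above.

```c
#include <stdio.h>

/* Same thresholds as memcached_thread_init(): a wider lock table for more threads. */
static int item_lock_power(int nthreads) {
    if (nthreads < 3)
        return 10;   /* 1024 locks */
    else if (nthreads < 4)
        return 11;   /* 2048 locks */
    else if (nthreads < 5)
        return 12;   /* 4096 locks */
    return 13;       /* 8192 locks */
}

/* Returns 1 if the check in memcached_thread_init() would pass, i.e. the
 * lock table is strictly smaller than a hash table of 2**hashpower buckets. */
static int lock_table_fits(int nthreads, int hashpower) {
    int power = item_lock_power(nthreads);
    if (power >= hashpower) {
        fprintf(stderr, "hashpower %d must exceed item lock power %d\n",
                hashpower, power);
        return 0;
    }
    return 1;
}
```

    For example, with 4 worker threads the lock table has 2**12 = 4096 entries, so a hash table power of 16 passes, while a power of 12 would be rejected.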

    setup_thread:

    /*
     * Set up a thread's information.
     */
    static void setup_thread(LIBEVENT_THREAD *me) {
        me->base = event_init(); // initialize the libevent handle (event base)
        if (! me->base) {
            fprintf(stderr, "Can't allocate event base\n");
            exit(1);
        }
    
        // Watch the notify pipe for read events
        /* Listen for notifications from other threads */
        event_set(&me->notify_event, me->notify_receive_fd,
                  EV_READ | EV_PERSIST, thread_libevent_process, me);
        event_base_set(me->base, &me->notify_event);
        if (event_add(&me->notify_event, 0) == -1) {
            fprintf(stderr, "Can't monitor libevent notify pipe\n");
            exit(1);
        }
    
        // Initialize the connection queue
        me->new_conn_queue = malloc(sizeof(struct conn_queue));
        if (me->new_conn_queue == NULL) {
            perror("Failed to allocate memory for connection queue");
            exit(EXIT_FAILURE);
        }
        cq_init(me->new_conn_queue); 
    
        if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
            perror("Failed to initialize mutex");
            exit(EXIT_FAILURE);
        }
    
        me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                        NULL, NULL);
        if (me->suffix_cache == NULL) {
            fprintf(stderr, "Failed to create suffix cache\n");
            exit(EXIT_FAILURE);
        }
    }

    create_worker:

    static void create_worker(void *(*func)(void *), void *arg) { // create a thread
        pthread_t       thread;
        pthread_attr_t  attr;
        int             ret;
    
        pthread_attr_init(&attr);
    
        if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
            fprintf(stderr, "Can't create thread: %s\n",
                    strerror(ret));
            exit(1);
        }
    }

    wait_for_thread_registration:

    static void wait_for_thread_registration(int nthreads) {
        while (init_count < nthreads) {
            pthread_cond_wait(&init_cond, &init_lock);
        }
    }

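    When wait_for_thread_registration runs, we already hold init_lock; pthread_cond_wait releases it atomically while the thread sleeps and reacquires it before returning. Each time a worker finishes initializing, it increments init_count by 1 (the update must be atomic, so it is done while holding init_lock) and calls pthread_cond_signal(&init_cond). Both steps happen in register_thread_initialized (in thread.c).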

    Call stack: worker_libevent() -> register_thread_initialized() -> pthread_cond_signal(). worker_libevent is the worker thread's main function; besides calling register_thread_initialized(), it starts the thread's event loop. Its source:

    /*
     * Worker thread: main event loop
     */
    static void *worker_libevent(void *arg) {
        LIBEVENT_THREAD *me = arg;
    
        /* Any per-thread setup can happen here; memcached_thread_init() will block until
         * all threads have finished initializing.
         */
    
        register_thread_initialized();
    
        event_base_loop(me->base, 0);
        return NULL;
    }
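
    The registration handshake described above is the classic counter-plus-condition-variable barrier. The sketch below reproduces it with plain pthreads and no libevent. start_and_wait and worker are hypothetical names, and this register_thread_initialized is simplified: it leaves out the worker_hang_lock step.

```c
#include <pthread.h>

#define MAX_WORKERS 64

static int init_count = 0;
static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  init_cond = PTHREAD_COND_INITIALIZER;

/* Called by each worker once its own setup is done. */
static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;                     /* protected by init_lock */
    pthread_cond_signal(&init_cond);  /* wake the waiting main thread */
    pthread_mutex_unlock(&init_lock);
}

static void *worker(void *arg) {
    (void)arg;
    register_thread_initialized();
    /* a real worker would now run event_base_loop() */
    return NULL;
}

/* Start up to nthreads workers and block until all of them have registered.
 * Returns the final value of init_count. */
static int start_and_wait(int nthreads) {
    pthread_t tids[MAX_WORKERS];
    int i, count;

    if (nthreads > MAX_WORKERS)
        nthreads = MAX_WORKERS;
    for (i = 0; i < nthreads; i++) {
        if (pthread_create(&tids[i], NULL, worker, NULL) != 0) {
            nthreads = i;             /* only wait for threads that exist */
            break;
        }
    }

    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads)     /* loop guards against spurious wakeups */
        pthread_cond_wait(&init_cond, &init_lock);  /* releases init_lock while asleep */
    count = init_count;
    pthread_mutex_unlock(&init_lock);

    for (i = 0; i < nthreads; i++)
        pthread_join(tids[i], NULL);
    return count;
}
```

    Because the main thread checks init_count while holding init_lock before it waits, a worker that signals early cannot cause a lost wakeup.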

    Next, thread_libevent_process (the pipe read-event handler, i.e. the notify event handler):

    /*
     * Processes an incoming "handle a new connection" item. This is called when
     * input arrives on the libevent wakeup pipe.
     */
    static void thread_libevent_process(int fd, short which, void *arg) {
        LIBEVENT_THREAD *me = arg; // this thread's descriptor
        CQ_ITEM *item;
        char buf[1];
    
        if (read(fd, buf, 1) != 1) // read the message: 'c' or 'p'
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read from libevent pipe\n");
    
        switch (buf[0]) { 
        case 'c': // handle a new connection
        item = cq_pop(me->new_conn_queue);
    
        if (NULL != item) {
            conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                               item->read_buffer_size, item->transport, me->base); 
            if (c == NULL) {
                if (IS_UDP(item->transport)) {
                    fprintf(stderr, "Can't listen for events on UDP socket\n");
                    exit(1);
                } else {
                    if (settings.verbose > 0) {
                        fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                    }
                    close(item->sfd);
                }
            } else {
                c->thread = me;
            }
            cqi_free(item);
        }
            break;
        /* we were told to pause and report in */
        case 'p': // pause this thread
            register_thread_initialized(); // worker_hang_lock is already held by another thread, so this call blocks when it tries to lock it
            break;
        }
    }
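
    The 'p' message works together with worker_hang_lock. Whoever wants to pause the workers first takes worker_hang_lock, then sends 'p' down every notify pipe and waits until every worker has reported in. Each worker then blocks trying to lock worker_hang_lock until it is released. The sending side is not shown in this article, so the sketch below is an assumption built from the pieces above. A blocking read() loop stands in for libevent, and both the 'q' quit message and pause_and_resume are hypothetical.

```c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define NWORKERS 3

static pthread_mutex_t worker_hang_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  init_cond = PTHREAD_COND_INITIALIZER;
static int init_count = 0;

/* Report in, then block for as long as someone else holds worker_hang_lock. */
static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
    pthread_mutex_lock(&worker_hang_lock);   /* hangs here while paused */
    pthread_mutex_unlock(&worker_hang_lock);
}

/* Hypothetical helper: write a one-byte message to a notify pipe. */
static void notify(int fd, char msg) {
    if (write(fd, &msg, 1) != 1)
        perror("write to notify pipe");
}

/* Worker loop: a blocking read() on the pipe stands in for libevent. */
static void *worker(void *arg) {
    int fd = *(int *)arg;
    char msg;
    while (read(fd, &msg, 1) == 1) {
        if (msg == 'p')
            register_thread_initialized();   /* pause and report in */
        else if (msg == 'q')
            break;                           /* hypothetical quit message */
    }
    return NULL;
}

/* Pause every worker, record how many reported in, then resume and stop them. */
static int pause_and_resume(void) {
    pthread_t tids[NWORKERS];
    int fds[NWORKERS][2];
    int i, paused;

    for (i = 0; i < NWORKERS; i++)
        if (pipe(fds[i]) != 0)
            return -1;
    for (i = 0; i < NWORKERS; i++) {
        if (pthread_create(&tids[i], NULL, worker, &fds[i][0]) != 0) {
            perror("pthread_create");
            exit(1);
        }
    }

    pthread_mutex_lock(&worker_hang_lock);   /* workers that report in will now hang */
    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < NWORKERS; i++)
        notify(fds[i][1], 'p');
    while (init_count < NWORKERS)            /* same wait as wait_for_thread_registration() */
        pthread_cond_wait(&init_cond, &init_lock);
    paused = init_count;
    pthread_mutex_unlock(&init_lock);

    /* All workers are parked at this point. */

    pthread_mutex_unlock(&worker_hang_lock); /* resume the workers */
    for (i = 0; i < NWORKERS; i++) {
        notify(fds[i][1], 'q');
        pthread_join(tids[i], NULL);
        close(fds[i][0]);
        close(fds[i][1]);
    }
    return paused;
}
```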

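    conn_new (in memcached.c) initializes a connection's state. When the main thread accepts a connection request, it calls dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport) (the transport here is TCP). dispatch_conn_new picks a worker thread to handle the connection's network events (in practice only read events, since event_flags is EV_READ | EV_PERSIST).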

    /* Which thread we assigned a connection to most recently. */
    static int last_thread = -1; // index of the thread that most recently received a new connection
    
    /*
     * Dispatches a new connection to another thread. This is only ever called
     * from the main thread, either during initialization (for UDP) or because
     * of an incoming connection.
     */
    void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                           int read_buffer_size, enum network_transport transport) {
        CQ_ITEM *item = cqi_new(); // allocate a new CQ_ITEM
        char buf[1];
        if (item == NULL) {
            close(sfd);
            /* given that malloc failed this may also fail, but let's try */
            fprintf(stderr, "Failed to allocate memory for connection object\n");
            return ;
        }
    
        int tid = (last_thread + 1) % settings.num_threads; // round-robin: index of the thread that will handle this connection
    
        LIBEVENT_THREAD *thread = threads + tid; // that thread's descriptor
    
        last_thread = tid; // update last_thread
    
        // Fill in the item
        item->sfd = sfd;
        item->init_state = init_state;
        item->event_flags = event_flags;
        item->read_buffer_size = read_buffer_size;
        item->transport = transport;
    
        cq_push(thread->new_conn_queue, item); // append the item to that thread's connection queue
    
        MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
        buf[0] = 'c';
        if (write(thread->notify_send_fd, buf, 1) != 1) { // notify that thread that a connection is waiting
            perror("Writing to thread notify pipe");
        }
    }

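    Once write(thread->notify_send_fd, buf, 1) succeeds, the worker thread sees a read event on its pipe, reads the message "c", and processes the new connection waiting in its connection queue. The handling code is thread_libevent_process() (the pipe read-event handler, or notify handler).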
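
    Putting dispatch_conn_new and thread_libevent_process together: the main thread pushes onto the worker's queue first and only then writes one byte to its pipe, so by the time the worker reads 'c' the item is already queued. The sketch below runs that handshake end to end with two workers, a blocking read() in place of libevent, and fake fd numbers. WORKER, dispatch and run_demo are hypothetical names, and shutting the workers down by closing the write end of each pipe is this sketch's own choice, not memcached's.

```c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define NWORKERS 2
#define MAX_HANDLED 8

typedef struct item {
    int          sfd;
    struct item *next;
} ITEM;

typedef struct {
    pthread_t       tid;
    int             notify_receive_fd;
    int             notify_send_fd;
    pthread_mutex_t lock;                 /* protects head and tail */
    ITEM           *head, *tail;          /* this worker's connection queue */
    int             handled[MAX_HANDLED]; /* fds this worker picked up (demo only) */
    int             nhandled;
} WORKER;

static WORKER workers[NWORKERS];
static int last_thread = -1;

/* Worker: block on the pipe; each 'c' means one connection is queued. */
static void *worker_main(void *arg) {
    WORKER *me = arg;
    char msg;
    /* read() returns 0 once the write end is closed, which ends the loop. */
    while (read(me->notify_receive_fd, &msg, 1) == 1 && msg == 'c') {
        pthread_mutex_lock(&me->lock);
        ITEM *it = me->head;              /* pop from the head */
        if (it != NULL) {
            me->head = it->next;
            if (me->head == NULL)
                me->tail = NULL;
        }
        pthread_mutex_unlock(&me->lock);
        if (it != NULL) {
            if (me->nhandled < MAX_HANDLED)
                me->handled[me->nhandled++] = it->sfd;  /* stands in for conn_new() */
            free(it);
        }
    }
    return NULL;
}

/* Dispatcher: round-robin choice, enqueue, then wake the chosen worker. */
static void dispatch(int sfd) {
    ITEM *it = malloc(sizeof *it);
    if (it == NULL)
        return;
    int tid = (last_thread + 1) % NWORKERS;  /* same formula as dispatch_conn_new() */
    WORKER *w = &workers[tid];
    last_thread = tid;

    it->sfd = sfd;
    it->next = NULL;
    pthread_mutex_lock(&w->lock);            /* push at the tail */
    if (w->tail != NULL)
        w->tail->next = it;
    else
        w->head = it;
    w->tail = it;
    pthread_mutex_unlock(&w->lock);

    if (write(w->notify_send_fd, "c", 1) != 1)
        perror("write to notify pipe");
}

/* Start the workers, dispatch n fake fds (100, 101, ...), then shut down. */
static void run_demo(int n) {
    int i;
    for (i = 0; i < NWORKERS; i++) {
        int fds[2];
        if (pipe(fds) != 0) {
            perror("pipe");
            exit(1);
        }
        workers[i].notify_receive_fd = fds[0];
        workers[i].notify_send_fd = fds[1];
        pthread_mutex_init(&workers[i].lock, NULL);
        if (pthread_create(&workers[i].tid, NULL, worker_main, &workers[i]) != 0) {
            perror("pthread_create");
            exit(1);
        }
    }
    for (i = 0; i < n; i++)
        dispatch(100 + i);
    for (i = 0; i < NWORKERS; i++) {
        close(workers[i].notify_send_fd);    /* EOF tells the worker to exit */
        pthread_join(workers[i].tid, NULL);
        close(workers[i].notify_receive_fd);
    }
}
```

    With two workers and four dispatched connections, round-robin assignment sends fds 100 and 102 to worker 0 and fds 101 and 103 to worker 1.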

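    That concludes this article. Other details in the code above, such as how CQ_ITEMs are allocated and freed, are not covered here. The hash table and network communication will be covered in more detail in the memory management (TODO : link) and network communication (TODO : link) sections respectively.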

  • Original post: https://www.cnblogs.com/william-cheung/p/4805278.html