zoukankan      html  css  js  c++  java
  • 分布式缓存系统 Memcached 工作线程初始化

    Memcached采用典型的Master-Worker模式,其核心思想是:有Master和Worker两类进程(线程)协同工作,Master进程负责接收和分配任务,Worker进程负责处理子任务。当各Worker进程将各个子任务处理完成后,将结果返回给Master进程,由Master进程做归纳和汇总。 

    工作示意图如下所示:

    其中每个工作线程维护一个连接队列,以接收由主线程分配的客户连接;同时每个工作线程维护一个Libevent实例,以处理和主线程间的管道通信以及和客户连接间的socket通信事件。 

    工作线程的创建与初始化等工作由函数void thread_init(int nthreads, struct event_base *main_base) 

    调用相应函数来完成。

    //Memcached内部工作线程的封装    
    typedef struct {    
        pthread_t thread_id;      //线程ID    
        struct event_base *base;  //libevent的不是线程安全的,每个工作线程持有一个libevent实例,用于pipe管道通信和socket通信    
        struct event notify_event; //用于监听pipe管道的libevent事件    
        int notify_receive_fd;      //接收pipe管道消息描述符    
        int notify_send_fd;        //发送pipe管道消息描述符    
        struct thread_stats stats;  //每个线程对应的统计信息    
        struct conn_queue *new_conn_queue; //每个线程都有一个工作队列,主线程接受的连接,挂载到该消息队列中    
        cache_t *suffix_cache;      //后缀cache    
        uint8_t item_lock_type;    //线程操作hash表持有的锁类型,有局部锁和全局锁    
    } LIBEVENT_THREAD;

    //分发线程的封装    
    typedef struct {    
        pthread_t thread_id;        //线程id    
        struct event_base *base;    //libevent实例    
    } LIBEVENT_DISPATCHER_THREAD;    
        
    //工作线程绑定到libevent实例    
    static void setup_thread(LIBEVENT_THREAD *me) {    
        me->base = event_init();//创建libevent实例    
        if (! me->base) {    
            fprintf(stderr, "Can't allocate event base ");    
            exit(1);    
        }     

    //工作线程的初始化:对应liibevent实例的创建与设置,绑定到该libevent实例、注册事件、创建活动连接队列、创建设置线程,分别执行libevent事件循环:Memcached采用了典型的Master-Worker的线程模式,  
    //Master就是由main线程来充当,而Worker线程则是通过Pthread创建的。  
    void thread_init(int nthreads, struct event_base *main_base) {//该函数调用其他函数来完成线程创建、初始化等工作。  
        int        i;  
        int        power;  
      
        pthread_mutex_init(&cache_lock, NULL);  
        pthread_mutex_init(&stats_lock, NULL);  
      
        pthread_mutex_init(&init_lock, NULL);  
        pthread_cond_init(&init_cond, NULL);  
      
        pthread_mutex_init(&cqi_freelist_lock, NULL);  
        cqi_freelist = NULL;  
      
        /* Want a wide lock table, but don't waste memory */  
        if (nthreads < 3) {  
            power = 10;  
        } else if (nthreads < 4) {  
            power = 11;  
        } else if (nthreads < 5) {  
            power = 12;  
        } else {  
            /* 8192 buckets, and central locks don't scale much past 5 threads */  
            power = 13;  
        }  
      
        item_lock_count = hashsize(power);  
        item_lock_hashpower = power;  
      
        item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));  
        if (! item_locks) {  
            perror("Can't allocate item locks");  
            exit(1);  
        }  
        for (i = 0; i < item_lock_count; i++) {  
            pthread_mutex_init(&item_locks[i], NULL);  
        }  
        pthread_key_create(&item_lock_type_key, NULL);  
        pthread_mutex_init(&item_global_lock, NULL);  
      
        threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));  
        if (! threads) {  
            perror("Can't allocate thread descriptors");  
            exit(1);  
        }  
      
        dispatcher_thread.base = main_base;  
        dispatcher_thread.thread_id = pthread_self();  
        //工作线程的初始化,工作线程和主线程(main线程)是通过pipe管道进行通信的    
        for (i = 0; i < nthreads; i++) {  
            int fds[2];  
            if (pipe(fds)) {  
                perror("Can't create notify pipe");  
                exit(1);  
            }  
      
            threads[i].notify_receive_fd = fds[0];//读管道绑定到工作线程的接收消息的描述符    
            threads[i].notify_send_fd = fds[1];//写管道绑定到工作线程的发送消息的描述符    
      
            setup_thread(&threads[i]);//添加工作线程到lievent中  
            /* Reserve three fds for the libevent base, and two for the pipe */  
            stats.reserved_fds += 5;  
        }  
      
        /* Create threads after we've done all the libevent setup. */  
        for (i = 0; i < nthreads; i++) {  
            create_worker(worker_libevent, &threads[i]);//创建线程,执行libevnt事件循环  
        }  
      
        /* Wait for all the threads to set themselves up before returning. */  
        pthread_mutex_lock(&init_lock);  
        wait_for_thread_registration(nthreads);  
        pthread_mutex_unlock(&init_lock);  

    /* 
     * Set up a thread's information. 
     */  
    //工作线程绑定到libevent实例,注册该线程的通知事件,创建并初始化其消息队列(即就绪事件队列)  
    static void setup_thread(LIBEVENT_THREAD *me) {  
        me->base = event_init();  
        if (! me->base) {  
            fprintf(stderr, "Can't allocate event base ");  
            exit(1);  
        }  
      
        /* Listen for notifications from other threads */  
    //设置event对象:设置事件ev绑定文件描述符或信号值fd(定时事件设为-1);设置事件类型;设置回调函数及参数  
        event_set(&me->notify_event, me->notify_receive_fd,  
                  EV_READ | EV_PERSIST, thread_libevent_process, me);  
        //设置事件ev从属的event_base,以及ev的优先级  
        event_base_set(me->base, &me->notify_event);  
          
    //往事件队列中注册事件处理器,并将对应事件添加到事件多路分发器上  
    //内部调用event_add_internal函数  
        if (event_add(&me->notify_event, 0) == -1) {  
            fprintf(stderr, "Can't monitor libevent notify pipe ");  
            exit(1);  
        }  
      
     //创建消息队列,用于接受主线程分发的连接    
        me->new_conn_queue = malloc(sizeof(struct conn_queue));  
        if (me->new_conn_queue == NULL) {  
            perror("Failed to allocate memory for connection queue");  
            exit(EXIT_FAILURE);  
        }  
        cq_init(me->new_conn_queue);//初始化连接队列(单链表)  
    //创建线程的后缀cache??  
        if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {  
            perror("Failed to initialize mutex");  
            exit(EXIT_FAILURE);  
        }  
      
        me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),  
                                        NULL, NULL);  
        if (me->suffix_cache == NULL) {  
            fprintf(stderr, "Failed to create suffix cache ");  
            exit(EXIT_FAILURE);  
        }  

    //创建工作线程  
    static void create_worker(void *(*func)(void *), void *arg) {  
        pthread_t      thread;  
        pthread_attr_t  attr;  
        int            ret;  
      
        pthread_attr_init(&attr);//Posix线程部分,线程属性初始化    
      
        //创建线程:外部传入的线程函数worker_libevent  
        if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {  
            fprintf(stderr, "Can't create thread: %s ",  
                    strerror(ret));  
            exit(1);  
        }  
    }

    //线程处理函数 :执行libevent事件循环(处理与主线程的通信事件和所分配到的客户连接事件的处理)
      
    static void *worker_libevent(void *arg) {  
        LIBEVENT_THREAD *me = arg;  
      
      
        /* Any per-thread setup can happen here; thread_init() will block until 
        * all threads have finished initializing. 
        */  
      
      
        /* set an indexable thread-specific memory item for the lock type. 
        * this could be unnecessary if we pass the conn *c struct through 
        * all item_lock calls... 
        */  
      //默认的hash表的锁为局部锁    
        me->item_lock_type = ITEM_LOCK_GRANULAR;    
        pthread_setspecific(item_lock_type_key, &me->item_lock_type);//设定线程的属性    
        //用于控制工作线程初始化,通过条件变量来控制    
        register_thread_initialized();    
        //工作线程的libevent实例启动    
        event_base_loop(me->base, 0);  
        return NULL;  

    其中Libevent的事件循环函数event_baseloop:(代码来自Libevent框架源码),其处理流程如图:

    具体代码如下:

    //事件处理主循环:调用I/O事件多路分发器监听函数,以等待事件发生,  
    //当有事件发生时将其插入到活动事件队列数组对应优先级的队列中,然后调用其事件的回调函数依次处理事件。  
    int  
    event_base_loop(struct event_base *base, int flags)  
    {  
        const struct eventop *evsel = base->evsel;  
        struct timeval tv;  
        struct timeval *tv_p;  
        int res, done, retval = 0;  
      
        /* Grab the lock.  We will release it inside evsel.dispatch, and again 
        * as we invoke user callbacks. */  
        EVBASE_ACQUIRE_LOCK(base, th_base_lock);//获取对event_base的独占锁  
      
        //如果事件循环已经启动,则不能再启动  
        if (base->running_loop) {  
            event_warnx("%s: reentrant invocation.  Only one event_base_loop"  
                " can run on each event_base at once.", __func__);  
            EVBASE_RELEASE_LOCK(base, th_base_lock);//释放对event_base的独占锁  
            return -1;  
        }  
      
        base->running_loop = 1;//标记循环已经启动  
      
        clear_time_cache(base);//清除系统时间缓存  
      
        if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)  
            evsig_set_base(base);//设置信号处理器的event_base实例  
      
        done = 0;  
      
    #ifndef _EVENT_DISABLE_THREAD_SUPPORT    
        base->th_owner_id = EVTHREAD_GET_ID(); //不支持多线程时  
    #endif  
        //多线程  
        base->event_gotterm = base->event_break = 0;  
      
        while (!done) {  
            base->event_continue = 0;  
      
            /* Terminate the loop if we have been asked to */  
            if (base->event_gotterm) {  
                break;  
            }  
      
            if (base->event_break) {  
                break;  
            }  
      
            // 校正系统时间,如果系统支持monotonic时间,该时间是系统从boot后到现在所经过的时间,因此不需要执行校正。  
            //如果系统使用的是非MONOTONIC时间,用户可能会向后调整了系统时间  
            // 在timeout_correct函数里,比较last wait time和当前时间,如果当前时间< last wait time  
            // 表明时间有问题,这是需要更新timer_heap中所有定时事件的超时时间。  
            timeout_correct(base, &tv);  
      
            tv_p = &tv;  
            // 根据timer heap中事件的最小超时时间,设置系统I/O demultiplexer的最大等待时间  
            if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {//没有就绪事件则设置最大等待时间  
                //如果就绪事件为0,且、、  
                timeout_next(base, &tv_p);////获取时间堆中超时时间最小的事件处理器的,计算等待时间,返回值存在tv_p中  
                //超时时间作为I/O分发器的超时返回时间,这样当I/O分发器返回时即可以去处理  
                //事件堆上就绪的定时事件  
            } else {//有事件就绪了,就直接返回  
                /* 
                * if we have active events, we just poll new events 
                * without waiting. 
                */  
                //如果有就绪事件尚未处理,则将I/O复用系统调用的超时时间设置为0,  
                //这样I/O复用系统调用将直接返回,程序即立即处理就绪事件  
                evutil_timerclear(&tv);//(tvp)->tv_sec = (tvp)->tv_usec = 0  
            }  
      
            /* If we have no events, we just exit */  
            //如果event_base中没有注册任何事件,则直接退出  
            if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {  
                event_debug(("%s: no events registered.", __func__));  
                retval = 1;  
                goto done;  
            }  
      
            /* update last old time */  
            gettime(base, &base->event_tv);//更新base->event_tv为缓存时间(没清空时)或系统当前那时间  
      
            clear_time_cache(base);//清除系统时间缓存  
      
            //调用后端i/o复用机制的事件多路分发器的dispath函数等待事件,将就绪信号事件,I/O事件插入到活动事件队列中  
            res = evsel->dispatch(base, tv_p);//阻塞调用:如epoll_wait  
      
            if (res == -1) {  
                event_debug(("%s: dispatch returned unsuccessfully.",  
                    __func__));  
                retval = -1;  
                goto done;  
            }  
      
            update_time_cache(base);//更新时间缓存为当前系统时间(缓存时间记录了本次事件就绪返回的时间)  
    // 检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中  
            timeout_process(base);  
    // 调用event_process_active()处理激活链表中的就绪event,调用其回调函数执行事件处理  
    // 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表,  
    // 然后处理链表中的所有就绪事件;  
    // 因此低优先级的就绪事件可能得不到及时处理;  
            if (N_ACTIVE_CALLBACKS(base)) {//有就绪事件就绪了  
                int n = event_process_active(base);//处理就绪事件  
                if ((flags & EVLOOP_ONCE)  
                    && N_ACTIVE_CALLBACKS(base) == 0  
                    && n != 0)  
                    done = 1;  
            } else if (flags & EVLOOP_NONBLOCK)  
                done = 1;  
        }  
        event_debug(("%s: asked to terminate loop.", __func__));  
      
    done:  
        // 循环结束,清空时间缓存  
        clear_time_cache(base);  
        base->running_loop = 0;//设置停止循环标志  
      
        EVBASE_RELEASE_LOCK(base, th_base_lock);//释放锁  
      
        return (retval);  
    }

    //阻塞工作线程    
    static void wait_for_thread_registration(int nthreads) {    
        while (init_count < nthreads) {    
            pthread_cond_wait(&init_cond, &init_lock);//在条件变量init_cond上面阻塞,阻塞个数为nthreads-init_count    
        }    
    }    
    //唤醒工作线程    
    static void register_thread_initialized(void) {    
        pthread_mutex_lock(&init_lock);    
        init_count++;    
        pthread_cond_signal(&init_cond);    
        pthread_mutex_unlock(&init_lock);    
    }    
        
    //每个线程持有的统计信息    
    struct thread_stats {    
        pthread_mutex_t  mutex;    
        uint64_t          get_cmds;    
        uint64_t          get_misses;    
        uint64_t          touch_cmds;    
        uint64_t          touch_misses;    
        uint64_t          delete_misses;    
        uint64_t          incr_misses;    
        uint64_t          decr_misses;    
        uint64_t          cas_misses;    
        uint64_t          bytes_read;    
        uint64_t          bytes_written;    
        uint64_t          flush_cmds;    
        uint64_t          conn_yields; /* # of yields for connections (-R option)*/    
        uint64_t          auth_cmds;    
        uint64_t          auth_errors;    
        struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];    
    }; 

    本节主要分析了工作线程的初始化部分,至于具体的主线程任务分发调度,以及工作线程如何具体处理连接和与主线程之间的通信等后续讲解。

  • 相关阅读:
    e621. Activating a Keystroke When Any Child Component Has Focus
    e587. Filling Basic Shapes
    e591. Drawing Simple Text
    e595. Drawing an Image
    e586. Drawing Simple Shapes
    e636. Listening to All Key Events Before Delivery to Focused Component
    在 PL/SQL 块的哪部分可以对初始变量赋予新值? (选择1项)
    Oracle数据库中,在SQL语句中连接字符串的方法是哪个?(选择1项)
    你判断下面语句,有什么作用?(单选)
    Oracle数据库表空间与数据文件的关系描述正确的是( )
  • 原文地址:https://www.cnblogs.com/duanxz/p/5138091.html
Copyright © 2011-2022 走看看