zoukankan      html  css  js  c++  java
  • 分布式缓存系统 Memcached 半同步/半异步模式

    在前面工作线程初始化的分析中讲到Memcached采用典型的Master_Worker模式,也即半同步/半异步的高效网络并发模式。其中主线程(异步线程)负责接收客户端连接,然后分发给工作线程,具体由工作线程完成客户端的求情任务。

    在memcached中,主线程负责监听所有socket上的事件,当socket上有可读事件发生,即新的客户连接求情到来,主线程就接受之得到新的连接socket,并将该连接socket信息放入一个任务对象(CQ_ITEM)结构体中,然后选择一个工作线程,将该CQ_ITEM放入该工作线程的任务队列(CQ)中,并通过管道通知工作线程:“我接到一个新的客户端请求,我已经把把放入你的任务队列中了,你赶紧去处理吧!”,后面就由工作线程去完成客户端请求的任务。 需要主要的是,在memcached中,主线程和每个工作线程都关联一个Libevent实例来负责与主线程的通信和处理客户端的任务事件。因此,实际上这里的工作线程也是异步的,每个工作线程处理多个客户端请求任务,这正是由Libevent实现的。

    具体的半同步/半异步模式结构如下图所示:

    I/O框架库Libevent基本结构如下图所示:

    连接请求队列CQ结构:

    //连接队列(循环单链表)
    //队列的主要操作:初始化  push  pop
    typedef struct conn_queue CQ;
    struct conn_queue {
        CQ_ITEM *head;
        CQ_ITEM *tail;
        pthread_mutex_t lock;//每个队列维持一个线程锁,因此主线程在向该队列中push ITEM
     //时都是要加锁的
    };

    CQ队列节点CQ_ITEM:

    //连接对象CQ_ITEM
    typedef struct conn_queue_item CQ_ITEM;
    struct conn_queue_item {
        int              sfd;//连接socket
        enum conn_states  init_state;//所有可能状态组成的结构体
        int              event_flags;
        int              read_buffer_size;
        enum network_transport    transport;//网络传输方式
        CQ_ITEM          *next;//指向所在连接队列的下一ITEM对象
    };

    连接队列CQ的主要操作: 
    push 与pop,即主线程将先到来的连接对象ITEM push到CQ队列中; 工作线程从CQ队列中pop出一个ITEM进行处理。

    //主线程将一个ITEM放入到连接连接队列的尾部
    static void cq_push(CQ *cq, CQ_ITEM *item) {
        item->next = NULL;
        pthread_mutex_lock(&cq->lock);//需要加锁
        if (NULL == cq->tail)
            cq->head = item;
        else
            cq->tail->next = item;
        cq->tail = item;
        pthread_mutex_unlock(&cq->lock);
    }

    //工作线程读取CQ队列中头部的ITEM(同时从队列中删除该ITEM)
    //注意:读写CQ队列都需要加锁,以防工作线程正在读取时,主线程往队列中加入新的ITEM
    static CQ_ITEM *cq_pop(CQ *cq) {
        CQ_ITEM *item;
        pthread_mutex_lock(&cq->lock);//
        item = cq->head;
        if (NULL != item) {
            cq->head = item->next;
            if (NULL == cq->head)
                cq->tail = NULL;
        }
        pthread_mutex_unlock(&cq->lock);


        return item;
    }

    CQ_ITEM内存池:

    memcached在申请一个CQ_ITEM结构体时,并不是直接使用malloc申请的。因为这样做可能会导致大量的内存碎片(作为长期运行的服务器进程memcached需要考虑这个问题)。为此,memcached也为CQ_ITEM使用类似内存池的技术:每次在主线程在请求一个空闲ITEM时,检测该空闲ITEM链表,如果为空,则请于预分配一块内存(默认情况一次分配64个ITEM),分割为多个ITEM,将第一个返回给调用者使用,剩余的连成空闲ITEM链表,以备主线程使用。在工作线程中释放一个ITEM时,只需要将ITEM放回该空闲链表即可。

    空闲链表:static CQ_ITEM *cqi_freelist;

    具体实现如下:

    //这里采取的优化方法是,一次性分配64个CQ_ITEM大小的内存(即预分配).  
     //下次调用本函数的时候,直接从之前分配64个中要一个即可。  
     //由于是为了防止内存碎片,所以不是以链表的形式放置这64个CQ_ITEM。而是数组的形式。  
     //于是,cqi_free函数就有点特别了。它并不会真正释放.而是像内存池那样归还  
    static CQ_ITEM *cqi_new(void) {  
        //所有线程都会访问cqi_freelist的。所以需要加锁  
        CQ_ITEM *item = NULL;  
        pthread_mutex_lock(&cqi_freelist_lock);  
        if (cqi_freelist) {  //如果空闲链表中还有ITEM,则之间返回一个ITEM给调用者
            item = cqi_freelist;  
            cqi_freelist = item->next;  
        }  
        pthread_mutex_unlock(&cqi_freelist_lock);  
      
        if (NULL == item) {//没有多余的CQ_ITEM了  
            int i;  
      
            item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);//该宏等于64  
      
            //item[0]直接返回为调用者,不用next指针连在一起。调用者负责将  
            //item[0].next赋值为NULL  
            for (i = 2; i < ITEMS_PER_ALLOC; i++)//将这块内存分成一个个的item并且用next指针像链表一样连起来  
                item[i - 1].next = &item[i];  
      
            pthread_mutex_lock(&cqi_freelist_lock);  
            //因为主线程负责申请CQ_ITEM,子线程负责释放CQ_ITEM。所以cqi_freelist此刻  
            //可能并不等于NULL。由于使用头插法,所以无论cqi_freeelist是否为NULL,都能  
            //把链表连起来的。  
            item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;  
            cqi_freelist = &item[1];  
            pthread_mutex_unlock(&cqi_freelist_lock);  
        }  
      
        return item;  
    }  
      
      
    //并非释放,而是像内存池那样归还  
    static void cqi_free(CQ_ITEM *item) {  
        pthread_mutex_lock(&cqi_freelist_lock);  
        item->next = cqi_freelist;  
        cqi_freelist = item;  //头插法归还  
        pthread_mutex_unlock(&cqi_freelist_lock);  

    工作线程:

    全局变量:static LIBEVENT_THREAD *threads;

    threads指向所有的工程线程组成的数组,主线程通过该threads指针即可遍历所有的工作线程,从该数组中选出一个工作线程,然后通过管道即可实现与工作线程的通信,同时也可以将CQ_ITEM放入对应的CQ队列中。

    其中Memcached封装了工作线程结构体,包括了Libevent实例对象,以及通信管道描述符等。

    //LIBEVENT_THREAD是Memcached内部对工作线程的一个封装 
    typedef struct {
        pthread_t thread_id;    //线程id  /* unique ID of this thread */
        struct event_base *base;  //libevent的不是线程安全的,每个工作线程持有一个libevent实例,用于pipe管道通信和socket通信  
        struct event notify_event; //线程的通知事件  /* listen event for notify pipe */
        int notify_receive_fd;    //通知管道接收端(读) /* receiving end of notify pipe */
        int notify_send_fd;        //通知管道写端 /* sending end of notify pipe */
        struct thread_stats stats; //该线程的状态 /* Stats generated by this thread */
        struct conn_queue *new_conn_queue; //每个线程都有一个工作队列,主线程接受的连接,挂载到该消息队列中  
        cache_t *suffix_cache;      //后缀cache  
        uint8_t item_lock_type;    //线程操作hash表持有的锁类型,有局部锁和全局锁  
    } LIBEVENT_THREAD;

    工作线程的创建包括由其初始化函数完成,包括:配置Libevent实例对象、创建CQ队列等过程,然后创建线程,启动事件循环。

    下面给初始化函数:

    //工作线程的初始化,创建线程,分别执行libevent事件循环:Memcached采用了典型的Master-Worker的线程模式,
    //Master就是由main线程来充当,而Worker线程则是通过Pthread创建的。
    void thread_init(int nthreads, struct event_base *main_base) {
        int        i;
        int        power;


        pthread_mutex_init(&cache_lock, NULL);
        pthread_mutex_init(&stats_lock, NULL);


        pthread_mutex_init(&init_lock, NULL);
        pthread_cond_init(&init_cond, NULL);


        pthread_mutex_init(&cqi_freelist_lock, NULL);
        cqi_freelist = NULL;


        /* Want a wide lock table, but don't waste memory */
        if (nthreads < 3) {
            power = 10;
        } else if (nthreads < 4) {
            power = 11;
        } else if (nthreads < 5) {
            power = 12;
        } else {
            /* 8192 buckets, and central locks don't scale much past 5 threads */
            power = 13;
        }


        item_lock_count = hashsize(power);
        item_lock_hashpower = power;


        item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
        if (! item_locks) {
            perror("Can't allocate item locks");
            exit(1);
        }
        for (i = 0; i < item_lock_count; i++) {
            pthread_mutex_init(&item_locks[i], NULL);
        }
        pthread_key_create(&item_lock_type_key, NULL);
        pthread_mutex_init(&item_global_lock, NULL);


        threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));//申请nthreads个工作线程空间,由全局变量threads维护
        if (! threads) {
            perror("Can't allocate thread descriptors");
            exit(1);
        }


        dispatcher_thread.base = main_base;
        dispatcher_thread.thread_id = pthread_self();
     //工作线程的初始化,工作线程和主线程(main线程)是通过pipe管道进行通信的 
        for (i = 0; i < nthreads; i++) {
            int fds[2];
            if (pipe(fds)) {
                perror("Can't create notify pipe");
                exit(1);
            }


            threads[i].notify_receive_fd = fds[0];//读管道绑定到工作线程的接收消息的描述符  
            threads[i].notify_send_fd = fds[1];//写管道绑定到工作线程的发送消息的描述符  
      //为每个线程配置一个Libevent实例 和一个CQ队列
            setup_thread(&threads[i]);
            /* Reserve three fds for the libevent base, and two for the pipe */
            stats.reserved_fds += 5;
        }


        /* Create threads after we've done all the libevent setup. */
     
        for (i = 0; i < nthreads; i++) {
            create_worker(worker_libevent, &threads[i]);//创建线程,执行libevnt事件循环
        }


        /* Wait for all the threads to set themselves up before returning. */
        pthread_mutex_lock(&init_lock);
        wait_for_thread_registration(nthreads);
        pthread_mutex_unlock(&init_lock);
    }

    上面初始化函数中的:

    create_worker函数中,调用工作线程函数worker_libevent启动Libevent事件循环:event_base_loop,处理主线程的管道通知和所分配的客户端请求任务等事件。

    而setup_thread函数主要负责两种事件的注册:

    1、将读管道事件注册到libevent上。

    2、设置其中读管道事件回调函数为thread_libevent_process(),该函数负责对读取并解析管道的一个字节的通知信息。然后从连接队列CQ的头部取出该ITEM,为该连接socket在该线程的libeven上注册事件(调用函数conn_new设置),连接socket事件的回调的函数也event_handler(因为实际上主线程也是通过conn_new初始化监听socket 的libevent可读事件),而其中event_handler函数的核心依然是 memcached网络事件处理的最核心部分- drive_machine 。drive_machine 将留到后面分析。

    并设定该线程为该连接socket的服务线程,然后将该ITEM放入到ITEM内存池空闲链表中。

    其中管道事件回调thread_libevent_process函数,具体代码如下:

    //回调函数:处理主线程发来的读管道事件,将收到的连接socket注册到libevent上
    //fd:读管道描述符notify_receive_fd
    static void thread_libevent_process(int fd, short which, void *arg) {
        LIBEVENT_THREAD *me = arg;
        CQ_ITEM *item;
        char buf[1];

    //首先将管道的1个字节通知信号读出
    //(这是必须的,在水平触发模式下如果不处理该事件,则会被循环通知,直到事件被处理) 
        if (read(fd, buf, 1) != 1)
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read from libevent pipe ");

     //解析主线程发送来的通知类容
        switch (buf[0]) {
        case 'c'://从CQ队列取出头部ITEM进行处理
        item = cq_pop(me->new_conn_queue);

        if (NULL != item) {
      //创建连接结构体,并向该线程libevent注册该连接socket的事件。
            conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                              item->read_buffer_size, item->transport, me->base);
            if (c == NULL) {
                if (IS_UDP(item->transport)) {
                    fprintf(stderr, "Can't listen for events on UDP socket ");
                    exit(1);
                } else {
                    if (settings.verbose > 0) {
                        fprintf(stderr, "Can't listen for events on fd %d ",
                            item->sfd);
                    }
                    close(item->sfd);
                }
            } else {
                c->thread = me;//指定该连接的服务线程为当前线程
            }
            cqi_free(item);//将该ITEM加入空闲ITEM链表(ITEM内存池),以循环利用
        }
            break;
        /* we were told to flip the lock type and report in */
        case 'l':
        me->item_lock_type = ITEM_LOCK_GRANULAR;
        register_thread_initialized();
            break;
        case 'g':
        me->item_lock_type = ITEM_LOCK_GLOBAL;
        register_thread_initialized();
            break;
        }
    }

    上面的setup_thread函数将管道事件与连接socket事件都注册到了linevent上,工作线程的基础设施已经完成。

    因此线程函数worker_libevent则负责启动事件循环,如下:

    //线程处理函数 :执行libevent事件循环
    static void *worker_libevent(void *arg) {
        LIBEVENT_THREAD *me = arg;

        /* Any per-thread setup can happen here; thread_init() will block until
        * all threads have finished initializing.
        */

        /* set an indexable thread-specific memory item for the lock type.
        * this could be unnecessary if we pass the conn *c struct through
        * all item_lock calls...
        */
      //默认的hash表的锁为局部锁  
        me->item_lock_type = ITEM_LOCK_GRANULAR;  
        pthread_setspecific(item_lock_type_key, &me->item_lock_type);//设定线程的属性  
        //用于控制工作线程初始化,通过条件变量来控制  
        register_thread_initialized();  
        //工作线程的libevent实例启动  
        event_base_loop(me->base, 0); 
        return NULL;
    }

    主线程:

    上面是工作过线程的基本工作过程,而主线的的主要工作就是:

    1、接受新的客户端连接请求

    2、将连接socket分发给工作线程,并通知工作线程来处理之。

    当然,主要函数也就是我们的main函数了。在main函数中,主线程创建了属于自己的Libevent实例,存放在全局变量main_base中。在main函数的最后,主线程调用event_base_loop进入事件循环中。中间的server_sockets函数是创建一个监听客户端的socket,并将创建一个event监听该socket的可读事件。

    main函数具体分析如下(仅仅展示main函数的基本流程):

    int main (int argc, char **argv) {  
          
        //检查libevent的版本是否足够新.1.3即可  
        if (!sanitycheck()) {  
            return EX_OSERR;  
        }  
      
        //对memcached的关键设置取默认值  
        settings_init();  
      
        ...//解析memcached启动参数  
      //是否以守护进程方式运行 memcached
        if (do_daemonize) {
            if (sigignore(SIGHUP) == -1) {
                perror("Failed to ignore SIGHUP");
            }
            if (daemonize(maxcore, settings.verbose) == -1) {
                fprintf(stderr, "failed to daemon() in order to daemonize ");
                exit(EXIT_FAILURE);
            }
        }
        //main_base是一个struct event_base类型的全局变量  
        main_base = event_init();//为主线程创建一个event_base  
      <span style="white-space:pre"> </span>//如果以多线程模式运行 memcached,则启动工作者线程
    <span style="white-space:pre"> </span>//配置,创建并启动多线程模式中的每个工作线程
        thread_init(settings.num_threads, main_base);
        conn_init();//先不管,后面会说到  
      
        //创建settings.num_threads个worker线程,并且为每个worker线程创建一个CQ队列  
        //并为这些worker申请各自的event_base,worker线程然后进入事件循环中    
        thread_init(settings.num_threads, main_base);  
      
        //设置一个定时event(也叫超时event),定时(频率为一秒)更新current_time变量  
        //这个超时event是add到全局变量main_base里面的,所以主线程负责更新current_time(这是一个很重要的全局变量)  
        clock_handler(0, 0, 0);  
      
      
        /* create the listening socket, bind it, and init */  //创建监听套接字,绑定到端口
        if (settings.socketpath == NULL) {  
            FILE *portnumber_file = NULL;  
            //创建监听客户端的socket  
            if (settings.port && server_sockets(settings.port, tcp_transport,//tcp_transport是枚举类型  
                                              portnumber_file)) {  
                vperror("failed to listen on TCP port %d", settings.port);  
                exit(EX_OSERR);  
            }  
      
            ...  
        }  
      
      
        
        if (event_base_loop(main_base, 0) != 0) {//主线程进入事件循环  
            retval = EXIT_FAILURE;  
        }  
      //完成善后工作
        return retval;  
    }

    这节主要是分析了memcached的网络并发处理模式,且分析了主线程和工作线程协调运行的具体实现。

    其中工作线程的初始化函数中主要完成了:

    1、基本设施的配置,如设置libevent实例等

    2、在libevent上注册管道读通知事件。回调函数为thread_libevent_process,该函数负责根据管道通知,执行相关任务,比如下面的3也是在该函数中调用其他函数完成的:

    3、在libevent上注册主线程分配的连接socket事件,回调函数通过conn_new函数设为event_handler(其中主要是调用了memcached网络事件处理的最核心部分- drive_machine ),实际上主线程也是通过conn_new初始化监听socket 的libevent可读事件

    通过以上分析,对memcached的基本框架已经有了较为深入的理解。对于主线程,在这仅仅简单介绍了其基本执行流程,其具体设计细节将留到下节分析。

  • 相关阅读:
    JavaScript的alert()突然出现中文乱码
    对ie CSS hack总结和理解
    getBoundingClientRect()获取元素相对浏览器视窗的坐标值
    typeof和instanceof的区别
    转载:Dos命令行下,关闭占用80端口的进程 (~~~这个比较实用!)
    关于JS浮点数计算误差问题和二进制浮点数表示法的思考
    callee与caller,apply与call的区别和作用
    [Javascript权威指南笔记01]后自增/后自减运算符的副作用 和 运算符的结合性
    转载:关于zindex的那些事(~~~比较理论的描述 堆栈上下文)
    西游东去 (~~创意?创新?恶搞?不置可否,不过有点意思)
  • 原文地址:https://www.cnblogs.com/duanxz/p/5138095.html
Copyright © 2011-2022 走看看