zoukankan      html  css  js  c++  java
  • memcached使用libevent 和 多线程模式

    一、libevent的使用

        首先我们知道,memcached是使用了iblievet作为网络框架的,而iblievet又是单线程模型的基于linux下epoll事件的异步模型。因此,其基本的思想就是 对可读,可写,超时,出错等事件进行绑定函数,等有其事件发生,对其绑定函数回调。

        

    可以减掉了解一下 libevent基本api调用

    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. struct event_base *base;  
    2. base = event_base_new();//初始化libevent  
     

    event_base_new对比epoll,可以理解为epoll里的epoll_create。

    event_base内部有一个循环,循环阻塞在epoll调用上,当有一个事件发生的时候,才会去处理这个事件。其中,这个事件是被绑定在event_base上面的,每一个事件就会对应一个struct event,可以是监听的fd。 

    其中struct event 使用event_new 来创建和绑定,使用event_add来启用,例如:

    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. struct event *listener_event;  
    2. listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);  
     
     

    参数说明:

    base:event_base类型,event_base_new的返回值

    listener:监听的fd,listen的fd

    EV_READ|EV_PERSIST:事件的类型及属性

    do_accept:绑定的回调函数

    (void*)base:给回调函数的参数

    event_add(listener_event, NULL);

    对比epoll:

    event_new相当于epoll中的epoll_wait,其中的epoll里的while循环,在libevent里使用event_base_dispatch。

    event_add相当于epoll中的epoll_ctl,参数是EPOLL_CTL_ADD,添加事件。

    注:libevent支持的事件及属性包括(使用bitfield实现,所以要用 | 来让它们合体)
    EV_TIMEOUT: 超时
    EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发
    EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发
    EV_SIGNAL: POSIX信号量
    EV_PERSIST: 不指定这个属性的话,回调函数被触发后事件会被删除
    EV_ET: Edge-Trigger边缘触发,相当于EPOLL的ET模式

    事件创建添加之后,就可以处理发生的事件了,相当于epoll里的epoll_wait,在libevent里使用event_base_dispatch启动event_base循环,直到不再有需要关注的事件。

     

    有了上面的分析,结合之前做的epoll服务端程序,对于一个服务器程序,流程基本是这样的:

    1. 创建socket,bind,listen,设置为非阻塞模式

    2. 创建一个event_base,即

    [cpp] view plaincopy
     
     
    1. struct event_base *  event_base_new(void)  

    3. 创建一个event,将该socket托管给event_base,指定要监听的事件类型,并绑定上相应的回调函数(及需要给它的参数)。即

    [cpp] view plaincopy
     
     
    1. struct event *  event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)  

    4. 启用该事件,即

    [cpp] view plaincopy
     
     
    1. int  event_add(struct event *ev, const struct timeval *tv)  

    5.  进入事件循环,即

    [cpp] view plaincopy
     
     
    1. int  event_base_dispatch(struct event_base *event_base)  
     

     

    有了上边的基础东西,可以进入memcached的阅读了。
     
    二、memcached源码分析
     
    main函数启动,首先会初始化很多数据,这里我们只涉及大网络这块,其他以后分析,先忽略。
    1.首先初始化 主工作线程的的iblievet对象
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /* initialize main thread libevent instance */  
    2.   main_base = event_init();  

    最后会调用
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /* enter the event loop */  
    2.  if (event_base_loop(main_base, 0) != 0) {  
    3.      retval = EXIT_FAILURE;  
    4.  }  

    在该对象内部循环。不退出。
     
    2.初始化连接的对象
     
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. static void conn_init(void) {  
    2.     freetotal = 200;  
    3.     freecurr = 0;  
    4.     if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {  
    5.         fprintf(stderr, "Failed to allocate connection structures ");  
    6.     }  
    7.     return;  
    8. }  


    这里是先预先分配200个conn*的内存。等有连接上来,会从freeconns  取。
    如下代码:
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /* 
    2.  * Returns a connection from the freelist, if any. 
    3.  */  
    4. conn *conn_from_freelist() {  
    5.     conn *c;  
    6.   
    7.     pthread_mutex_lock(&conn_lock);  
    8.     if (freecurr > 0) {  
    9.         c = freeconns[--freecurr];  
    10.     } else {  
    11.         c = NULL;  
    12.     }  
    13.     pthread_mutex_unlock(&conn_lock);  
    14.   
    15.     return c;  
    16. }  
     
     
    3.那么conn的结构体内部长什么样子呢?
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. typedef struct conn conn;  
    2. struct conn {  
    3.     int    sfd;  
    4.     sasl_conn_t *sasl_conn;  
    5.     enum conn_states  state;  
    6.     enum bin_substates substate;  
    7.     struct event event;  
    8.     short  ev_flags;  
    9.     short  which;   /** which events were just triggered */  
    10.   
    11.     char   *rbuf;   /** buffer to read commands into */  
    12.     char   *rcurr;  /** but if we parsed some already, this is where we stopped */  
    13.     int    rsize;   /** total allocated size of rbuf */  
    14.     int    rbytes;  /** how much data, starting from rcur, do we have unparsed */  
    15.   
    16.     char   *wbuf;  
    17.     char   *wcurr;  
    18.     int    wsize;  
    19.     int    wbytes;  
    20.     /** which state to go into after finishing current write */  
    21.     enum conn_states  write_and_go;  
    22.     void   *write_and_free; /** free this memory after finishing writing */  
    23.   
    24.     char   *ritem;  /** when we read in an item's value, it goes here */  
    25.     int    rlbytes;  
    26.   
    27.     /* data for the nread state */  
    28.   
    29.     /** 
    30.      * item is used to hold an item structure created after reading the command 
    31.      * line of set/add/replace commands, but before we finished reading the actual 
    32.      * data. The data is read into ITEM_data(item) to avoid extra copying. 
    33.      */  
    34.   
    35.     void   *item;     /* for commands set/add/replace  */  
    36.   
    37.     /* data for the swallow state */  
    38.     int    sbytes;    /* how many bytes to swallow */  
    39.   
    40.     /* data for the mwrite state */  
    41.     struct iovec *iov;  
    42.     int    iovsize;   /* number of elements allocated in iov[] */  
    43.     int    iovused;   /* number of elements used in iov[] */  
    44.   
    45.     struct msghdr *msglist;  
    46.     int    msgsize;   /* number of elements allocated in msglist[] */  
    47.     int    msgused;   /* number of elements used in msglist[] */  
    48.     int    msgcurr;   /* element in msglist[] being transmitted now */  
    49.     int    msgbytes;  /* number of bytes in current msg */  
    50.   
    51.     item   **ilist;   /* list of items to write out */  
    52.     int    isize;  
    53.     item   **icurr;  
    54.     int    ileft;  
    55.   
    56.     char   **suffixlist;  
    57.     int    suffixsize;  
    58.     char   **suffixcurr;  
    59.     int    suffixleft;  
    60.   
    61.     enum protocol protocol;   /* which protocol this con<pre name="code" class="cpp">  if (sigignore(SIGPIPE) == -1) {  
    62.         perror("failed to ignore SIGPIPE; sigaction");  
    63.         exit(EX_OSERR);  
    64.     }  

    nection speaks */ enum network_transport transport; /* what transport is used by this connection */ /* data for UDP clients */ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ struct sockaddr request_addr; /* Who sent the most recent request */ socklen_t request_addr_size; unsigned char *hdrbuf; /* udp packet headers */ int hdrsize; /* number of headers' worth of space is allocated */ bool noreply; /* True if the reply should not be sent. */ /* current stats command */ struct { char *buffer; size_t size; size_t offset; } stats; /* Binary protocol stuff */ /* This is where the binary header goes */ protocol_binary_request_header binary_header; uint64_t cas; /* the cas to return */ short cmd; /* current command being processed */ int opaque; int keylen; conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */};
    
    这里的所有字段就是在处理数据需要用到的。这里不详细描述。以后会慢慢分解。
     
     
    因为是memcached是多线程模型,因此在从freeconn取出一个对象的时候,是要加解锁使用。
     
    忽略SIGIPIE信号,防止rst时的程序退出
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. if (sigignore(SIGPIPE) == -1) {  
    2.       perror("failed to ignore SIGPIPE; sigaction");  
    3.       exit(EX_OSERR);  
    4.   }  

    初始化多线程模型,并且每个线程一个iblievent的事件模型就是调用event_init函数。
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /* start up worker threads if MT mode */  
    2.     thread_init(settings.num_threads, main_base);  

    内部实现不详细。主要是调用pthread_create函数。
     
    4、然后开始通过端口号启动网络监听事件
     
    代码如下:
     
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. if (settings.port && server_sockets(settings.port, tcp_transport,  
    2.                                         portnumber_file)) {  
    3.          vperror("failed to listen on TCP port %d", settings.port);  
    4.          exit(EX_OSERR);  
    5.      }  
     
    然后调用下面的函数:
     
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. static int server_socket(const char *interface,  
    2.                          int port,  
    3.                          enum network_transport transport,  
    4.                          FILE *portnumber_file)  

    因为,一个主机可能会有多个网卡,比如双线机房,联通或者电信,因此内部实现会出现以下代码:


    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. for (next= ai; next; next= next->ai_next) {  
    2.        conn *listen_conn_add;  
    3.        if ((sfd = new_socket(next)) == -1) {  
    4.            /* getaddrinfo can return "junk" addresses, 
    5.             * we make sure at least one works before erroring. 
    6.             */  
    7.            if (errno == EMFILE) {  
    8.                /* ...unless we're out of fds */  
    9.                perror("server_socket");  
    10.                exit(EX_OSERR);  
    11.            }  
    12.            continue;  
    13.        }  

    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. static int new_socket(struct addrinfo *ai)  

    该函数就是调用socket函数,设置为非阻塞。
     
     
    5、然后生成一个监听的conn对象
     
    代码如下
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. if (!(listen_conn_add = conn_new(sfd, conn_listening,  
    2.                                             EV_READ | EV_PERSIST, 1,  
    3.                                             transport, main_base))) {  
    4.                fprintf(stderr, "failed to create listening connection ");  
    5.                exit(EXIT_FAILURE);  
    6.            }  
    7.            listen_conn_add->next = listen_conn;  
    8.            listen_conn = listen_conn_add;  
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. static conn *listen_conn = NULL;  
    作为全局的静态的变量。无头结点的单链表
     
    我们继续深入conn_new 函数内部
     
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. conn *conn_new(const int sfd, enum conn_states init_state,  
    2.                 const int event_flags,  
    3.                 const int read_buffer_size, enum network_transport transport,  
    4.                 struct event_base *base) {  
    5.     conn *c = conn_from_freelist();  


    该函数主要是做了哪些动作呢?
     
    第一,从刚才的free_cnn_list取出一个conn* 来,然互分配内存,根据相关配置信息,进行相关的字段初始化工作。
     
     
    第二,加入到iblievent事件库中
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. event_set(&c->event, sfd, event_flags, event_handler, (void *)c);  
    2.     event_base_set(base, &c->event);  
    3.     c->ev_flags = event_flags;  
    4.   
    5.     if (event_add(&c->event, 0) == -1) {  
    6.         if (conn_add_to_freelist(c)) {  
    7.             conn_free(c);  
    8.         }  
    9.         perror("event_add");  
    10.         return NULL;  
    11.     }  

    这一步就是,讲sfd上的事件绑定event_handler 函数,就是当有该连接上来的时候有数据进行可读的时候绑定,回调。
     
    7、状态机的解读
     
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. <span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);">最终event_handler函数会调用</span>  
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. static void drive_machine(conn *c)  
    函数。那么这个函数做了哪些工作呢?
     
    当然是等待连接了,那就是accept函数了。
    因此,入股市conn_listening状态,
     
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. while (!stop) {  
    2.   
    3.       switch(c->state) {  
    4.       case conn_listening:  
    5.           addrlen = sizeof(addr);  
    6.           if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1)  


    当然同样是 讲sfd设置成非阻塞的。
     
    这个时候是有数据上来了。
     
    因此就要设置读命令状态了,调用以下函数:
     
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. <pre name="code" class="cpp">/* 
    2.  * Dispatches a new connection to another thread. This is only ever called 
    3.  * from the main thread, either during initialization (for UDP) or because 
    4.  * of an incoming connection. 
    5.  */  
    6. void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,  
    7.                        int read_buffer_size, enum network_transport transport) {  
    8.     CQ_ITEM *item = cqi_new();  
    9.     char buf[1];  
    10.     int tid = (last_thread + 1) % settings.num_threads;  
    11.   
    12.     LIBEVENT_THREAD *thread = threads + tid;  
    13.   
    14.     last_thread = tid;  
    15.   
    16.     item->sfd = sfd;  
    17.     item->init_state = init_state;  
    18.     item->event_flags = event_flags;  
    19.     item->read_buffer_size = read_buffer_size;  
    20.     item->transport = transport;  
    21.   
    22.     cq_push(thread->new_conn_queue, item);  
    23.   
    24.     MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);  
    25.     buf[0] = 'c';  
    26.     if (write(thread->notify_send_fd, buf, 1) != 1) {  
    27.         perror("Writing to thread notify pipe");  
    28.     }  
    29. }  


    
    
     
    通过注释可以知道,该函数是讲一个新连接分配各其他线程,
     
    通过代码我们可以看出
    首先,分配一个item块,讲连接的socket的fd 赋值给item,同时有当前状态,标志位,读buff大小等,然后分配一个线程,讲item推送到该thread的处理队列里了。
     
    然互,通过往管道里写入C字符,通知到管道的另一端,进行处理该操作符的事件。因此,完成了对对该连接的 分配工作。
     
    那么我接下来看一看 线程是如果处理的。
     
    在初始化线程的时候,已经把管道的两个操作符放入到了iblievent里了。如下代码:
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /* Listen for notifications from other threads */  
    2.   event_set(&me->notify_event, me->notify_receive_fd,  
    3.             EV_READ | EV_PERSIST, thread_libevent_process, me);  
    4.   event_base_set(me->base, &me->notify_event);  
    5.   
    6.   if (event_add(&me->notify_event, 0) == -1) {  
    7.       fprintf(stderr, "Can't monitor libevent notify pipe ");  
    8.       exit(1);  
    9.   }  


     
    绑定了回调函数:
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. static void thread_libevent_process(int fd, short which, void *arg)   


    当读到字符'c'的时候,就从其中队列中取出一个item*,掉用一下函数
     
     
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. conn *conn_new(const int sfd, enum conn_states init_state,  
    2.                 const int event_flags,  
    3.                 const int read_buffer_size, enum network_transport transport,  
    4.                 struct event_base *base)   

    同样,调用
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. conn *c = conn_from_freelist();  

    取出一个conn* ,然后进行初始化,这个时候和上文讲到的一样了,知识状态不同了,
    因此这里使用了一个状态机的模式了。
     
    有如下状态:
     
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. enum conn_states {  
    2.     conn_listening,  /**< the socket which listens for connections */  
    3.     conn_new_cmd,    /**< Prepare connection for next command */  
    4.     conn_waiting,    /**< waiting for a readable socket */  
    5.     conn_read,       /**< reading in a command line */  
    6.     conn_parse_cmd,  /**< try to parse a command from the input buffer */  
    7.     conn_write,      /**< writing out a simple response */  
    8.     conn_nread,      /**< reading in a fixed number of bytes */  
    9.     conn_swallow,    /**< swallowing unnecessary bytes w/o storing */  
    10.     conn_closing,    /**< closing this connection */  
    11.     conn_mwrite,     /**< writing out many items sequentially */  
    12.     conn_max_state   /**< Max state value (used for assertion) */  
    13. };  

    也就是
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. static void drive_machine(conn *c)  
    的核心逻辑了。通过设置状态,然后调用不同的代码,
     
    因此在一个状态结束之后,总是会看大如下代码调用:
     
    [cpp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /* 
    2.  * Sets a connection's current state in the state machine. Any special 
    3.  * processing that needs to happen on certain state transitions can 
    4.  * happen here. 
    5.  */  
    6. static void conn_set_state(conn *c, enum conn_states state) {  
    7.     assert(c != NULL);  
    8.     assert(state >= conn_listening && state < conn_max_state);  
    9.   
    10.     if (state != c->state) {  
    11.         if (settings.verbose > 2) {  
    12.             fprintf(stderr, "%d: going from %s to %s ",  
    13.                     c->sfd, state_text(c->state),  
    14.                     state_text(state));  
    15.         }  
    16.   
    17.         if (state == conn_write || state == conn_mwrite) {  
    18.             MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);  
    19.         }  
    20.         c->state = state;  
    21.     }  
    22. }  
     
     

    到此,网络框架部分已经基本处理完成。起始这个框架是非常简单而且实用的。
    redis也是基本的思想模型,只不过是单线程的,而memcached是多线程的模型。在开发模式上可以有效的借鉴。 
  • 相关阅读:
    appium 启动失败解决方案
    appium for windows 环境搭建
    HttpClient 4 使用方法的几个例子(代理,StringEntity字符串数据,文件上传)(转载)
    下拉框和单选框复选框的选中的值
    js刷新父页面的方法
    克隆虚拟机ip修改后没改变的原因
    二进制转换与此平台,VMware Workstation不,Workstation 不可恢复,此虚拟环境中的长模式
    在虚拟机(VMware)中安装Linux CentOS 6.4系统(图解) 转
    sql语句常用的函数总结
    java.lang.OutOfMemoryError: Java heap space 。java heap space明确的指出了异常发生的区域,
  • 原文地址:https://www.cnblogs.com/tangchuanyang/p/5992984.html
Copyright © 2011-2022 走看看