zoukankan      html  css  js  c++  java
  • memcached客户端连接建立过程笔记

    memcached在启动过程初始化server_sockets时,根据启动参数决定系统是进行tcp监听还是udp监听,这里暂时只关注tcp的情况。

    server_socket在初始化时会向系统申请监听socket之后设置地址,bind以及开始listen等操作,之后比较关键的一步是
    为监听socket创建了一个conn, conn是用来描述一个客户端请求的上下文,显然memcached将监听socket也当做conn处理,方便管理,
    主要代码:

    /* 设置conn的初始状态为conn_listening,表示这是一个监听socket,在之后的事件状态机中专门用于接收用户连接 */
    if (!(listen_conn_add = conn_new(sfd, conn_listening,	
    EV_READ | EV_PERSIST, 1,
    transport, main_base))) {
    fprintf(stderr, "failed to create listening connection
    ");
    exit(EXIT_FAILURE);
    }
    listen_conn_add->next = listen_conn;
    listen_conn = listen_conn_add; /* listen_conn是一个全局变量,用来保存所有的监听连接 */

    conn数据结构部分字段注释:

    struct conn {
    int sfd; /** 连接对应的fd,即监听fd或者用户连接fd*/
    
    char *rbuf; /** 存储读取到的命令 */
    char *rcurr; /** 已经解析到的rbuf的位置 */
    int rsize; /** rbuf的全部长度 */
    int rbytes; /** 从rcurr开始还有多少未解析的数据 */
    
    char *wbuf;
    char *wcurr;
    int wsize;
    int wbytes;
    /** 标识当前状态结束之后的下一个状态,在状态机中使用 */
    enum conn_states write_and_go;
    void *write_and_free; /** free this memory after finishing writing */
    
    char *ritem; /** 用来存储key-value中的value,在状态机中做了接收到的数据复制到存储位置的操作 */
    int rlbytes;
    
    /* 用于读取value的数据结构*/
    /* data for the nread state */
    
    /**
    * item is used to hold an item structure created after reading the command
    * line of set/add/replace commands, but before we finished reading the actual
    * . The data is read into ITEM_data(item) to avoid extra copying.
    */
    /**
    * 在未读取到key-value中的value数据时,item用来存储处理set/add/replace这三个命令锁生成的item1数据结构,
    * 之后读取到value会被直接读进item1的data区域,防止多一次的数据复制,这里需要多体会一下
    */ void *item; /* 在这几个命令set/add/replace会用到 */ /* data for the swallow state */ int sbytes; /* how many bytes to swallow */ enum protocol protocol; /* 标识传送数据类型 char型或者是二进制 */ enum network_transport transport; /* 标识数据传输的方式tcp或者udp或unix socket */ int hdrsize; /* number of headers' worth of space is allocated */ bool noreply; /* 标识该命令是否需要回复 */ conn *next; /* 指向下一个连接形成单链表结构 */ LIBEVENT_THREAD *thread; /* 指向所属的线程,每一个用户连接都会被固定分配给一个worker线程 */ }; 

    在new_conn中的关键代码标识了这个conn的初始状态以及处理函数:

    c->state = init_state; /** 设置连接的初始状态 */
    ...
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c); /** 设置事件监听以及event_handler回调函数*/
    event_base_set(base, &c->event); /**注册到libevent*/

    event_handler进行简单fd校验之后将事件转交给drive_machine处理,这是memcached事件处理状态机的实现,
    下面看看drive_machine连接建立部分逻辑:

    static void drive_machine(conn *c) {
    bool stop = false;
    int sfd;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;
    assert(c != NULL);
    while (!stop) {
    switch(c->state) {
    case conn_listening: /** 状态为conn_listening的监听连接负责接收客户端连接*/
    /** 关键部分,接收连接并设置为非阻塞*/
    accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
    fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0)
    /** 先判断是否达到设置的最大连接数量*/
    if (settings.maxconns_fast &&
    stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
    ...
    } else { /** 未达到最大连接数则分配该连接并创建conn表示客户单连接上下文*/
    /** 新conn的初始状态为conn_new_cmd,监听读事件,传输协议为tcp*/
    /** -------------------跳到下面dispach_conn_new的大致逻辑----------*/
    dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
    DATA_BUFFER_SIZE, tcp_transport);
    }
    stop = true; /** 跳出while循环*/
    break;
    
    /** 下面的状态基本由客户端建立连接之后的事件驱动*/
    case ...:
    ...
    return;
    }
    
    /**
    * 这个函数只能由主线程来调用,用于将新接收的连接分配给worker线程
    */
    void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
    int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new(); /*CQ_ITEM是主线程与worker线程数据交互的数据结构,对一个连接的包装*/
    char buf[1];
    if (item == NULL) {
    close(sfd);
    /* given that malloc failed this may also fail, but let's try */
    fprintf(stderr, "Failed to allocate memory for connection object
    ");
    return ;
    }
    int tid = (last_thread + 1) % settings.num_threads; /** robin-round循环获取一个目标worker线程*/
    
    LIBEVENT_THREAD *thread = threads + tid;
    ...
    cq_push(thread->new_conn_queue, item); /** 将新的连接包装实体压入到woker线程待处理连接队列*/
    
    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    /** 主线程通过初试化线程时创建的pipe通道给worker发送消息'c'表示分配给一个新的连接*/
    if (write(thread->notify_send_fd, buf, 1) != 1) {
    ...
    }
    }
    /**
    *worker线程收到消息后会回调线程启动时pipe关注的事件注册的回调函数
    *thread_libevent_process,下面继续看看该函数逻辑
    */
    static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
    /** 读取消息*/
    if (read(fd, buf, 1) != 1)
    if (settings.verbose > 0)
    fprintf(stderr, "Can't read from libevent pipe
    ");
    /** 判断消息类型*/
    switch (buf[0]) {
    case 'c': /** 新连接*/
    item = cq_pop(me->new_conn_queue); /** pop出主线程压入的新连接结构*/
    
    if (NULL != item) {
    /** 创建新连接 me->base 表示将该连接关注的事件注册到的libevent 事件监听结构*/
    conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
    item->read_buffer_size, item->transport, me->base);
    ...
    cqi_free(item);
    }
    break;
    /** 下面两个消息标识主要跟线程启动注册有关*/
    /* we were told to flip the lock type and report in */
    case 'l':
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    register_thread_initialized();
    break;
    case 'g':
    me->item_lock_type = ITEM_LOCK_GLOBAL;
    register_thread_initialized();
    break;
    }
    }
    

    新的连接在创建conn_new时又被注册了回调函数为event_handler,event_handler将事件处理转交drive_machine状态机,

    也就是由客户端发送命令事件驱动drive_machine其他状态的逻辑,以上是客户端连接建立的过程,drive_machine状态机
    其他状态注释留待下一次笔记。

  • 相关阅读:
    JDK1.8HashMap底层实现原理
    关于map转json,空key丢失的问题
    spring一些注解的使用及相关注解差异
    搭建基础项目遇到的一些小坑
    解析ftp上word文档的文字并输入
    R语言中回归模型预测的不同类型置信区间应用比较分析
    R语言中的广义线性模型(GLM)和广义相加模型(GAM):多元(平滑)回归分析保险资金投资组合信用风险敞口
    R语言对巨灾风险下的再保险合同定价研究案例:广义线性模型和帕累托分布Pareto distributions分析
    R语言中GLM(广义线性模型),非线性和异方差可视化分析
    如何用R语言绘制生成正态分布图表
  • 原文地址:https://www.cnblogs.com/bicowang/p/3821291.html
Copyright © 2011-2022 走看看