zoukankan      html  css  js  c++  java
  • Memcached源码分析之请求处理(状态机)

    作者:Calix

    一)上文

    在上一篇线程模型的分析中,我们知道,worker线程和主线程都调用了同一个函数,conn_new进行事件监听,并返回conn结构体对象。最终有事件到达时,调用同一个函数event_handler最终来到执行drive_machine。

    二)conn结构体

    首先,很有必要地先分析一个结构体:conn

    这个conn在memcached里面是这样一个角色,听名字也知道它代表一个“连接”,但这个“连接”不一定是已经连接上的连接,可以是监听中的连接,例如主线程在监听listen fd的时候,也通过conn_new创建了一个conn实例对象,而这个conn对象的conn_states值为conn_listening,代表“监听中的连接”。

    而worker线程监听的client fd是已经连接上了,也为这个连接创建一个“conn”对象,而连接状态conn_states则不是conn_listening,最开始的时候为conn_cmd_new,听名字也知道,这个连接处于“新命令”状态。

    每一个“连接”都有当前的状态,监听中,还是等待新命令中,还是后面会看到的“写数据”中,“关闭中”等等,所以这个conn结构体的定义是合理的。

    所以最后总结出,无论是主线程监听listen fd还是worker线程监听client fd,只要是与客户端有关的fd的监听都以一个conn对象来表示。

    下面大概分析一下conn的结构,(建议先大体看下各个字段的意义,具体到某个字段被使用时再详讲):

    1. typedef struct conn conn;
    2. struct conn {
    3. int sfd; //连接的socket fd
    4. sasl_conn_t *sasl_conn;
    5. bool authenticated;
    6. enum conn_states state; //当前的连接状态
    7. enum bin_substates substate;
    8. rel_time_t last_cmd_time;
    9. struct event event; // 监听的事件
    10. short ev_flags; //监听的事件 类型
    11. short which; /** which events were just triggered */ //刚触发的事件
    12.  
    13. /**
    14. 读buffer会涉及两个方向上的“读”:
    15. 一个是从socket读进来到rbuf里面
    16. 一个是从rbuf里面把数据读出去解析,读buffer相当于一个中介,从socket读进来最终还是得被别人读出去解析,而
    17. rcurr工作指针与rbytes就是在rbuf数据被读出去解析的时候用到,用来控制可以读出去被解析的数据还剩余多少。
    18. */
    19. char *rbuf; /** buffer to read commands into */ //读buffer
    20. char *rcurr; /** but if we parsed some already, this is where we stopped */ //读buffer的当前指针
    21. int rsize; /** total allocated size of rbuf */ //读buffer大小
    22. int rbytes; /** how much data, starting from rcur, do we have unparsed */ //剩余buffer字节数
    23.  
    24. //下面4个属性和上面4个类似
    25. char *wbuf;
    26. char *wcurr;
    27. int wsize;
    28. int wbytes;
    29.  
    30. /** which state to go into after finishing current write */
    31. enum conn_states write_and_go; //完成当前写操作后,连接状态将会置为此状态
    32. void *write_and_free; /** free this memory after finishing writing */
    33.  
    34. char *ritem; /** when we read in an item's value, it goes here */ //这个指针指向item结构体中data中的value地址
    35. int rlbytes; //尚未读完item的data的value的字节数
    36. void *item; /* for commands set/add/replace */ //当执行set/add/replace 命令时,此指针用于指向分配的item空间
    37.  
    38. /* data for the swallow state */
    39. int sbytes; /* how many bytes to swallow */
    40.  
    41. //下面是往socket写出数据时用的字段
    42. struct iovec *iov;
    43. int iovsize; /* number of elements allocated in iov[] */
    44. int iovused; /* number of elements used in iov[] */
    45. struct msghdr *msglist;
    46. int msgsize; /* number of elements allocated in msglist[] */
    47. int msgused; /* number of elements used in msglist[] */
    48. int msgcurr; /* element in msglist[] being transmitted now */
    49. int msgbytes; /* number of bytes in current msg */
    50. item **ilist; /* list of items to write out */
    51. int isize;
    52. item **icurr;
    53. int ileft;
    54. char **suffixlist;
    55. int suffixsize;
    56. char **suffixcurr;
    57. int suffixleft;
    58. enum protocol protocol; /* which protocol this connection speaks */
    59. enum network_transport transport; /* what transport is used by this connection */
    60.  
    61. //UDP相关的字段
    62. int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    63. struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
    64. socklen_t request_addr_size;
    65. unsigned char *hdrbuf; /* udp packet headers */
    66. int hdrsize; /* number of headers' worth of space is allocated */
    67. bool noreply; /* True if the reply should not be sent. */
    68.  
    69. /* current stats command */
    70. struct {
    71. char *buffer;
    72. size_t size;
    73. size_t offset;
    74. } stats;
    75.  
    76. // 二进制相关的字段
    77. protocol_binary_request_header binary_header;
    78. uint64_t cas; /* the cas to return */
    79. short cmd; /* current command being processed */
    80. int opaque;
    81. int keylen;
    82. conn *next; /* Used for generating a list of conn structures */
    83. LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
    84. };
    85.  
    86.  
    87.  
    88.  
    89. /* conn_states是一个枚举:*/
    90.  
    91. enum conn_states {
    92. conn_listening, /**< the socket which listens for connections */
    93. conn_new_cmd, /**< Prepare connection for next command */
    94. conn_waiting, /**< waiting for a readable socket */
    95. conn_read, /**< reading in a command line */
    96. conn_parse_cmd, /**< try to parse a command from the input buffer */
    97. conn_write, /**< writing out a simple response */
    98. conn_nread, /**< reading in a fixed number of bytes */
    99. conn_swallow, /**< swallowing unnecessary bytes w/o storing */
    100. conn_closing, /**< closing this connection */
    101. conn_mwrite, /**< writing out many items sequentially */
    102. conn_closed, /**< connection is closed */
    103. conn_max_state /**< Max state value (used for assertion) */
    104. };

    知道conn的意义之后,主线程和worker线程都调用conn_new监听fd并创建conn对象就合情合理了,大家都有conn对象,只是状态不一样,event_handler被触发,调用drive_machine,进入不一样的case完成不一样的操作。

    这句话压缩来说:“根据状态不同去做不同的事情”,这种工作方式其实就是下面要讲的“状态机”。

    三)状态机

    状态机drive_machine函数是worker线程网络请求进行业务逻辑处理的核心。

    它的实现方式是:

    一个while循环里面有一个巨大的switch case,根据连接对象 conn当前的连接状态conn_state,进入不同的case,而每个case可能会改变conn的连接状态,也就是说在这个while+switch中,conn会不断的发生状态转移,最后被分发到合适的case上作处理。可以理解为,这里是一个有向图,每个case是一个顶点,有些case通过改变conn对象的连接状态让程序在下一次循环中进入另一个case,几次循环后程序最终进入到“无出度的顶点”然后结束状态机,这里的无出度的顶点就是带设置stop=true的case分支。

    看下大概的代码结构:

    1. static void drive_machine(conn *c) {
    2.     while (!stop) {
    3.         switch(c->state) {
    4.            case conn_listening:
    5.                  //。。。。
    6.            case conn_waiting:
    7.                 //。。。
    8.                 stop = true; break;
    9.                //。。。
    10.         }
    11.    }
    12. }

    主线程状态机的行为我们已经知道了,永远只会是conn_listening状态,永远只会进入drive_machine的conn_listening分支,accept连接把client fd 通过dispatch_conn_new函数分发给worker线程。

    下面我们来看一下worker线程执行状态机:

    当主线程调用dispatch_conn_new的时候,worker线程创建conn对象,初始状态为conn_new_cmd。所以当有worker线程监听的client fd有请求过来时,例如客户端发了一行命令(set xxx )会进入conn_new_cmd分支:

    1.          case conn_new_cmd:
    2.             /*
    3.              这里的reqs是请求的意思,其实叫“命令”更准确。一次event发生,有可能包含多个命令,
    4.              从client fd里面read到的一次数据,不能保证这个数据只是包含一个命令,有可能是多个
    5.              命令数据堆在一起的一次事件通知。这个nreqs是用来控制一次event最多能处理多少个命令。
    6.             */
    7.             --nreqs;
    8.             if (nreqs >= 0) {
    9.                 /**
    10.                 准备执行命令。为什么叫reset cmd,reset_cmd_handler其实做了一些解析执行命令之前
    11.                 的初始化动下一个,都会重新进入这个case作。而像上面说的,一次event有可能有多个命令,每执行一个命令,如果还有
    12.                  conn_new_cmd,reset一下再执行下一个命令。
    13.                 */
    14.                 reset_cmd_handler(c);
    15.             } else {
    16. //。。。
    17.             }
    18.             break;

    上面的nreqs在这里暂不详细分析。当client fd第一次有请求过来的时候,会进入reset_cmd_handler函数:

    1.  static void reset_cmd_handler(conn *c) {
    2.     c->cmd = -1;
    3.     c->substate = bin_no_state;
    4.     if(c->item != NULL) {
    5.         item_remove(c->item);
    6.         c->item = NULL;
    7.     }
    8.     conn_shrink(c);
    9.  
    10.  //第一次有请求过来触发到此函数时,c->rbytes为0
    11.     if (c->rbytes > 0) {
    12.         conn_set_state(c, conn_parse_cmd);
    13.     } else {
    14.         conn_set_state(c, c
    15.             onn_waiting);  //第一次请求进入此分支
    16.     }
    17. }

    我们在conn_new函数里面把c->rbytes被始化为0,而直至此我们也没有看到这个c->rbytes有被重新赋新值,所以其实第一次有请求过来,这个值还是0,所以进入else分支,即执行conn_set_state(c,conn_waiting);然后重新回到状态机执行下一次循环,进入conn_waiting分支:

    1. case conn_waiting:
    2. if (!update_event(c, EV_READ | EV_PERSIST)) {
    3. //。。。
    4. }
    5. conn_set_state(c, conn_read);
    6. stop = true;
    7. break;

    在conn_waiting分支你会发现,这里的代码仅仅是把状态改变conn_read然后就stop=true,结束状态机了!没错,退出while循环了!这次事件触发就此结束了!
    你会觉得很奇怪,我客户端明明发了一个请求,(set xxx ),你什么都没处理就只是把连接状态改成conn_read就完事了?!没错,至少这一次状态机的执行行为是这样!

    到底是怎么回事?其实这里是利用了一点:libevent的epoll默认是“水平触发”!也就是说,客户端发来一个set xxx ,我这边一天没有read,epoll还会有下一次通知,也就是说,这个请求有两次事件通知!第一次通知的作用仅是为了把连接状态改为conn_read! 当worker线程因为同一个client fd同一个请求收到第二次通知的时候,再次执行状态机,然后进入conn_read分支。

    为了验证这一点,我在drive_machine函数代码执行的开头处打了一下log:

    1. static void drive_machine(conn *c) {
    2. fprintf(stderr, "event arrive! ");

    然后重新编译memcached运行,测试一下是否worker线程事件通知发生了两次(左边是服务端,右边是客户端):

    客户端telnet发起连接,event_base通知主线程,所以这里会有一次调用drive_machine的情况:

    6619078690909560085

    客户端输入“set testkey 0 0 4”的命令后:

    6608929099073920313

    可以看到当服务端收到命令后,先利用第一次事件通知(上面图中的第二个event arrive)把状态置为conn_read,然后等待第二次事件通知。非常快地,第二次事件通知就到达(上面图中的第三个event arrive),然后进入conn_read状态继续执行。

    下面我们看一下收到第二次通知的时候进入conn_read分支后的代码:

    1.          case conn_read:
    2.             res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
    3.             switch (res) {
    4.             case READ_NO_DATA_RECEIVED:
    5.                 conn_set_state(c, conn_waiting);
    6.                 break;
    7.             case READ_DATA_RECEIVED:
    8.                 conn_set_state(c, conn_parse_cmd);
    9.                 break;
    10.             case READ_ERROR:
    11.                 conn_set_state(c, conn_closing);
    12.                 break;
    13.             case READ_MEMORY_ERROR:
    14.                 break;
    15.             }
    16.             break;

    进入conn_read此时才调用函数try_read_network函数读出请求(set xxx ):

    1.  static enum try_read_result try_read_network(conn *c) {
    2.     enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
    3.     int res;
    4.     int num_allocs = 0;
    5.     assert(c != NULL);
    6.     if (c->rcurr != c->rbuf) {
    7.         if (c->rbytes != 0) /* otherwise there's nothing to copy */
    8.             memmove(c->rbuf, c->rcurr, c->rbytes);
    9.         c->rcurr = c->rbuf;
    10.     }
    11.     while (1) {
    12.         if (c->rbytes >= c->rsize) {//读buffer空间扩充
    13.             //。。。
    14.         }
    15.         int avail = c->rsize - c->rbytes; //读buffer的空间还剩余多少大小可以用
    16.         res = read(c->sfd, c->rbuf + c->rbytes, avail); //往剩下的可用的地方里塞
    17.         if (res > 0) {
    18.             gotdata = READ_DATA_RECEIVED;
    19.             /**
    20.             rbytes是当前指针rcurr至读buffer末尾的数据大小,这里可简单地理解为对rbytes的初始化。
    21.             */
    22.             c->rbytes += res;
    23.             if (res == avail) { //可能还没读完,此时读buffer可用空间满了,那么下次循环会进行读buffer空间扩充
    24.                 continue;
    25.             } else {
    26.                 break; //socket的可读数据都读完了
    27.             }
    28.         }
    29. //。。。
    30.     }
    31.     return gotdata;
    32. }

    try_read_network函数就是从socket中把数据读到c->rbuf中去而已,同时初始化一些变量例如rbytes等,读取数据成功则返回READ_DATA_RECEIVED,状态机 conn_set_state(c, conn_parse_cmd);进入conn_parse_cmd状态:

    1.   case conn_parse_cmd :
    2.             /**
    3.             try_read_network后,到达conn_parse_cmd状态,但try_read_network并不确保每次到达
    4.             的数据都足够一个完整的cmd(ascii协议情况下往往是没有" ",即回车换行),
    5.             所以下面的try_read_command之所以叫try就是这个原因,
    6.             当读到的数据还不够成为一个cmd的时候,返回0,conn继续进入conn_waiting状态等待更多的数据到达。
    7.             */
    8.             if (try_read_command(c) == 0) {
    9.                 /* wee need more data! */
    10.                 conn_set_state(c, conn_waiting);
    11.             }
    12.             break;

    进行conn_parse_cmd主要是调用try_read_command函数读取命令,上面注释也说明了数据不够一个cmd的情况,下面我们进入try_read_command,看看try_read_command不返回0时,也就是足够一个cmd后是怎么解析这个cmd的(只说明tcp ascii协议的情况):

    1. static int try_read_command(conn *c) {
    2. char *el, *cont;
    3.         if (c->rbytes == 0)  //读buffer没有待解析的数据
    4.             return 0;
    5.         el = memchr(c->rcurr, ' ', c->rbytes); //找第一个命令的末尾,即换行符
    6.         if (!el) {
    7.             //。。。
    8.             /*
    9.             如果没有找到换行符,则说明读到的数据还不足以成为一个完整的命令,
    10.             返回0
    11.             */
    12.             return 0;
    13.         }
    14.         cont = el + 1; //下一个命令的开头
    15.         /*
    16.         下面这个if的作用是把el指向当前命令最后一个有效字符的下一个字符,即
    17.         目的是为了在命令后面插上一个,字符串结束符。
    18.         例如 GET abc ******,变成GET abc *****,这样以后读出的字符串就是一个命令。
    19.         */
    20.         if ((el - c->rcurr) > 1 && *(el - 1) == ' ') {
    21.             el--;
    22.         }
    23.         *el = '';
    24.  
    25.         c->last_cmd_time = current_time;
    26.         process_command(c, c->rcurr); //执行命令。分析详见process_command
    27.         //当前命令执行完之后,把当前指针rcurr指向 下一个命令的开头,并调用rbytes(剩余未处理字节数大小)
    28.         //逻辑上相当于把已处理的命令去掉。
    29.         c->rbytes -= (cont - c->rcurr);
    30.         c->rcurr = cont;
    31.     }
    32.     return 1;
    33. }

    上面try_read_command把命令读出(其实只是简单地找出一个完整的命令,在后面加个而已)。

    在这里插一下memcached的SET命令的协议,或者你可以看memcached/doc/protocol.txt中的说明:

    完成一个SET命令,其实需要两行,也就是需要按两次回车换行“ ”,第一行叫“命令行”,格式是SET key flags exptime bytes ,如SET name 0 0 5 , 键为name,flags标志位可暂时不管,超时设为0,value的字节长度是4。然后才有第二行叫“数据行”,格式为:value ,例如:calix 。这两行分别敲下去,SET命令才算完成。

    所以处理SET命令时上面的try_read_command首先处理的是SET name 0 0 5 这个“命令行”。

    看看进入process_command函数如何执行:

    1. /**
    2. 这里就是对命令的解析和执行了
    3. (其实准确来说,这里只是执行了命令的一半(例如如果是SET命令,则是“命令行”部分),
    4. 然后根据命令类型再次改变conn_state使程序再次进入状态机,完成命令的
    5. 另一半工作,后面详说)
    6. command此时的指针值等于conn的rcurr
    7. */
    8. static void process_command(conn *c, char *command) {
    9.     token_t tokens[MAX_TOKENS];
    10.     size_t ntokens;
    11.     int comm; //命令类型
    12.     c->msgcurr = 0;
    13.     c->msgused = 0;
    14.     c->iovused = 0;
    15.     if (add_msghdr(c) != 0) {
    16.         out_of_memory(c, "SERVER_ERROR out of memory preparing response");
    17.         return;
    18.     }
    19.     /**
    20.     下面这个tokenize_command是一个词法分析,把command分解成一个个token
    21.     */
    22.     ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    23.     //下面是对上面分解出来的token再进行语法分析,解析命令,下面的comm变量为最终解析出来命令类型
    24.     if (ntokens >= 3 &&
    25.         ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
    26.          (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
    27.         process_get_command(c, tokens, ntokens, false);
    28.     } else if ((ntokens == 6 || ntokens == 7) &&
    29.                ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
    30.                 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
    31.                 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
    32.                 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
    33.                 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
    34.         //add/set/replace/prepend/append为“更新”命令,调用同一个函数执行命令。详见process_update_command定义处
    35.         process_update_command(c, tokens, ntokens, comm, false);
    36.     }
    37.    //。。。  
    38. }

    上面的代码可以看出首先我们要对命令进行“解析”,词法语法分析等等(属于编译原理知识,在这不详讲),最终我们的set name 0 0 5 命令会进入process_update_command函数中执行:

    static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
        if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
            out_string(c, "CLIENT_ERROR bad command line format"); //key过长,out_string函数的作用是输出响应,
            //详见out_string定义处
            return;
        }
        key = tokens[KEY_TOKEN].value; //键名
        nkey = tokens[KEY_TOKEN].length; //键长度
        //下面这个if同时把命令相应的参数(如缓存超时时间等)赋值给相应变量:exptime_int等
        if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
               && safe_strtol(tokens[3].value, &exptime_int)
               && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }
        exptime = exptime_int;
        if (exptime < 0)
            exptime = REALTIME_MAXDELTA + 1;
        //在这里执行内存分配工作。详见内存管理篇
        it = item_alloc(key, nkey, flags, realtime(exptime), vlen); 
        ITEM_set_cas(it, req_cas_id);
        c->item = it; //将item指针指向分配的item空间
        c->ritem = ITEM_data(it); //将 ritem 指向 it->data中要存放 value 的空间地址
        c->rlbytes = it->nbytes; //data的大小
        c->cmd = comm; //命令类型
        conn_set_state(c, conn_nread); //继续调用状态机,执行命令的另一半工作。
    }

    process_update_command函数最终执行了item_alloc为我们要set的数据(称为item)分配了内存。同时,为c对象赋了相应的一些值。

    但是其实这里仅仅是为item分配了空间,还没有把value塞进去,因为我们仅仅执行了SET命令的“命令行“部分,根据“命令行”部分的信息分配空间。代码最后一行看到在这里,我们又把c的状态变成了conn_nread,等“数据行”达到,epoll事件触发状态机下一次循环进入conn_nread分支,其实就是完成SET命令的第二部分,读出“数据行”:

    1.  case conn_nread:
    2.             /**
    3.             由process_update_command执行后进入此状态,process_update_command函数只执行了add/set/replace 等命令的一半,
    4.             剩下的一半由这里完成。
    5.             例如如果是上面的set命令,process_update_command只完成了“命令行”部分,分配了item空间,
    6. 但还没有把value塞到对应的 item中去。因此,在这一半要完成的动作就是把value的数据从socket中读出来,
    7. 塞到刚拿到的item空间中去
    8.             */
    9.             /*
    10.             下面的rlbytes字段表示要读的“value数据”还剩下多少字节 (注意与"rbytes"的区别)
    11.             如果是第一次由process_update_command进入到此,rlbytes此时在process_update_command中被初始化为item->nbytes,
    12. 即value的总字节数,SET name 0 0 5 中的5。
    13.             */
    14.             if (c->rlbytes == 0) {
    15.                 /**
    16.                 注意rlbytes为0才读完,否则状态机一直会进来这个conn_nread分支继续读value数据,
    17. 读完就调用complete_nread完成收尾工作,程序会跟着complete_nread进入下一个
    18.                 状态。所以执行完complete_nread会break;
    19.                 */
    20.                 complete_nread(c);
    21.                 break;
    22.             }
    23.             //如果还有数据没读完,继续往下执行。可知,下面的动作就是继续从buffer中读value数据往item中的data的value位置塞。
    24.  
    25.             if (c->rbytes > 0) {
    26.                 /**
    27.                  进入到这个if,是因为有可能先前读到的buffer已经有“数据行”部分,因为一次事件通知,
    28. 不保证socket可读数据只有一个 。
    29.                */
    30.                 /**
    31.                 取rbytes与rlbytes中最小的值。
    32.                 为啥?
    33.                 因为这里我们的目的是剩下的还没读的value的字节,而rlbytes代表的是还剩下的字节数
    34.                 如果rlbytes比rbytes小,只读rlbytes长度就够了,rbytes中多出来的部分不是我们这个时候想要的
    35.                 如果rbytes比rlbytes小,即使你要rlbytes这么多,但buffer中没有这么多给你读。
    36.                 */
    37.                 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
    38.                 if (c->ritem != c->rcurr) {
    39.                     memmove(c->ritem, c->rcurr, tocopy); //往分配的item中塞,即为key设置value的过程
    40.                 }
    41.                 c->ritem += tocopy;
    42.                 c->rlbytes -= tocopy;
    43.                 c->rcurr += tocopy;
    44.                 c->rbytes -= tocopy;
    45.                 if (c->rlbytes == 0) {
    46.                     break;
    47.                 }
    48.             }
    49.             //这里往往是我们先前读到buffer的数据还没足够的情况下,从socket中读。
    50.             res = read(c->sfd, c->ritem, c->rlbytes);//往分配的item中塞,即为key设置value的过程
    51.             if (res > 0) {
    52.                 if (c->rcurr == c->ritem) {
    53.                     c->rcurr += res;
    54.                 }
    55.                 c->ritem += res;
    56.                 c->rlbytes -= res;
    57.                 break;
    58.             }

    上面主要通过这一行 res = read(c->sfd, c->ritem, c->rlbytes); 把value塞到刚分配出来的item空间,完成“数据行”部分的工作,逻辑上就是对key“赋值”。赋值结束后,调用complete_nread做一些收尾的工作,在本篇“状态机”篇只需了解它的作用是向客户端输出命令执行结果(即往socket写“STORED”):

    1.  static void complete_nread(conn *c) {
    2. //。。。
    3.         complete_nread_ascii(c);
    4. //。。。
    5. }
    6.  
    7. static void complete_nread_ascii(conn *c) {
    8.      ret = store_item(it, comm, c);
    9.      switch (ret)
    10.      {
    11.       case STORED:
    12.           out_string(c, "STORED");
    13.           break;
    14.       //。。。
    15.       }
    16.     //。。。
    17. }
    18.  
    19. static void out_string(conn *c, const char *str) {
    20.     size_t len;
    21.     c->msgcurr = 0;
    22.     c->msgused = 0;
    23.     c->iovused = 0;
    24.     add_msghdr(c);
    25.     len = strlen(str);
    26.  
    27.     memcpy(c->wbuf, str, len);
    28.     memcpy(c->wbuf + len, " ", 2);
    29.     c->wbytes = len + 2;
    30.     c->wcurr = c->wbuf;
    31.  
    32.     conn_set_state(c, conn_write);
    33.     c->write_and_go = conn_new_cmd;
    34.     return;
    35. }

    进入状态机conn_write状态进行输出:

    1.  
    2.         case conn_write:
    3.            //。。。
    4.            /* fall through... */
    5.         case conn_mwrite:
    6.               transmit(c);
    7.            //。。。
    8.  
    9.  
    10. static enum transmit_result transmit(conn *c) {
    11.     //。。。
    12.     res = sendmsg(c->sfd, m, 0);
    13.    //。。。
    14. }

    最后通过调用sendmsg把我们的”STORED”字符串响应给客户端。

    附上 处理 SET 命令状态机的状态转换图:

    drive_machine

    本文中我们分析了memcached是怎么利用状态机的方式对请求进行解析和处理,以及SET命令的代码实现细节。而在执行SET命令的时候,我们知道会调用item_alloc函数给数据分配空间,而到底item_alloc背后是怎么实现的?请看下一篇:《Memcached源码分析之内存管理》

  • 相关阅读:
    备份的数据库存儲過程
    用反射调用任意.net库中的方法
    基于.NET的多线程编程入门
    手写分页函数C#
    .net中的泛型
    prototype.js开发笔记
    此方法用于确认用户输入的不是恶意信息
    利用DataSet、DataTable、DataView按照自定义条件过滤数据
    读取文件列表
    注册客户端脚本
  • 原文地址:https://www.cnblogs.com/guolanzhu/p/5850099.html
Copyright © 2011-2022 走看看