转载请注明出处:http://blog.csdn.net/luotuo44/article/details/44236591
前一篇博文以get命令为样例把整个处理流程简单讲述了一遍。本篇博文将以set命令具体讲述memcached的处理流程。
具体的命令为“set tt 3 0 10”。并如果当然memcachedserver没有名为tt的item。
读取命令:
在前一篇博文的最后,conn的状态被设置为conn_new_cmd,回到了一開始的状态。
假设此时conn结构体里面的buff还有其它命令,或者该client的socket缓冲区里面还有数据(命令),那么就会继续处理命令而不会退出drive_machine函数。处理完后。又会回到conn_new_cmd状态。
《半同步半异步网络模型》指明了memcached是通过worker线程运行client的命令,而且一个worker线程要处理多个client的命令。假设某一个恶意的client发送了大量的get命令,那么worker线程将不断地反复前一篇博文讲述的处理流程。换言之,worker线程将困死在drive_machine里面不能出来。这造成的后果是导致该worker线程负责的其它client处于饥饿状态,由于它们的命令得不到处理(要退出drive_machine才干知道其它client也发送了命令,进而进行处理)。
为了避免client发现饥饿现象,memcached的解决方法是:worker线程连续处理某一个client的命令数不能超过一个特定值。
这个特定值由全局变量settings.reqs_per_event确定(默认值是20), 能够在启动memcached的时候通过命令行參数设置。具体參考《memcached启动參数具体解释以及关键配置的默认值》。
static void drive_machine(conn *c) { bool stop = false; int nreqs = settings.reqs_per_event;//20 assert(c != NULL); //drive_machine被调用会进行状态推断,并进行一些处理。但也可能发生状态的转换 //此时就须要一个循环,当进行状态转换时。也能处理 while (!stop) { switch(c->state) { case conn_new_cmd: --nreqs; if (nreqs >= 0) { //假设该conn的读缓冲区没有数据,那么将状态改成conn_waiting //假设该conn的读缓冲区有数据, 那么将状态改成conn_pase_cmd reset_cmd_handler(c); } else { if (c->rbytes > 0) { /* We have already read in data into the input buffer, so libevent will most likely not signal read events on the socket (unless more data is available. As a hack we should just put in a request to write data, because that should be possible ;-) */ if (!update_event(c, EV_WRITE | EV_PERSIST)) { if (settings.verbose > 0) fprintf(stderr, "Couldn't update event "); conn_set_state(c, conn_closing); break; } } stop = true; } break; } } return; }
从上面代码能够得知。假设某个client的命令数过多,会被memcached强制退出drive_mahcine。假设该client的socket里面还有数据而且是libevent是水平触发的,那么libevent会自己主动触发事件,能再次进入drive_mahcine函数。
但假设该client的命令都读进conn结构体的读缓冲区,那么就必须等到client再次发送命令,libevent才会触发。
但client一直不再发送命令了呢?为了解决问题,memcached採用了一种非常巧妙的处理方法:为这个clientsocket设置可写事件。除非clientsocket的写缓冲区已满,否则libevent都会为这个client触发事件。事件一触发。那么worker线程就会进入drive_machine函数处理这个client的命令。
当然我们如果nreqs大于0,然后看一下reset_cmd_handler函数。该函数会推断conn的读缓冲区是否还有数据。此外,该函数另一个关键的数据:调节conn缓冲区的大小。前一篇博文已经说到,memcached会尽可能把clientsocket里面的数据读入conn的读缓冲区。这样的特性会撑大conn的读缓冲区。除了读缓冲区,用于回写数据的iovec和msghdr数组也会被撑大,这也要收缩。由于是在处理完一条命令后才进行的收缩,所以收缩不会导致数据的丢失。
写缓冲区呢?不须要收缩写缓冲区吗。conn结构体也是有写缓冲区的啊?这是由于写缓冲区不会被撑大。从前一篇博文的回应命令能够知道,回应命令时并没有使用到写缓冲区。写缓冲区是在向client返回错误信息时才会用到的,而错误信息不会太大。也就不会撑大写缓冲区了。
struct conn { int sfd;//该conn相应的socket fd sasl_conn_t *sasl_conn; bool authenticated; enum conn_states state;//当前状态 enum bin_substates substate; rel_time_t last_cmd_time; struct event event;//该conn相应的event short ev_flags;//event当前监听的事件类型 short which; /** which events were just triggered */ //触发event回调函数的原因 //读缓冲区 char *rbuf; /** buffer to read commands into */ //有效数据的開始位置。从rbuf到rcurr之间的数据是已经处理的了。变成无效数据了 char *rcurr; /** but if we parsed some already, this is where we stopped */ //读缓冲区的长度 int rsize; /** total allocated size of rbuf */ //有效数据的长度 int rbytes; /** how much data, starting from rcur, do we have unparsed */ char *wbuf; char *wcurr; int wsize; int wbytes; /** which state to go into after finishing current write */ enum conn_states write_and_go; void *write_and_free; /** free this memory after finishing writing */ //数据直通车 char *ritem; /** when we read in an item's value, it goes here */ int rlbytes; /* data for the nread state */ /** * item is used to hold an item structure created after reading the command * line of set/add/replace commands, but before we finished reading the actual * data. The data is read into ITEM_data(item) to avoid extra copying. */ void *item; /* for commands set/add/replace */ /* data for the swallow state */ int sbytes; /* how many bytes to swallow */ /* data for the mwrite state */ //ensure_iov_space函数会扩大数组长度.以下的msglist数组所使用到的 //iovec结构体数组就是iov指针所指向的。所以当调用ensure_iov_space //分配新的iovec数组后,须要又一次调整msglist数组元素的值。这个调整 //也是在ensure_iov_space函数里面完毕的 struct iovec *iov;//iovec数组指针 //数组大小 int iovsize; /* number of elements allocated in iov[] */ //已经使用的数组元素个数 int iovused; /* number of elements used in iov[] */ //由于msghdr结构体里面的iovec结构体数组长度是有限制的。所以为了能 //传输很多其它的数据,仅仅能添加msghdr结构体的个数.add_msghdr函数负责添加 struct msghdr *msglist;//msghdr数组指针 //数组大小 int msgsize; /* number of elements allocated in msglist[] */ //已经使用了的msghdr元素个数 int msgused; /* number of elements used in msglist[] */ //正在用sendmsg函数传输msghdr数组中的哪一个元素 int msgcurr; /* element in msglist[] being transmitted now */ //msgcurr指向的msghdr总共同拥有多少个字节 int msgbytes; /* number of bytes in current msg */ //worker线程须要占有这个item。直至把item的数据都写回给client了 //故须要一个item指针数组记录本conn占有的item item **ilist; /* list of items to write out */ int isize;//数组的大小 item **icurr;//当前使用到的item(在释放占用item时会用到) int ileft;//ilist数组中有多少个item须要释放 enum protocol protocol; /* which protocol this connection speaks */ enum network_transport transport; /* what transport is used by this connection */ bool noreply; /* True if the reply should not be sent. */ /* current stats command */ ... conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread;//这个conn属于哪个worker线程 }; static void reset_cmd_handler(conn *c) { c->cmd = -1; c->substate = bin_no_state; if(c->item != NULL) {//conn_new_cmd状态下,item为NULL item_remove(c->item); c->item = NULL; } conn_shrink(c); if (c->rbytes > 0) {//读缓冲区里面有数据 conn_set_state(c, conn_parse_cmd);//接着去解析读到的数据 } else { conn_set_state(c, conn_waiting);//否则等待数据的到来 } } #define DATA_BUFFER_SIZE 2048 /** Initial size of list of items being returned by "get". */ #define ITEM_LIST_INITIAL 200 /** Initial size of list of CAS suffixes appended to "gets" lines. */ #define SUFFIX_LIST_INITIAL 20 /** Initial size of the sendmsg() scatter/gather array. */ #define IOV_LIST_INITIAL 400 /** Initial number of sendmsg() argument structures to allocate. */ #define MSG_LIST_INITIAL 10 /** High water marks for buffer shrinking */ #define READ_BUFFER_HIGHWAT 8192 #define ITEM_LIST_HIGHWAT 400 #define IOV_LIST_HIGHWAT 600 #define MSG_LIST_HIGHWAT 100 //收缩到初始大小 static void conn_shrink(conn *c) { assert(c != NULL); if (IS_UDP(c->transport)) return; //c->rbytes指明了当前读缓冲区有效数据的长度。当其小于DATA_BUFFER_SIZE //才进行读缓冲区收缩。所以不会导致client命令数据的丢失。
if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) { char *newbuf; if (c->rcurr != c->rbuf) memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE); if (newbuf) { c->rbuf = newbuf; c->rsize = DATA_BUFFER_SIZE; } /* TODO check other branch... */ c->rcurr = c->rbuf; } if (c->isize > ITEM_LIST_HIGHWAT) { item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0])); if (newbuf) { c->ilist = newbuf; c->isize = ITEM_LIST_INITIAL; } /* TODO check error condition? */ } if (c->msgsize > MSG_LIST_HIGHWAT) { struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0])); if (newbuf) { c->msglist = newbuf; c->msgsize = MSG_LIST_INITIAL; } /* TODO check error condition? */ } if (c->iovsize > IOV_LIST_HIGHWAT) { struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0])); if (newbuf) { c->iov = newbuf; c->iovsize = IOV_LIST_INITIAL; } /* TODO check return value */ } }
读取数据:
我们如果conn的读缓冲区里面没有数据,此时conn的状态被设置为conn_waiting,等待client发送命令数据。
如果client发送数据过来。libevent将检測到clientsocket变成可读,然后进入在libevent的回调函数中调用drive_machine函数,进入有限状态机。在有限状态机里面。conn的状态会被设置为conn_read。
接着在conn_read case中,memcached会把client发送的命令数据尽可能地读入到conn的读缓冲区中。
当然为了防止没有恶意的client,memcached也是有限度的:仅仅撑大读缓冲区4次。这对于正常的client命令来说已经是足够的了。
static void drive_machine(conn *c) { bool stop = false; int res; assert(c != NULL); //drive_machine被调用会进行状态推断,并进行一些处理。但也可能发生状态的转换 //此时就须要一个循环,当进行状态转换时,也能处理 while (!stop) { switch(c->state) { case conn_waiting://等待socket变成可读的 if (!update_event(c, EV_READ | EV_PERSIST)) {//更新监听事件失败 if (settings.verbose > 0) fprintf(stderr, "Couldn't update event "); conn_set_state(c, conn_closing); break; } conn_set_state(c, conn_read); stop = true;//竟然stop循环。只是没关系,由于event的可读事件是水平触发的 break; case conn_read: res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c); switch (res) { case READ_NO_DATA_RECEIVED://没有读取到数据 conn_set_state(c, conn_waiting);//等待 break; case READ_DATA_RECEIVED://读取到了数据,接着就去解析数据 conn_set_state(c, conn_parse_cmd); break; case READ_ERROR://read函数的返回值等于0或者-1时。会返回这个值 conn_set_state(c, conn_closing);//直接关闭这个client break; case READ_MEMORY_ERROR: /* Failed to allocate more memory */ /* State already set by try_read_network */ break; } break; case conn_parse_cmd : //返回1表示正在处理读取的一条命令 //返回0表示须要继续读取socket的数据才干解析命令 //假设读取到了一条完整的命令。那么函数内部会去解析, //并进行调用process_command函数进行一些处理. //像set、add、replace这些命令。会在处理的时候调用 //conn_set_state(c, conn_nread) if (try_read_command(c) == 0) { /* wee need more data! */ conn_set_state(c, conn_waiting); } break; } } return; } //尽可能把socket的全部数据都读进c指向的一个缓冲区里面 static enum try_read_result try_read_network(conn *c) { enum try_read_result gotdata = READ_NO_DATA_RECEIVED; int res; int num_allocs = 0; assert(c != NULL); if (c->rcurr != c->rbuf) { //rcurr 和 rbuf之间是一条已经解析了的命令。如今能够丢弃了 if (c->rbytes != 0) /* otherwise there's nothing to copy */ memmove(c->rbuf, c->rcurr, c->rbytes); c->rcurr = c->rbuf; } while (1) { //由于本函数会尽可能把socket数据都读取到rbuf指向的缓冲区里面, //所以可能出现当前缓冲区不够大的情况(即rbytes>=rsize) if (c->rbytes >= c->rsize) { //可能有坏蛋发无穷无尽的数据过来,而本函数又是尽可能把全部数据都 //读进缓冲区。为了防止坏蛋耗光server的内存,所以就仅仅分配4次内存 if (num_allocs == 4) { return gotdata; } ++num_allocs; char *new_rbuf = realloc(c->rbuf, c->rsize * 2); if (!new_rbuf) { //尽管分配内存失败,但realloc保证c->rbuf还是合法可用的指针 c->rbytes = 0; /* ignore what we read */ out_of_memory(c, "SERVER_ERROR out of memory reading request"); c->write_and_go = conn_closing;//关闭这个conn return READ_MEMORY_ERROR; } c->rcurr = c->rbuf = new_rbuf; c->rsize *= 2; } int avail = c->rsize - c->rbytes; res = read(c->sfd, c->rbuf + c->rbytes, avail); if (res > 0) { pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.bytes_read += res;//记录该线程读取了多少字节 pthread_mutex_unlock(&c->thread->stats.mutex); gotdata = READ_DATA_RECEIVED; c->rbytes += res; if (res == avail) {//可能还有数据没有读出来 continue; } else { break;//socket临时还没数据了(即已经读取完) } } if (res == 0) { return READ_ERROR; } if (res == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; } return READ_ERROR; } } return gotdata; }
假设conn没有读取到clientsocket的数据,那么conn的状态又会设置为conn_waiting(等待数据状态)。假设读取到数据后,就会把状态设置为conn_parse_cmd,接着就会去解析该数据。因为网络原因,可能这一次并没有接收到完整的一条命令。在解析命令的时候会发现这样的情况。此时将conn的状态设置为conn_waiting。再次等待socket数据。
解析命令:
通信协议:
在解说memcached怎么解析命令前,先说一下memcached的通信协议。平时使用的都是”sett 3 0 10”这种命令形式,还真不知道有什么通信协议。事实上memcached同一时候支持文本协议和二进制这两种协议,memcached同意client使用二进制和文本两种通信协议中的一种。平时我们使用的是文本协议。之所以我们不须要显式地选择某一种协议,是由于client选择哪种协议,由client第一次发送的命令确定(一旦确定就不能更改)。Memcached推断client选定哪种协议的方法也非常easy:推断命令的第一个字符。假设第一个字符等于128,那么就是二进制协议,否则就是文本协议。
这样行得通,是由于文本协议中不论什么字符(ascii码)都不会取128这个值。本文仅仅解说文本协议。
推断命令的完整性:
在详细解析client命令的内容之前,还须要做一个工作:推断是否接收到完整的一条命令。Memcached推断的方法也简单:假设接收的数据中包括换行符就说明接收到完整的一条命令,否则就不完整,须要又一次读取clientsocket(把conn状态设置为conn_waiting)。
因为不同的平台对于行尾有不同的处理,有的为” ”,有的为” ”。memcached必须处理这样的情况。Memcached的解决方式是:不管它!直接把命令最后一个字符的后一个字符(the character past the end of the command)改为’ ’,这样命令数据就变成一个C语言的字符串了。更巧妙的是。memcached还用一个暂时变量指向’ ’字符的下一个字符。
这样,不管行尾是”
”还是”
”都不重要了。
static int try_read_command(conn *c) { assert(c != NULL); assert(c->rcurr <= (c->rbuf + c->rsize)); assert(c->rbytes > 0); //memcached支持文本和二进制两种协议。对于TCP这种有连接协议,memcached为该 //fd分配conn的时候,并不指明其是用哪种协议的。此时用negotiating_prot代表待 //协商的意思(negotiate是谈判、协商)。而是在client第一次发送数据给 //memcached的时候用第一个字节来指明.之后的通信都是使用指明的这种协议。 //对于UDP这种无连接协议,指明每次都指明使用哪种协议了 if (c->protocol == negotiating_prot || c->transport == udp_transport) { //对于TCP仅仅会进入该推断体里面一次,而UDP就要次次都进入了 //PROTOCOL_BINARY_REQ为0x80,即128。对于ascii的文本来说。是不会取这个值的 if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) { c->protocol = binary_prot; } else { c->protocol = ascii_prot; } } if (c->protocol == binary_prot) { ...//二进制协议。这里不展开解说 } else {//文本协议 char *el, *cont; if (c->rbytes == 0)//读缓冲区里面没有数据,被耍啦 return 0;//返回0表示须要继续读取socket的数据才干解析命令 el = memchr(c->rcurr, ' ', c->rbytes); if (!el) {//没有找到 ,说明没有读取到一条完整的命令 if (c->rbytes > 1024) {//接收了1024个字符都没有回车符,值得怀疑 /* * We didn't have a ' ' in the first k. This _has_ to be a * large multiget, if not we should just nuke the connection. */ char *ptr = c->rcurr; while (*ptr == ' ') { /* ignore leading whitespaces */ ++ptr; } if (ptr - c->rcurr > 100 || //太多的空格符 (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {//是get或者gets命令,但一次获取太多信息了 conn_set_state(c, conn_closing);//必须干掉这种扯蛋的connclient return 1; } } return 0;//返回0表示须要继续读取socket的数据才干解析命令 } //来到这里,说明已经读取到至少一条完整的命令 cont = el + 1;//用cont指向下一行的開始。不管行尾是 还是 //不同的平台对于行尾有不同的处理,有的为 有的则是 。
所以memcached //还要推断一下 前面的一个字符是否为 if ((el - c->rcurr) > 1 && *(el - 1) == ' ') { el--;//指向行尾的開始字符 } //'