首先对状态机中的各种状态做个简单总结,具体可见状态转换示意图:
1.listening:这个状态是主线程的默认状态,它只有这一个状态:负责监听socket,接收客户连接,将连接socket派发给工作线程。
2.conn_new_cmd:每个工作线程的接收到新连接的初始状态,为处理该连接socket准系列准备工作:如清空读写buf等。
3.conn_waiting:当读缓冲区无数据可以处理时,工作线进入等待状态:在event_base中注册读事件,然后状态机暂停,挂起当前connection(函数退出,回调函数的attachment会记录这个connection),等待有新的信息过来,然后通过回调函数的attachment重新找到这个connection,然后启动状态机。
4.conn_read:有网络数据到来,工作线程从sfd中读取客户端的指令信息。
5.conn_parse_cmd:解析读缓冲中的数据,得到客户端的具体指令,如果是update的指令,那么需要跳转到conn_nread中,因为需要在从网络中读取固定byte的数据,如果是查询之类的指令,就直接查询完成后,跳转到conn_mwrite中,返回数据到客户端。
6.conn_nread:从网络中读取指定大小的数据,这个数据就是更新到item的数据,然后将数据更新到hash和lru中去,然后跳转到conn_write,向客户端返回处理结果状态信息。
7.conn_write:这个状态主要是调用out_string函数会跳转到这个状态,一般都是提示信息和返回的状态信息,然后输出这些数据,然后根据write_to_go的状态,继续跳转
8.conn_mwrite:这个写是把查询msglist(具体item数据)返回到客户端,这个msglist存的是item的数据,用于那种get等获得item的操作的返回数据。
9.conn_swallow:对于那种update操作,如果分配item失败,显然后面的nread,是无效的,客户端是不知道的,这样客户端继续发送特定的数量的数据,就需要把读到的这些数据忽略掉,然后如果把后面指定的数据都忽略掉了(set的两部提交,数据部分忽略掉),那么conn跳转到conn_new_cmd,如果读nread的那些特定数量的数据没有读到,直接跳转到conn_closing。
10.conn_closing:服务器端主动关闭连接,调用close函数关闭文件描述符,同时把conn结构体放到空闲队列CQ中,供新的连接重用这写conn结构体。
接下来就主要分析conn_nread、conn_mwrite、conn_mwrite等这几个跟具体操作指令如get、update等相关的状态。
这些指令都跟底层的数据item的操作相关,如将某个key对应的item从lru链表中删除
(移动对应的空闲链表上)同时在哈希表中删除该item等操作。这部分可参考前面的《slab和item的主要操作》
以及更前面的关于《哈希表》、《lab内存管理》等分析。
当状态机解析出客户端具体指令后就转而执行其指令操作。
首先解析得到SET指令:
//二进制协议,具体解析函数
dispatch_bin_command(conn *c) {
//....
//...
switch (c->cmd) {
case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
case PROTOCOL_BINARY_CMD_REPLACE:
if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
bin_read_key(c, bin_reading_set_header, 8);
} else {
protocol_error = 1;
}
break;
}
其中函数bin_read_key中设置状态为conn_nread,使得状态机从网络读取指定字节的数据,用以SET或ADD或REPLACE对应的item数据。
static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
//...
//...
/* preserve the header in the buffer.. */
c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
conn_set_state(c, conn_nread);
}
到此,进入nread状态:
case conn_nread:
complete_nread(c);
//...
res = read(c->sfd, c->ritem, c->rlbytes);//读取网络item数据
其中complete_nread:
static void complete_nread(conn *c) {
//....
complete_nread_binary(c);
//.....
}
其中complete_nread_binary函数:
static void complete_nread_binary(conn *c) {
switch(c->substate) {
//....
case bin_read_set_value: //将给定的key对应的item的value设为给定值
complete_update_bin(c);
break;
case bin_reading_get_key:
case bin_reading_touch_key:
process_bin_get_or_touch(c);
break;
//....
}
其中函数complete_update_bin,调用store_item函数。 存储item函数:store_item,可用于add set update等命令语义。
* Stores an item in the cache (high level, obeys set/add/replace semantics)
*/
enum store_item_type store_item(item *item, int comm, conn* c) {
enum store_item_type ret;
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey);//根据key及key长度,计算哈希值
item_lock(hv);
ret = do_store_item(item, comm, c, hv);//与底层数据操作的接口函数
item_unlock(hv);
return ret;
}
经过层层调用,终于到达真正执行add set update等命令的底层操作接口函数:
do_store_item:根据具体具有set语义的命令,对哈希表、slab中的item数据进行操作。
并返回存储操作结果:
enum store_item_type {
NOT_STORED=0, STORED, EXISTS, NOT_FOUND //存储失败;成功;已存在;或没找到
};
函数do_store_item:
enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
char *key = ITEM_key(it);//获取要set的item对应的key
item *old_it = do_item_get(key, it->nkey, hv);//根据key获取原来的item,如果没不存在则返回NULL
enum store_item_type stored = NOT_STORED;//item状态标记
item *new_it = NULL;
int flags;
if (old_it != NULL && comm == NREAD_ADD) {//如果old_it不为NULL,且操作为add操作
//add只添加原不存在的item,如果存在则只是将该item移到LRU队列头部
do_item_update(old_it);//更新原item在lru队列中的位置和过期时间
//如果原item为空,且操作为repalce等,则不执行任何操作
} else if (!old_it && (comm == NREAD_REPLACE
|| comm == NREAD_APPEND || comm == NREAD_PREPEND))
{
/* replace only replaces an existing value; don't store */
} else if (comm == NREAD_CAS) {//以cas方式读取
/* validate cas operation */
if(old_it == NULL) {
// LRU expired
stored = NOT_FOUND;//修改状态
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.cas_misses++;
pthread_mutex_unlock(&c->thread->stats.mutex);
}
////old_it不为NULL,且cas属性一致
else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;//更新线程统计信息
pthread_mutex_unlock(&c->thread->stats.mutex);
//执行item的替换操作,用新的item替换老的item (哈希表 lru)
item_replace(old_it, it, hv);
stored = STORED;
} else {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
pthread_mutex_unlock(&c->thread->stats.mutex);
if(settings.verbose > 1) {
fprintf(stderr, "CAS: failure: expected %llu, got %llu
",
(unsigned long long)ITEM_get_cas(old_it),
(unsigned long long)ITEM_get_cas(it));
}
stored = EXISTS;//修改状态值,修改状态值为已经存在,且不存储最新的数据
}
} else {
//以追加方式执行写(追加到旧数据后面 追加到旧数据前面)
if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
/*
* Validate CAS
*/
if (ITEM_get_cas(it) != 0) {
// CAS much be equal
if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
stored = EXISTS;//已存在
}
}
if (stored == NOT_STORED) {//状态值为没有存储,也就是cas验证通过,则执行写操作
flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */, hv);
if (new_it == NULL) {//空间不足
if (old_it != NULL)
do_item_remove(old_it);//试着删除原item
return NOT_STORED;
}
//追加方式,将原item的数据拷贝到新item的数据空间,然后将新item(it)的数据追加到原数据的后面
if (comm == NREAD_APPEND) {
memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
} else {
/* NREAD_PREPEND */
memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);//将新item数据拷贝到new_it的数据区
//将原数据追加到新数据的后面
memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
}
it = new_it;
}
}
if (stored == NOT_STORED) {
if (old_it != NULL)//旧数据不为空,则替换
item_replace(old_it, it, hv);
else
do_item_link(it, hv);//旧数据为空,则将新的item添加到哈希表 lru队列
c->cas = ITEM_get_cas(it);
stored = STORED;//数据已存储
}
}
if (old_it != NULL)
do_item_remove(old_it); //释放本次引用
if (new_it != NULL)
do_item_remove(new_it);//释放本次引用
if (stored == STORED) {
c->cas = ITEM_get_cas(it);
}
return stored;//返回操作结果状态
}
以上就是SET语义相关的操作,其逻辑还是很清晰的。其中调用了很多关于哈希表,lru队列的操作函数,具体可参见前面的相关内容,在此不再详述。
下面继续看GET相关的操作指令。至于怎么由状态机逐步调用到底层操作函数的过程不再列出了,直接看底层的相关操作接口吧:
//根据key信息和key的长度信息读取数据
item *item_get(const char *key, const size_t nkey) {
item *it;
uint32_t hv;
hv = hash(key, nkey, 0);//获取哈希值
item_lock(hv);//执行分段加锁
it = do_item_get(key, nkey, hv);//执行get操作
item_unlock(hv);//释放锁
return it;
}
函数do_item_get:
get操作在读取数据时,会判断数据的有效性,从而不用专门去处理过期数据,这也正是惰性机制的体现。
//执行读取操作 (get中体现的正是惰性删除机制)
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
item *it = assoc_find(key, nkey, hv);//在哈希表中查找该item,然后利用该item的clsid信息去lru中查找该item
if (it != NULL) {
refcount_incr(&it->refcount);//item的引用次数+1
if (slab_rebalance_signal && //如果正在进行slab调整,且该item是调整的对象
((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
do_item_unlink_nolock(it, hv);//将item从hashtable和LRU链中移除
do_item_remove(it);//删除item
it = NULL;//置为空
}
}
int was_found = 0;
//打印调试信息
if (settings.verbose > 2) {
if (it == NULL) {
fprintf(stderr, "> NOT FOUND %s", key);
} else {
fprintf(stderr, "> FOUND KEY %s", ITEM_key(it));
was_found++;
}
}
if (it != NULL) {
//判断Memcached初始化是否开启过期删除机制,如果开启,则执行删除相关操作
if (settings.oldest_live != 0 && settings.oldest_live <= current_time &&
it->time <= settings.oldest_live) {
do_item_unlink(it, hv);//将item从hashtable和LRU链中移除
do_item_remove(it);//删除item
it = NULL;
if (was_found) {
fprintf(stderr, " -nuked by flush");
}
//判断item是否过期
} else if (it->exptime != 0 && it->exptime <= current_time) {
do_item_unlink(it, hv);//将item从hashtable和LRU链中移除
do_item_remove(it);//删除item
it = NULL;
if (was_found) {
fprintf(stderr, " -nuked by expire");
}
} else {
it->it_flags |= ITEM_FETCHED;//item的标识修改为已经读取
DEBUG_REFCNT(it, '+');
}
}
if (settings.verbose > 2)
fprintf(stderr, "
");
return it;
}
以上就是SET、GET的操作过程了。至于其他的命令实现于此类似,由于底层的的所有操作在前面的相关小节中已经作了较为深入的分析,因此在这就不再一一详述了。
到这,整个状态机drive_machine的主要过程就分析得差不多了。通过最近三小节的分析,对memcached的整个运行流程,有了本质的理解,再结合前面底层数据存储结构的分析,整个memcached的基本框架终于搭起来了。
后面将进行一系列的总结归纳,并结合几个小的实例具体去配置、运行下,不能总是纸上谈兵的!