zoukankan      html  css  js  c++  java
  • memcached学习笔记——存储命令源码分析上篇

    原创文章,转载请标明,谢谢。

    上一篇分析过memcached的连接模型,了解memcached是如何高效处理客户端连接,这一篇分析memcached源码中的process_update_command函数,探究memcached客户端的set命令,解读memcached是如何解析客户端文本命令,剖析memcached的内存管理,LRU算法是如何工作等等。

    解析客户端文本命令

    客户端向memcached server发出set操作,memcached server读取客户端的命令,客户端的连接状态由 conn_read > conn_parse_cmd 转换,这时候,memcached server开始解析命令。memcached server调用try_read_command函数解析命令,memcached接收两种格式的命令,一种是二进制格式,另一种是文本格式(本文只讲文本格式的命令)。

     1 static int try_read_command(conn *c) {
     2 
     3     // ..........
     4     
     5     if (c->protocol == binary_prot) {
     6 
     7         // 二进制格式
     8         // ....
     9         
    10     } else {
    11         char *el, *cont;
    12 
    13         // 没有接收到客户端的命令,返回进入conn_waiting状态,等待更多的客户端数据
    14         if (c->rbytes == 0)
    15             return 0;
    16 
    17         el = memchr(c->rcurr, '
    ', c->rbytes);
    18         if (!el) {
    19             if (c->rbytes > 1024) {
    20                 /*
    21                  * We didn't have a '
    ' in the first k. This _has_ to be a
    22                  * large multiget, if not we should just nuke the connection.
    23                  */
    24                 char *ptr = c->rcurr;
    25                 while (*ptr == ' ') { /* ignore leading whitespaces */
    26                     ++ptr;
    27                 }
    28 
    29                 if (ptr - c->rcurr > 100 ||
    30                     (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
    31 
    32                     conn_set_state(c, conn_closing);
    33                     return 1;
    34                 }
    35             }
    36 
    37             return 0;
    38         }
    39         
    40         // 客户端报文以'
    '结尾
    41         cont = el + 1;
    42         if ((el - c->rcurr) > 1 && *(el - 1) == '
    ') {
    43             el--;
    44         }
    45         *el = '';
    46 
    47         assert(cont <= (c->rcurr + c->rbytes));
    48 
    49         // 真正解析命令的地方
    50         process_command(c, c->rcurr);
    51 
    52         c->rbytes -= (cont - c->rcurr);
    53         c->rcurr = cont;
    54 
    55         assert(c->rcurr <= (c->rbuf + c->rsize));
    56     }
    57 
    58     return 1;
    59 }

    在分析process_command函数前,我们先看看memcached的命令格式:

     1     <command name> <key> <flags> <exptime> <bytes> [noreply]
      
     2       
     3     cas <key> <flags> <exptime> <bytes> <cas unique> [noreply]
     
     4     
     5     // 例如 set 命令 :    
     6     set key 0 60 2  
     7     12  
     8     STORED
     9     
    10     // 空格对应着空格
    11     set => <command name>
    12     key => <key>
    13     0   => <flags>
    14     60  => <exptime>
    15     2   => <bytes>

    memcached在process_command中调用tokenize_command函数根据上面的命令格式处理命令,把相应位置的字段保存在 token_t *tokens 的相应位置。

     1 // 参数1:命令的字符串
     2 // 参数2:解析命令后,存放命令各个字段的结构体数组
     3 // 参数3:命令字段的最大数量
     4 /*
     5 *    tokens[0] => <command name> 的信息
     6 *    tokens[1] => <key> 的信息
     7 *    tokens[2] => <flags> 的信息
     8 */
     9 static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    10     char *s, *e;
    11     size_t ntokens = 0;
    12     size_t len = strlen(command);
    13     unsigned int i = 0;
    14 
    15     assert(command != NULL && tokens != NULL && max_tokens > 1);
    16 
    17     s = e = command;
    18     for (i = 0; i < len; i++) {
    19         if (*e == ' ') {
    20             if (s != e) {
    21                 tokens[ntokens].value = s;  // value存放各个字段的字符串值,例如:'set'
    22                 tokens[ntokens].length = e - s; // length表示各个字段相应的长度,例如:'set'的长度为3
    23                 ntokens++;
    24                 *e = '';
    25                 if (ntokens == max_tokens - 1) {
    26                     e++;
    27                     s = e; /* so we don't add an extra token */
    28                     break;
    29                 }
    30             }
    31             s = e + 1;
    32         }
    33         e++;
    34     }
    35 
    36     if (s != e) {
    37         tokens[ntokens].value = s;
    38         tokens[ntokens].length = e - s;
    39         ntokens++;
    40     }
    41 
    42     /*
    43      * If we scanned the whole string, the terminal value pointer is null,
    44      * otherwise it is the first unprocessed character.
    45      */
    46     tokens[ntokens].value =  *e == '' ? NULL : e;
    47     tokens[ntokens].length = 0;
    48     ntokens++;
    49 
    50     return ntokens;
    51 }

    解析完文本命令后,回到process_command函数中,我们可以看到很熟悉的命令,是的,接下来,在一个if-else的多分支判断中,memcached根据tokens[COMMAND_TOKEN].value决定调用那一个函数处理相应的命令:

     1 static void process_command(conn *c, char *command) {
     2     
     3     // ....
     4     
     5     ntokens = tokenize_command(command, tokens, MAX_TOKENS);
     6     if (ntokens >= 3 &&
     7         ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
     8          (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
     9         
    10         // 这里就是执行get命令的分支
    11         process_get_command(c, tokens, ntokens, false);
    12 
    13     } else if ((ntokens == 6 || ntokens == 7) &&
    14                ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
    15                 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
    16                 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
    17                 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
    18                 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
    19 
    20         // 这里就是执行set、add、replace等命令的分支
    21         process_update_command(c, tokens, ntokens, comm, false);
    22 
    23     } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
    24 
    25         // 这里也执行process_update_command函数,也是对相应的key执行写操作,与上面一个分支不同的是最后一个参数是true,意思是写的过程使用CAS协议,这里不侧重讲,
    26         // CAS目的是保证在并发写的时候保证一致性
    27         process_update_command(c, tokens, ntokens, comm, true);
    28 
    29     } else if .............    

    memcached存储命令分析

    memcached把内存分割成各种尺寸的块(chunk),并把尺寸相同的块分成组(chunk的集合),每个chunk集合被称为slab。Memcached的内存分配以Page为单位,Page默认值为1M,可以在启动时通过-I参数来指定。Slab是由多个Page组成的,Page按照指定大小切割成多个chunk。

    每一对[key,value]的数据被封装到item的结构体里,每种类型的slab用一个item链表来维护它的所有item。例如,一个item项的数据大小加上item的头部信息(为了方便描述,下面把这两项的和统称[key,value]大小吧)是90KB,slab[i]的chunk块大小是136KB,slab[i-1]的chunk块大小是88KB,那么item会被分配slab[i]的一个chunk块(并保存到slab[i]维护的一个item链表),这样做的目的是为了尽量减少内存碎片。更多关于Slab Allocation的原理可以查找其他的资料。这里不详解

    memcached的存储命令:add、set、replace、append、prepend等,上面简单地说了memcached slab机制,知道memcached是根据相应的[key,value]大小找到相应的slab,那么,我们再次调用set命令某个已存在的key的value的时候,memcached是怎么工作的呢?

    起初,我的直觉思维是,找到key相应的item,修改item的value就好了。那么,问题来了,假如先前[key,value]大小是90KB,被分配到slab[i]的,现在我们修改了key对应的value,[key,value]大小也改变了,变成了80KB,应该分配到slab[i-1]的,如果只是修改原来item的数据,那么就不符合Slab Allocation的原理,会造成很大的内存碎片浪费。

    memcached对存储命令:add、set、replace、append、prepend处理方法大体都相似的,从上面的源码可以看出,都是通过执行process_update_command函数来处理。

    memcached的处理存储命令思路是这样的:例如,客户端的一个set命令,memcached都会重新根据[key,value]大小找到合适slab,并把相应的数据封装到新的item里面【源码的注1】(不会直接修改旧的item项),如果对应的slab没有内存空间不足,就调用LRU算法把该slab的一个最近最少使用项的空间分配给新的item【源码的注2,出现在do_item_alloc函数】(如果关闭LRU移除项的功能,那么就会报“SERVER_ERROR out of memory storing object”错误,是set命令的话,还会把key对应的旧的item项移除【源码的注3】,即我们这时候不能通过get key来获取到旧的数据了),分配空间成功,那就是对add、set、replace、append、prepend这几个存储命令做差异化处理。

     1 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
     2     
     3     // ....
     4 
     5     set_noreply_maybe(c, tokens, ntokens);  // 设置命令可选字段的[noreply]
     6 
     7     if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
     8         out_string(c, "CLIENT_ERROR bad command line format");
     9         return;
    10     }
    11 
    12     key = tokens[KEY_TOKEN].value;
    13     nkey = tokens[KEY_TOKEN].length;
    14 
    15     // 把命令相应字段的字符串安全转换成整数
    16     if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
    17            && safe_strtol(tokens[3].value, &exptime_int)
    18            && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
    19         out_string(c, "CLIENT_ERROR bad command line format");
    20         return;
    21     }
    22 
    23     exptime = exptime_int;
    24 
    25     // #define REALTIME_MAXDELTA 60*60*24*30
    26     if (exptime < 0)
    27         exptime = REALTIME_MAXDELTA + 1;
    28 
    29     // CAS协议,防止并发写不一致
    30     if (handle_cas) {
    31         if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
    32             out_string(c, "CLIENT_ERROR bad command line format");
    33             return;
    34         }
    35     }
    36 
    37     // ........
    38 
    39     // 注1:无论是add、set或者是replace命令,都会从新分配一个新的item
    40     it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
    41 
    42     // 如果新的item分配失败
    43     if (it == 0) {
    44         if (! item_size_ok(nkey, flags, vlen))
    45             out_string(c, "SERVER_ERROR object too large for cache");        // 一种错误情况:数据太大,没有合适slab,不能缓存数据
    46         else
    47             out_string(c, "SERVER_ERROR out of memory storing object");      // 另一种是:没有了内存空间缓存数据,通常这种事在关闭LRU功能的情况下出现
    48         /* swallow the data line */
    49         c->write_and_go = conn_swallow;
    50         c->sbytes = vlen;
    51 
    52         // 注3:新的item分配失败,如果是set命令,并且key对应着存在旧的item,那么就把旧的item删除
    53         if (comm == NREAD_SET) {
    54             it = item_get(key, nkey);
    55             if (it) {
    56                 item_unlink(it);
    57                 item_remove(it);
    58             }
    59         }
    60 
    61         return;
    62     }
    63     ITEM_set_cas(it, req_cas_id);
    64 
    65     c->item = it;
    66     c->ritem = ITEM_data(it);
    67     c->rlbytes = it->nbytes;
    68     c->cmd = comm;
    69     
    70     // 会在这一步进行add、set、replace等存储命令的差异化处理
    71     conn_set_state(c, conn_nread);
    72 }

    do_item_alloc函数:

      1 item *do_item_alloc(char *key, const size_t nkey, const int flags,
      2                     const rel_time_t exptime, const int nbytes,
      3                     const uint32_t cur_hv) {
      4     uint8_t nsuffix;
      5     item *it = NULL;
      6     char suffix[40];
      7     size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);   //接收到的item数据长度+item头部长度
      8     if (settings.use_cas) {
      9         ntotal += sizeof(uint64_t);
     10     }
     11 
     12     unsigned int id = slabs_clsid(ntotal);
     13     if (id == 0)
     14         return 0;
     15 
     16     mutex_lock(&cache_lock);
     17     /* do a quick check if we have any expired items in the tail.. */
     18     int tries = 5;
     19     int tried_alloc = 0;
     20     item *search;
     21     void *hold_lock = NULL;
     22     rel_time_t oldest_live = settings.oldest_live;
     23 
     24     search = tails[id];
     25     
     26     // tries = 5 ,循环查找过期的item,最多循环5次
     27     for (; tries > 0 && search != NULL; tries--, search=search->prev) {
     28         uint32_t hv = hash(ITEM_key(search), search->nkey, 0);
     29         
     30         // 如果当前item被上锁,那么就跳过 
     31         if (hv != cur_hv && (hold_lock = item_trylock(hv)) == NULL)
     32             continue;
     33         /* Now see if the item is refcount locked */
     34         if (refcount_incr(&search->refcount) != 2) {
     35             refcount_decr(&search->refcount);
     36             /* Old rare bug could cause a refcount leak. We haven't seen
     37              * it in years, but we leave this code in to prevent failures
     38              * just in case */
     39             if (search->time + TAIL_REPAIR_TIME < current_time) {
     40                 itemstats[id].tailrepairs++;
     41                 search->refcount = 1;
     42                 do_item_unlink_nolock(search, hv);
     43             }
     44             if (hold_lock)
     45                 item_trylock_unlock(hold_lock);
     46             continue;
     47         }
     48 
     49         // item过期,如果没有设置过期时间,那么就使用系统设置的默认过期时间
     50         if ((search->exptime != 0 && search->exptime < current_time) 
     51             || (search->time <= oldest_live && oldest_live <= current_time)) {
     52             itemstats[id].reclaimed++;
     53             if ((search->it_flags & ITEM_FETCHED) == 0) {
     54                 itemstats[id].expired_unfetched++;
     55             }
     56             it = search;
     57             slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);   // 当前搜索的item过期,重新计算slab已经分配的字节
     58             do_item_unlink_nolock(it, hv);   // 把当前搜索的item从链表中移除
     59             /* Initialize the item block: */
     60             it->slabs_clsid = 0;
     61         } else if ((it = slabs_alloc(ntotal, id)) == NULL) {  // 没有找到过期的item,新分配一个item,分配失败就执行else if里面的代码
     62             tried_alloc = 1;
     63             if (settings.evict_to_free == 0) {  // 注2:内存耗尽,如果evict_to_free = 1(默认)LRU算法启动,移除最近最少使用的item
     64                 itemstats[id].outofmemory++;
     65             } else {
     66                 itemstats[id].evicted++;
     67                 itemstats[id].evicted_time = current_time - search->time;
     68                 if (search->exptime != 0)
     69                     itemstats[id].evicted_nonzero++;
     70                 if ((search->it_flags & ITEM_FETCHED) == 0) {
     71                     itemstats[id].evicted_unfetched++;
     72                 }
     73                 it = search;
     74                 slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);   // 当前搜索的item过期,重新计算slab已经分配的字节
     75                 do_item_unlink_nolock(it, hv);  // 把当前需要移除的item从链表中移除
     76                 /* Initialize the item block: */
     77                 it->slabs_clsid = 0;
     78 
     79                 if (settings.slab_automove == 2)
     80                     slabs_reassign(-1, id);
     81             }
     82         }
     83 
     84         refcount_decr(&search->refcount);
     85         /* If hash values were equal, we don't grab a second lock */
     86         if (hold_lock)
     87             item_trylock_unlock(hold_lock);
     88         break;
     89     }
     90 
     91     if (!tried_alloc && (tries == 0 || search == NULL))
     92         it = slabs_alloc(ntotal, id);
     93 
     94     if (it == NULL) {
     95         itemstats[id].outofmemory++;
     96         mutex_unlock(&cache_lock);
     97         return NULL;
     98     }
     99 
    100     assert(it->slabs_clsid == 0);
    101     assert(it != heads[id]);
    102 
    103 
    104     it->refcount = 1;     
    105     mutex_unlock(&cache_lock);
    106     it->next = it->prev = it->h_next = 0;
    107     it->slabs_clsid = id;
    108 
    109     DEBUG_REFCNT(it, '*');
    110     it->it_flags = settings.use_cas ? ITEM_CAS : 0;
    111     it->nkey = nkey;
    112     it->nbytes = nbytes;
    113     memcpy(ITEM_key(it), key, nkey);
    114     it->exptime = exptime;
    115     memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    116     it->nsuffix = nsuffix;
    117     return it;
    118 }

    以上memcached只是为[key,value]找到了新的slab,分配了新的item,并把命令相关的头部信息保存到,但是,还有一个重要的步奏没有说的,那就是[key,value]中的value怎么和item关联起来的,add和set的区别又是怎样区分的,由于还有很长的一段代码,所以我还是分篇记录,预告一下,下一篇《memcached学习笔记——存储命令源码分析下》会讲遗留的这两个问题。

    未完,待续。

     更多阅读查看:JC&hcoding

  • 相关阅读:
    Eclipse中常用的快捷键总结!不收藏后悔!
    MySQL基本命令整理,java数据库秘籍!
    centos 禁用ip v6
    win7 & win10 安装AD管理工具
    CentOS LVM 卷在线扩容
    Windows 与 Linux 、esxi下面查看内存容量和数量
    ESX/ESXi 主机上的每个插槽中安装了多少内存
    使用 esxcli storage vmfs unmap 命令在精简置备的 LUN 上回收 VMFS 删除的块
    vSphere 高级特性FT配置与管理
    vSphere HA 原理与配置
  • 原文地址:https://www.cnblogs.com/szuyuan/p/4153299.html
Copyright © 2011-2022 走看看