zoukankan      html  css  js  c++  java
  • memcached 源代码阅读笔记(4) SET 操作分析

    memcached 源代码阅读笔记- SET 操作分析
    输入
    set yzn 32 0 5
    hell1

    进入static void process_command(conn *c, char *command) 函数
    进入如下分支
     if (ntokens == 6 &&
                   ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                    (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                    (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                    (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                    (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {

            process_update_command(c, tokens, ntokens, comm, false);

        }
    可以看到调用 process_update_command函数

    static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
        char *key;
        size_t nkey;
        int flags;
        time_t exptime;
        int vlen;
        uint64_t req_cas_id;
        item *it;

        assert(c != NULL);

        if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }

        key = tokens[KEY_TOKEN].value;
        nkey = tokens[KEY_TOKEN].length;

        flags = strtoul(tokens[2].value, NULL, 10);
        exptime = strtol(tokens[3].value, NULL, 10);
        vlen = strtol(tokens[4].value, NULL, 10);

        // does cas value exist?
        if(handle_cas)
        {
          req_cas_id = strtoull(tokens[5].value, NULL, 10);
        }

        if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }

        if (settings.detail_enabled) {
            stats_prefix_record_set(key);
        }

        if (settings.managed) {
            int bucket = c->bucket;
            if (bucket == -1) {
                out_string(c, "CLIENT_ERROR no BG data in managed mode");
                return;
            }
            c->bucket = -1;
            if (buckets[bucket] != c->gen) {
                out_string(c, "ERROR_NOT_OWNER");
                return;
            }
        }

        it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);

        if (it == 0) {
            if (! item_size_ok(nkey, flags, vlen + 2))
                out_string(c, "SERVER_ERROR object too large for cache");
            else
                out_string(c, "SERVER_ERROR out of memory");
            /* swallow the data line */
            c->write_and_go = conn_swallow;
            c->sbytes = vlen + 2;
            return;
        }
        if(handle_cas)
          it->cas_id = req_cas_id;

        c->item = it;
        c->ritem = ITEM_data(it);
        c->rlbytes = it->nbytes;
        c->item_comm = comm;
        conn_set_state(c, conn_nread);
    }
    可以看到命令格式就是
    cmdname key flags exptime vlen
        key = tokens[KEY_TOKEN].value;
        nkey = tokens[KEY_TOKEN].length;

        flags = strtoul(tokens[2].value, NULL, 10);
        exptime = strtol(tokens[3].value, NULL, 10);
        vlen = strtol(tokens[4].value, NULL, 10);


     it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
    为item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) {
        uint8_t nsuffix;
        item *it;
        char suffix[40];
        size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);

        unsigned int id = slabs_clsid(ntotal);
        if (id == 0)
            return 0;

        it = slabs_alloc(ntotal);
        if (it == 0) {
            int tries = 50;
            item *search;

            /* If requested to not push old items out of cache when memory runs out,
             * we're out of luck at this point...
             */

            if (settings.evict_to_free == 0) return NULL;

            /*
             * try to get one off the right LRU
             * don't necessariuly unlink the tail because it may be locked: refcount>0
             * search up from tail an item with refcount==0 and unlink it; give up after 50
             * tries
             */

            if (id > LARGEST_ID) return NULL;
            if (tails[id] == 0) return NULL;

            for (search = tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
                if (search->refcount == 0) {
                   if (search->exptime == 0 || search->exptime > current_time) {
                           STATS_LOCK();
                           stats.evictions++;
                           STATS_UNLOCK();
                    }
                    do_item_unlink(search);
                    break;
                }
            }
            it = slabs_alloc(ntotal);
            if (it == 0) return NULL;
        }

        assert(it->slabs_clsid == 0);

        it->slabs_clsid = id;

        assert(it != heads[it->slabs_clsid]);

        it->next = it->prev = it->h_next = 0;
        it->refcount = 1;     /* the caller will have a reference */
        DEBUG_REFCNT(it, '*');
        it->it_flags = 0;
        it->nkey = nkey;
        it->nbytes = nbytes;
        strcpy(ITEM_key(it), key);
        it->exptime = exptime;
        memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
        it->nsuffix = nsuffix;
        return it;
    }
    static size_t item_make_header(const uint8_t nkey, const int flags, const int nbytes,
                         char *suffix, uint8_t *nsuffix) {
        /* suffix is defined at 40 chars elsewhere.. */
        *nsuffix = (uint8_t) snprintf(suffix, 40, " %d %d\r\n", flags, nbytes - 2);
        return sizeof(item) + nkey + *nsuffix + nbytes;
    }

    选择一个合适的slabs
    unsigned int slabs_clsid(const size_t size) {
        int res = POWER_SMALLEST;

        if (size == 0)
            return 0;
        while (size > slabclass[res].size)
            if (res++ == power_largest)     /* won't fit in the biggest slab */
                return 0;
        return res;
    }

    it = slabs_alloc(ntotal);
    void *do_slabs_alloc(const size_t size) {
        slabclass_t *p;

        unsigned int id = slabs_clsid(size);
        if (id < POWER_SMALLEST || id > power_largest)
            return NULL;

        p = &slabclass[id];
        assert(p->sl_curr == 0 || ((item *)p->slots[p->sl_curr - 1])->slabs_clsid == 0);

    #ifdef USE_SYSTEM_MALLOC
        if (mem_limit && mem_malloced + size > mem_limit)
            return 0;
        mem_malloced += size;
        return malloc(size);
    #endif

        /* fail unless we have space at the end of a recently allocated page,
           we have something on our freelist, or we could allocate a new page */
        if (! (p->end_page_ptr != 0 || p->sl_curr != 0 || do_slabs_newslab(id) != 0))
            return 0;

        /* return off our freelist, if we have one */
        if (p->sl_curr != 0)
            return p->slots[--p->sl_curr];

        /* if we recently allocated a whole page, return from that */
        if (p->end_page_ptr) {
            void *ptr = p->end_page_ptr;
            if (--p->end_page_free != 0) {
                (char*)(p->end_page_ptr) += p->size;
            } else {
                p->end_page_ptr = 0;
            }
            return ptr;
        }

        return NULL;  /* shouldn't ever get here */
    }


    static slabclass_t slabclass[POWER_LARGEST + 1];
    #define POWER_SMALLEST 1
    #define POWER_LARGEST  200
    #define POWER_BLOCK 1048576
    #define CHUNK_ALIGN_BYTES (sizeof(void *))

    typedef struct {
        unsigned int size;      /* sizes of items */
        unsigned int perslab;   /* how many items per slab *//

        void **slots;           /* list of item ptrs */
        unsigned int sl_total;  /* size of previous array */
        unsigned int sl_curr;   /* first free slot */

        void *end_page_ptr;         /* pointer to next free item at end of page, or 0 */
        unsigned int end_page_free; /* number of items remaining at end of last alloced page */

        unsigned int slabs;     /* how many slabs were allocated for this class */

        void **slab_list;       /* array of slab pointers */
        unsigned int list_size; /* size of prev array */

        unsigned int killing;  /* index+1 of dying slab, or zero if none */
    } slabclass_t;

    跟踪代码可知p此时
    -  p 0x0046ebac {size=88 perslab=11915 slots=0x02231e70 ...} slabclass_t *
      size 88 unsigned int
      perslab 11915 unsigned int
      slots 0x02231e70 void * *
      sl_total 16 unsigned int
      sl_curr 1 unsigned int
      end_page_ptr 0x022f00d0 void *
      end_page_free 11913 unsigned int
      slabs 1 unsigned int
      slab_list 0x02231e18 void * *
      list_size 16 unsigned int
      killing 0 unsigned int

    static int do_slabs_newslab(const unsigned int id) {
        slabclass_t *p = &slabclass[id];
    #ifdef ALLOW_SLABS_REASSIGN
        int len = POWER_BLOCK;
    #else
        int len = p->size * p->perslab;
    #endif
        char *ptr;

        if (mem_limit && mem_malloced + len > mem_limit && p->slabs > 0)
            return 0;

        if (grow_slab_list(id) == 0) return 0;

        ptr = malloc((size_t)len);
        if (ptr == 0) return 0;

        memset(ptr, 0, (size_t)len);
        p->end_page_ptr = ptr;
        p->end_page_free = p->perslab;

        p->slab_list[p->slabs++] = ptr;
        mem_malloced += len;
        return 1;
    }

    static int grow_slab_list (const unsigned int id) {
        slabclass_t *p = &slabclass[id];
        if (p->slabs == p->list_size) {
            size_t new_size =  (p->list_size != 0) ? p->list_size * 2 : 16;
            void *new_list = realloc(p->slab_list, new_size * sizeof(void *));
            if (new_list == 0) return 0;
            p->list_size = new_size;
            p->slab_list = new_list;
        }
        return 1;
    }

    自此完成一个item的内存分配
    item的结构如下:
    typedef struct _stritem {
        struct _stritem *next;
        struct _stritem *prev;
        struct _stritem *h_next;    /* hash chain next */
        rel_time_t      time;       /* least recent access */
        rel_time_t      exptime;    /* expire time */
        int             nbytes;     /* size of data */
        unsigned short  refcount;
        uint8_t         nsuffix;    /* length of flags-and-length string */
        uint8_t         it_flags;   /* ITEM_* above */
        uint8_t         slabs_clsid;/* which slab class we're in */
        uint8_t         nkey;       /* key length, w/terminating null and padding */
        uint64_t        cas_id;     /* the CAS identifier */
        void * end[];
        /* then null-terminated key */
        /* then " flags length\r\n" (no terminating null) */
        /* then data with terminating \r\n (no terminating null; it's binary!) */
    } item;

    strcpy(ITEM_key(it), key);
    #define ITEM_key(item) ((char*)&((item)->end[0]))

     memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    #define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1)


       c->item = it;
        c->ritem = ITEM_data(it);
    #define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 + (item)->nsuffix)
        c->rlbytes = it->nbytes;


    static void complete_nread(conn *c) {
        item *it;
        int comm;
        int ret;
        assert(c != NULL);
       
        comm = c->item_comm;
        it = c->item;

        STATS_LOCK();
        stats.set_cmds++;
        STATS_UNLOCK();

        if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
            out_string(c, "CLIENT_ERROR bad data chunk");
        } else {
          ret = store_item(it, comm);
          if (ret == 1)
              out_string(c, "STORED");
          else if(ret == 2)
              out_string(c, "EXISTS");
          else if(ret == 3)
              out_string(c, "NOT_FOUND");
          else
              out_string(c, "NOT_STORED");
        }

        item_remove(c->item);       /* release the c->item reference */
        c->item = 0;
    }

    ret = store_item(it, comm);
    int do_store_item(item *it, int comm) {
        char *key = ITEM_key(it);
        bool delete_locked = false;
        item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
        int stored = 0;

        item *new_it = NULL;
        int flags;

        if (old_it != NULL && comm == NREAD_ADD) {
            /* add only adds a nonexistent item, but promote to head of LRU */
            do_item_update(old_it);
        } else if (!old_it && (comm == NREAD_REPLACE
            || comm == NREAD_APPEND || comm == NREAD_PREPEND))
        {
            /* replace only replaces an existing value; don't store */
        } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
            || comm == NREAD_APPEND || comm == NREAD_PREPEND))
        {
            /* replace and add can't override delete locks; don't store */
        } else if (comm == NREAD_CAS) {
            /* validate cas operation */
            if (delete_locked)
                old_it = do_item_get_nocheck(key, it->nkey);

            if(old_it == NULL) {
              // LRU expired
              stored = 3;
            }
            else if(it->cas_id == old_it->cas_id) {
              // cas validates
              do_item_replace(old_it, it);
              stored = 1;
            }
            else
            {
              stored = 2;
            }
        } else {
            /*
             * Append - combine new and old record into single one. Here it's
             * atomic and thread-safe.
             */

            if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {

                /* we have it and old_it here - alloc memory to hold both */
                /* flags was already lost - so recover them from ITEM_suffix(it) */

                flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);

                new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);

                if (new_it == NULL) {
                    /* SERVER_ERROR out of memory */
                    return 0;
                }

                /* copy data from it and old_it to new_it */

                if (comm == NREAD_APPEND) {
                    memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
                    memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
                } else {
                    /* NREAD_PREPEND */
                    memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
                    memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
                }

                it = new_it;
            }

            /* "set" commands can override the delete lock
               window... in which case we have to find the old hidden item
               that's in the namespace/LRU but wasn't returned by
               item_get.... because we need to replace it */
            if (delete_locked)
                old_it = do_item_get_nocheck(key, it->nkey);

            if (old_it != NULL)
                do_item_replace(old_it, it);
            else
                do_item_link(it);

            stored = 1;
        }

        if (old_it != NULL)
            do_item_remove(old_it);         /* release our reference */
        if (new_it != NULL)
            do_item_remove(new_it);

        return stored;
    }


    if (old_it != NULL)
                do_item_replace(old_it, it);

    int do_item_replace(item *it, item *new_it) {
        assert((it->it_flags & ITEM_SLABBED) == 0);

        do_item_unlink(it);
        return do_item_link(new_it);
    }


    int do_item_link(item *it) {
        assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
        assert(it->nbytes < (1024 * 1024));  /* 1MB max size */
        it->it_flags |= ITEM_LINKED;
        it->time = current_time;
        assoc_insert(it);

        STATS_LOCK();
        stats.curr_bytes += ITEM_ntotal(it);
        stats.curr_items += 1;
        stats.total_items += 1;
        STATS_UNLOCK();

        /* Allocate a new CAS ID on link. */
        it->cas_id = get_cas_id();

        item_link_q(it);

        return 1;
    }


    /* Note: this isn't an assoc_update.  The key must not already exist to call this */

    int assoc_insert(item *it) {
        uint32_t hv;
        unsigned int oldbucket;

        assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */

        hv = hash(ITEM_key(it), it->nkey, 0);
        if (expanding &&
            (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
        {
            it->h_next = old_hashtable[oldbucket];
            old_hashtable[oldbucket] = it;
        } else {
            it->h_next = primary_hashtable[hv & hashmask(hashpower)];
            primary_hashtable[hv & hashmask(hashpower)] = it;
        }

        hash_items++;
        if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
            assoc_expand();
        }

        return 1;
    }

    看到这个函数可以看出内部的hash表采用开链存储法。

    当hash_items > 桶的个数的1.5倍的时候,就扩张hash表。

    /* grows the hashtable to the next power of 2. */
    static void assoc_expand(void) {
        old_hashtable = primary_hashtable;

        primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
        if (primary_hashtable) {
            if (settings.verbose > 1)
                fprintf(stderr, "Hash table expansion starting\n");
            hashpower++;
            expanding = true;
            expand_bucket = 0;
            do_assoc_move_next_bucket();
        } else {
            primary_hashtable = old_hashtable;
            /* Bad news, but we can keep running. */
        }
    }

    /* migrates the next bucket to the primary hashtable if we're expanding. */
    void do_assoc_move_next_bucket(void) {
        item *it, *next;
        int bucket;

        if (expanding) {
            for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                next = it->h_next;

                bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
                it->h_next = primary_hashtable[bucket];
                primary_hashtable[bucket] = it;
            }

            old_hashtable[expand_bucket] = NULL;

            expand_bucket++;
            if (expand_bucket == hashsize(hashpower - 1)) {
                expanding = false;
                free(old_hashtable);
                if (settings.verbose > 1)
                    fprintf(stderr, "Hash table expansion done\n");
            }
        }
    }

    static void item_link_q(item *it) { /* item is the new head */
        item **head, **tail;
        /* always true, warns: assert(it->slabs_clsid <= LARGEST_ID); */
        assert((it->it_flags & ITEM_SLABBED) == 0);

        head = &heads[it->slabs_clsid];
        tail = &tails[it->slabs_clsid];
        assert(it != *head);
        assert((*head && *tail) || (*head == 0 && *tail == 0));
        it->prev = 0;
        it->next = *head;
        if (it->next) it->next->prev = it;
        *head = it;
        if (*tail == 0) *tail = it;
        sizes[it->slabs_clsid]++;
        return;
    }

    放到如下的LRU列表的头位置
    static item *heads[LARGEST_ID];
    static item *tails[LARGEST_ID];
    static unsigned int sizes[LARGEST_ID];

  • 相关阅读:
    鼠标放在图片上出现提示
    NSIS调用dll
    IIS7 CMD命令
    NSIS检测
    NSIS修改文件夹访问权限
    NSIS——检测IIS是否安装及版本
    NSIS——检测SQL Server安装版本
    NSIS使用技巧集合
    提供修复界面的NSIS安装包
    NSIS MUI教程
  • 原文地址:https://www.cnblogs.com/yanzhenan/p/2279524.html
Copyright © 2011-2022 走看看