  • slabs.c

    /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
    /*
     * Slabs memory allocation, based on powers-of-N. Slabs are up to 1MB in size
     * and are divided into chunks. The chunk sizes start off at the size of the
     * "item" structure plus space for a small key and value. They increase by
     * a multiplier factor from there, up to half the maximum slab size. The last
     * slab size is always 1MB, since that's the maximum item size allowed by the
     * memcached protocol.
     */
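
    /*
     * Illustration, assuming the default growth factor of 1.25: chunk sizes
     * then run roughly 96, 120, 152, 192, 240, ... bytes, each rounded up to
     * CHUNK_ALIGN_BYTES, until they reach settings.slab_chunk_size_max.
     */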
    #include "memcached.h"
    #include <sys/stat.h>
    #include <sys/socket.h>
    #include <sys/resource.h>
    #include <fcntl.h>
    #include <netinet/in.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>
    #include <signal.h>
    #include <assert.h>
    #include <pthread.h>
    
    //#define DEBUG_SLAB_MOVER
    /* powers-of-N allocation structures */
    
    typedef struct {
        unsigned int size;      /* sizes of items */
        unsigned int perslab;   /* how many items per slab */
    
        void *slots;           /* list of item ptrs */
        unsigned int sl_curr;   /* total free items in list */
    
        unsigned int slabs;     /* how many slabs were allocated for this class */
    
        void **slab_list;       /* array of slab pointers */
        unsigned int list_size; /* size of prev array */
    
        size_t requested; /* The number of requested bytes */
    } slabclass_t;
    
    static slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES];
    static size_t mem_limit = 0;
    static size_t mem_malloced = 0;
    /* If the memory limit has been hit once. Used as a hint to decide when to
     * early-wake the LRU maintenance thread */
    static bool mem_limit_reached = false;
    static int power_largest;
    
    static void *mem_base = NULL;
    static void *mem_current = NULL;
    static size_t mem_avail = 0;
    
    /**
     * Access to the slab allocator is protected by this lock
     */
    static pthread_mutex_t slabs_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_mutex_t slabs_rebalance_lock = PTHREAD_MUTEX_INITIALIZER;
    
    /*
     * Forward Declarations
     */
    static int do_slabs_newslab(const unsigned int id);
    static void *memory_allocate(size_t size);
    static void do_slabs_free(void *ptr, const size_t size, unsigned int id);
    
    /* Preallocate as many slab pages as possible (called from slabs_init)
       on start-up, so users don't get confusing out-of-memory errors when
       they do have free (in-slab) space, but no space to make new slabs.
       if maxslabs is 18 (POWER_LARGEST - POWER_SMALLEST + 1), then all
       slab types can be made.  if max memory is less than 18 MB, only the
       smaller ones will be made.  */
    static void slabs_preallocate (const unsigned int maxslabs);
    
    /*
     * Figures out which slab class (chunk size) is required to store an item of
     * a given size.
     *
     * Given object size, return id to use when allocating/freeing memory for object
     * 0 means error: can't store such a large object
     */
    
    unsigned int slabs_clsid(const size_t size) {
        int res = POWER_SMALLEST;
    
        if (size == 0 || size > settings.item_size_max)
            return 0;
        // commit: haizhu.shao 2016-12-03 19:17
        // Out of habit I would write this as a for loop:
        /**
         for (; res != power_largest; ++res)
         {
            if (size <= slabclass[res].size)
                break;
         }
         return res;
         */
        while (size > slabclass[res].size)
            if (res++ == power_largest)     /* won't fit in the biggest slab */
                return power_largest;
        return res;
    }
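
    /*
     * Worked example (illustrative sizes): if class 1 holds 96-byte chunks
     * and class 2 holds 120-byte chunks, slabs_clsid(100) returns 2, the
     * smallest class whose chunk size fits the requested size.
     */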
    
    /**
     * Determines the chunk sizes and initializes the slab class descriptors
     * accordingly.
     */
    void slabs_init(const size_t limit, const double factor, const bool prealloc, const uint32_t *slab_sizes) {
        int i = POWER_SMALLEST - 1;
        unsigned int size = sizeof(item) + settings.chunk_size;
    
        mem_limit = limit;
    
        if (prealloc) {
            /* Allocate everything in a big chunk with malloc */
            mem_base = malloc(mem_limit);
            if (mem_base != NULL) {
                mem_current = mem_base;
                mem_avail = mem_limit;
            } else {
                fprintf(stderr, "Warning: Failed to allocate requested memory in"
                        " one large chunk.
    Will allocate in smaller chunks
    ");
            }
        }
    
        memset(slabclass, 0, sizeof(slabclass));
    
        while (++i < MAX_NUMBER_OF_SLAB_CLASSES-1) {
            if (slab_sizes != NULL) {
                if (slab_sizes[i-1] == 0)
                    break;
                size = slab_sizes[i-1];
            } else if (size >= settings.slab_chunk_size_max / factor) {
                break;
            }
            /* Make sure items are always n-byte aligned */
            if (size % CHUNK_ALIGN_BYTES)
                size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
    
            slabclass[i].size = size;
            slabclass[i].perslab = settings.slab_page_size / slabclass[i].size;
            if (slab_sizes == NULL)
                size *= factor;
            if (settings.verbose > 1) {
                fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u
    ",
                        i, slabclass[i].size, slabclass[i].perslab);
            }
        }
    
        power_largest = i;
        slabclass[power_largest].size = settings.slab_chunk_size_max;
        slabclass[power_largest].perslab = settings.slab_page_size / settings.slab_chunk_size_max;
        if (settings.verbose > 1) {
            fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u
    ",
                    i, slabclass[i].size, slabclass[i].perslab);
        }
    
        /* for the test suite:  faking of how much we've already malloc'd */
        {
            char *t_initial_malloc = getenv("T_MEMD_INITIAL_MALLOC");
            if (t_initial_malloc) {
                mem_malloced = (size_t)atol(t_initial_malloc);
            }
    
        }
    
        if (prealloc) {
            slabs_preallocate(power_largest);
        }
    }
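
    /*
     * Usage sketch (values are illustrative): memcached calls this once at
     * startup, roughly slabs_init(settings.maxbytes, 1.25, false, NULL),
     * before any worker thread touches the allocator.
     */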
    
    static void slabs_preallocate (const unsigned int maxslabs) {
        int i;
        unsigned int prealloc = 0;
    
        /* pre-allocate a 1MB slab in every size class so people don't get
           confused by non-intuitive "SERVER_ERROR out of memory"
           messages.  this is the most common question on the mailing
           list.  if you really don't want this, you can rebuild without
           these three lines.  */
    
        for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
            if (++prealloc > maxslabs)
                return;
            if (do_slabs_newslab(i) == 0) {
                fprintf(stderr, "Error while preallocating slab memory!
    "
                    "If using -L or other prealloc options, max memory must be "
                    "at least %d megabytes.
    ", power_largest);
                exit(1);
            }
        }
    
    }
    
    static int grow_slab_list (const unsigned int id) {
        slabclass_t *p = &slabclass[id];
        if (p->slabs == p->list_size) {
            size_t new_size =  (p->list_size != 0) ? p->list_size * 2 : 16;
            void *new_list = realloc(p->slab_list, new_size * sizeof(void *));
            if (new_list == 0) return 0;
            p->list_size = new_size;
            p->slab_list = new_list;
        }
        return 1;
    }
    
    static void split_slab_page_into_freelist(char *ptr, const unsigned int id) {
        slabclass_t *p = &slabclass[id];
        int x;
        for (x = 0; x < p->perslab; x++) {
            do_slabs_free(ptr, 0, id);
            ptr += p->size;
        }
    }
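
    /* Note: the loop above "frees" every chunk of the new page onto the class
     * freelist; passing size 0 to do_slabs_free() leaves the p->requested
     * byte accounting untouched. */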
    
    /* Fast FIFO queue: pages are pushed and popped at the tail of slab_list,
     * so in practice this behaves as a LIFO stack */
    static void *get_page_from_global_pool(void) {
        slabclass_t *p = &slabclass[SLAB_GLOBAL_PAGE_POOL];
        if (p->slabs < 1) {
            return NULL;
        }
        char *ret = p->slab_list[p->slabs - 1];
        p->slabs--;
        return ret;
    }
    
    static int do_slabs_newslab(const unsigned int id) {
        slabclass_t *p = &slabclass[id];
        slabclass_t *g = &slabclass[SLAB_GLOBAL_PAGE_POOL];
        int len = (settings.slab_reassign || settings.slab_chunk_size_max != settings.slab_page_size)
            ? settings.slab_page_size
            : p->size * p->perslab;
        char *ptr;
    
        if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0
             && g->slabs == 0)) {
            mem_limit_reached = true;
            MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
            return 0;
        }
    
        if ((grow_slab_list(id) == 0) ||
            (((ptr = get_page_from_global_pool()) == NULL) &&
            ((ptr = memory_allocate((size_t)len)) == 0))) {
    
            MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
            return 0;
        }
    
        memset(ptr, 0, (size_t)len);
        split_slab_page_into_freelist(ptr, id);
    
        // commit: haizhu.shao 2016-12-03 20:50
        // Here the page's chunks were threaded onto slots (by
        // split_slab_page_into_freelist above), and the page pointer itself
        // is recorded in slab_list below. I didn't understand this part.
        p->slab_list[p->slabs++] = ptr;
        MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id);
    
        return 1;
    }
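
    /* Note: when slab_reassign is enabled (or slab_chunk_size_max differs
     * from slab_page_size), every page is allocated at the full
     * settings.slab_page_size rather than p->size * p->perslab, so pages stay
     * uniform and can later be moved between classes. */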
    
    /* This calculation ends up adding sizeof(void *) to the item size. */
    static void *do_slabs_alloc_chunked(const size_t size, slabclass_t *p, unsigned int id) {
        void *ret = NULL;
        item *it = NULL;
        int x;
        int csize = p->size - sizeof(item_chunk);
        unsigned int chunks_req = size / csize;
        if (size % csize != 0)
            chunks_req++;
        while (p->sl_curr < chunks_req) {
            if (do_slabs_newslab(id) == 0)
                break;
        }
    
        if (p->sl_curr >= chunks_req) {
            item_chunk *chunk = NULL;
    
            /* Configure the head item in the chain. */
            it = (item *)p->slots;
            p->slots = it->next;
            if (it->next) it->next->prev = 0;
    
            /* Squirrel away the "top chunk" into h_next for now */
            it->h_next = (item *)p->slots;
            assert(it->h_next != 0);
            chunk = (item_chunk *) it->h_next;
    
            /* roll down the chunks, marking them as such. */
            for (x = 0; x < chunks_req-1; x++) {
                chunk->it_flags &= ~ITEM_SLABBED;
                chunk->it_flags |= ITEM_CHUNK;
                /* Chunks always have a direct reference to the head item */
                chunk->head = it;
                chunk->size = p->size - sizeof(item_chunk);
                chunk->used = 0;
                chunk = chunk->next;
            }
    
            /* The final "next" is now the top of the slab freelist */
            p->slots = chunk;
            if (chunk && chunk->prev) {
                /* Disconnect the final chunk from the chain */
                chunk->prev->next = 0;
                chunk->prev = 0;
            }
    
            it->it_flags &= ~ITEM_SLABBED;
            it->it_flags |= ITEM_CHUNKED;
            it->refcount = 1;
            p->sl_curr -= chunks_req;
            ret = (void *)it;
        } else {
            ret = NULL;
        }
    
        return ret;
    }
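
    /* Worked example (hypothetical numbers): with p->size = 16384 and
     * sizeof(item_chunk) = 48, csize is 16336, so storing a 100KB value needs
     * chunks_req = ceil(102400 / 16336) = 7 freelist entries, the first of
     * which becomes the ITEM_CHUNKED head. */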
    
    /*@null@*/
    static void *do_slabs_alloc(const size_t size, unsigned int id, uint64_t *total_bytes,
            unsigned int flags) {
        slabclass_t *p;
        void *ret = NULL;
        item *it = NULL;
    
        if (id < POWER_SMALLEST || id > power_largest) {
            MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0);
            return NULL;
        }
        p = &slabclass[id];
        assert(p->sl_curr == 0 || ((item *)p->slots)->slabs_clsid == 0);
        if (total_bytes != NULL) {
            *total_bytes = p->requested;
        }
    
        if (size <= p->size) {
            /* fail unless we have space at the end of a recently allocated page,
               we have something on our freelist, or we could allocate a new page */
            if (p->sl_curr == 0 && flags != SLABS_ALLOC_NO_NEWPAGE) {
                do_slabs_newslab(id);
            }
    
            if (p->sl_curr != 0) {
                /* return off our freelist */
                it = (item *)p->slots;
                p->slots = it->next;
                if (it->next) it->next->prev = 0;
                /* Kill flag and initialize refcount here for lock safety in slab
                 * mover's freeness detection. */
                it->it_flags &= ~ITEM_SLABBED;
                it->refcount = 1;
                p->sl_curr--;
                ret = (void *)it;
            } else {
                ret = NULL;
            }
        } else {
            /* Dealing with a chunked item. */
            ret = do_slabs_alloc_chunked(size, p, id);
        }
    
        if (ret) {
            p->requested += size;
            MEMCACHED_SLABS_ALLOCATE(size, id, p->size, ret);
        } else {
            MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);
        }
    
        return ret;
    }
    
    // commit: haizhu.shao 2016-12-03 21:40
    // TODO: the first chunk is handled differently in alloc vs. free:
    // do_slabs_alloc_chunked() never goes through ITEM_data().
    static void do_slabs_free_chunked(item *it, const size_t size, unsigned int id,
                                      slabclass_t *p) {
        item_chunk *chunk = (item_chunk *) ITEM_data(it);
        size_t realsize = size;
        while (chunk) {
            realsize += sizeof(item_chunk);
            chunk = chunk->next;
        }
        chunk = (item_chunk *) ITEM_data(it);
        unsigned int chunks_found = 1;
    
        it->it_flags = ITEM_SLABBED;
        it->slabs_clsid = 0;
        it->prev = 0;
        it->next = (item *) chunk->next;
        assert(it->next);
        /* top chunk should already point back to head */
        assert(it->next && (void*)it->next->prev == (void*)chunk);
        chunk = chunk->next;
        chunk->prev = (item_chunk *)it;
    
        while (chunk) {
            assert(chunk->it_flags == ITEM_CHUNK);
            chunk->it_flags = ITEM_SLABBED;
            chunk->slabs_clsid = 0;
            chunks_found++;
            if (chunk->next) {
                chunk = chunk->next;
            } else {
                break;
            }
        }
        /* must have had nothing hanging off of the final chunk */
        assert(chunk && chunk->next == 0);
        /* Tail chunk, link the freelist here. */
        chunk->next = p->slots;
        if (chunk->next) chunk->next->prev = chunk;
    
        p->slots = it;
        p->sl_curr += chunks_found;
        p->requested -= size;
    
        return;
    }
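
    /* A plausible answer to the TODO above: in a fully assembled chunked item
     * the first item_chunk header lives inside the head item's data area, so
     * the free path reaches the chain via ITEM_data(it), whereas
     * do_slabs_alloc_chunked() works directly with raw freelist entries. */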
    
    
    static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
        slabclass_t *p;
        item *it;
    
        assert(id >= POWER_SMALLEST && id <= power_largest);
        if (id < POWER_SMALLEST || id > power_largest)
            return;
    
        MEMCACHED_SLABS_FREE(size, id, ptr);
        p = &slabclass[id];
    
        it = (item *)ptr;
        if ((it->it_flags & ITEM_CHUNKED) == 0) {
            it->it_flags = ITEM_SLABBED;
            it->slabs_clsid = 0;
            it->prev = 0;
            it->next = p->slots;
            if (it->next) it->next->prev = it;
            p->slots = it;
    
            p->sl_curr++;
            p->requested -= size;
        } else {
            do_slabs_free_chunked(it, size, id, p);
        }
        return;
    }
    
    static int nz_strcmp(int nzlength, const char *nz, const char *z) {
        int zlength=strlen(z);
        return (zlength == nzlength) && (strncmp(nz, z, zlength) == 0) ? 0 : -1;
    }
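
    /* Compares a length-delimited, not necessarily NUL-terminated token (nz,
     * nzlength) against the C string z; returns 0 on an exact match, echoing
     * strcmp()'s convention. */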
    
    bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c) {
        bool ret = true;
    
        if (add_stats != NULL) {
            if (!stat_type) {
                /* prepare general statistics for the engine */
                STATS_LOCK();
                APPEND_STAT("bytes", "%llu", (unsigned long long)stats_state.curr_bytes);
                APPEND_STAT("curr_items", "%llu", (unsigned long long)stats_state.curr_items);
                APPEND_STAT("total_items", "%llu", (unsigned long long)stats.total_items);
                STATS_UNLOCK();
                if (settings.slab_automove > 0) {
                    pthread_mutex_lock(&slabs_lock);
                    APPEND_STAT("slab_global_page_pool", "%u", slabclass[SLAB_GLOBAL_PAGE_POOL].slabs);
                    pthread_mutex_unlock(&slabs_lock);
                }
                item_stats_totals(add_stats, c);
            } else if (nz_strcmp(nkey, stat_type, "items") == 0) {
                item_stats(add_stats, c);
            } else if (nz_strcmp(nkey, stat_type, "slabs") == 0) {
                slabs_stats(add_stats, c);
            } else if (nz_strcmp(nkey, stat_type, "sizes") == 0) {
                item_stats_sizes(add_stats, c);
            } else if (nz_strcmp(nkey, stat_type, "sizes_enable") == 0) {
                item_stats_sizes_enable(add_stats, c);
            } else if (nz_strcmp(nkey, stat_type, "sizes_disable") == 0) {
                item_stats_sizes_disable(add_stats, c);
            } else {
                ret = false;
            }
        } else {
            ret = false;
        }
    
        return ret;
    }
    
    /*@null@*/
    static void do_slabs_stats(ADD_STAT add_stats, void *c) {
        int i, total;
        /* Get the per-thread stats which contain some interesting aggregates */
        struct thread_stats thread_stats;
        threadlocal_stats_aggregate(&thread_stats);
    
        total = 0;
        for(i = POWER_SMALLEST; i <= power_largest; i++) {
            slabclass_t *p = &slabclass[i];
            if (p->slabs != 0) {
                uint32_t perslab, slabs;
                slabs = p->slabs;
                perslab = p->perslab;
    
                char key_str[STAT_KEY_LEN];
                char val_str[STAT_VAL_LEN];
                int klen = 0, vlen = 0;
    
                APPEND_NUM_STAT(i, "chunk_size", "%u", p->size);
                APPEND_NUM_STAT(i, "chunks_per_page", "%u", perslab);
                APPEND_NUM_STAT(i, "total_pages", "%u", slabs);
                APPEND_NUM_STAT(i, "total_chunks", "%u", slabs * perslab);
                APPEND_NUM_STAT(i, "used_chunks", "%u",
                                slabs*perslab - p->sl_curr);
                APPEND_NUM_STAT(i, "free_chunks", "%u", p->sl_curr);
                /* Stat is dead, but displaying zero instead of removing it. */
                APPEND_NUM_STAT(i, "free_chunks_end", "%u", 0);
                APPEND_NUM_STAT(i, "mem_requested", "%llu",
                                (unsigned long long)p->requested);
                APPEND_NUM_STAT(i, "get_hits", "%llu",
                        (unsigned long long)thread_stats.slab_stats[i].get_hits);
                APPEND_NUM_STAT(i, "cmd_set", "%llu",
                        (unsigned long long)thread_stats.slab_stats[i].set_cmds);
                APPEND_NUM_STAT(i, "delete_hits", "%llu",
                        (unsigned long long)thread_stats.slab_stats[i].delete_hits);
                APPEND_NUM_STAT(i, "incr_hits", "%llu",
                        (unsigned long long)thread_stats.slab_stats[i].incr_hits);
                APPEND_NUM_STAT(i, "decr_hits", "%llu",
                        (unsigned long long)thread_stats.slab_stats[i].decr_hits);
                APPEND_NUM_STAT(i, "cas_hits", "%llu",
                        (unsigned long long)thread_stats.slab_stats[i].cas_hits);
                APPEND_NUM_STAT(i, "cas_badval", "%llu",
                        (unsigned long long)thread_stats.slab_stats[i].cas_badval);
                APPEND_NUM_STAT(i, "touch_hits", "%llu",
                        (unsigned long long)thread_stats.slab_stats[i].touch_hits);
                total++;
            }
        }
    
        /* add overall slab stats and append terminator */
    
        APPEND_STAT("active_slabs", "%d", total);
        APPEND_STAT("total_malloced", "%llu", (unsigned long long)mem_malloced);
        add_stats(NULL, 0, NULL, 0, c);
    }
    
    static void *memory_allocate(size_t size) {
        void *ret;
    
        if (mem_base == NULL) {
            /* We are not using a preallocated large memory chunk */
            ret = malloc(size);
        } else {
            ret = mem_current;
    
            if (size > mem_avail) {
                return NULL;
            }
    
            /* mem_current pointer _must_ be aligned!!! */
            if (size % CHUNK_ALIGN_BYTES) {
                size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
            }
    
            mem_current = ((char*)mem_current) + size;
            if (size < mem_avail) {
                mem_avail -= size;
            } else {
                mem_avail = 0;
            }
        }
        mem_malloced += size;
    
        return ret;
    }
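
    /* Note: in preallocated mode the availability check uses the caller's
     * unrounded size; only the cursor advance is rounded up to
     * CHUNK_ALIGN_BYTES, and mem_avail is clamped to 0 if the rounded size
     * would overshoot the region. */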
    
    /* Must only be used if all pages are item_size_max */
    static void memory_release() {
        void *p = NULL;
        // commit: haizhu.shao 2016-12-03 21:45
        // TODO: why? (Because with a single preallocated block (mem_base),
        // individual pages cannot be free()'d back to the OS.)
        if (mem_base != NULL)
            return;
    
        // commit: haizhu.shao 2016-12-03 21:45
        // TODO: why? What is slab_reassign? (It is the feature that moves
        // pages between classes; only with it enabled do uniform, full-size
        // pages accumulate in the global pool.)
        if (!settings.slab_reassign)
            return;
    
        while (mem_malloced > mem_limit &&
                (p = get_page_from_global_pool()) != NULL) {
            free(p);
            mem_malloced -= settings.item_size_max;
        }
    }
    
    void *slabs_alloc(size_t size, unsigned int id, uint64_t *total_bytes,
            unsigned int flags) {
        void *ret;
    
        pthread_mutex_lock(&slabs_lock);
        ret = do_slabs_alloc(size, id, total_bytes, flags);
        pthread_mutex_unlock(&slabs_lock);
        return ret;
    }
    
    void slabs_free(void *ptr, size_t size, unsigned int id) {
        pthread_mutex_lock(&slabs_lock);
        do_slabs_free(ptr, size, id);
        pthread_mutex_unlock(&slabs_lock);
    }
    
    void slabs_stats(ADD_STAT add_stats, void *c) {
        pthread_mutex_lock(&slabs_lock);
        do_slabs_stats(add_stats, c);
        pthread_mutex_unlock(&slabs_lock);
    }
    
    static bool do_slabs_adjust_mem_limit(size_t new_mem_limit) {
        /* Cannot adjust memory limit at runtime if prealloc'ed */
        if (mem_base != NULL)
            return false;
        settings.maxbytes = new_mem_limit;
        mem_limit = new_mem_limit;
        mem_limit_reached = false; /* Will reset on next alloc */
        memory_release(); /* free what might already be in the global pool */
        return true;
    }
    
    bool slabs_adjust_mem_limit(size_t new_mem_limit) {
        bool ret;
        pthread_mutex_lock(&slabs_lock);
        ret = do_slabs_adjust_mem_limit(new_mem_limit);
        pthread_mutex_unlock(&slabs_lock);
        return ret;
    }
    
    // commit: haizhu.shao 2016-12-03 21:49
    // TODO: I don't get the purpose of this function. (It re-books the
    // per-class "requested" byte count when an item's effective size changes
    // without being reallocated.)
    void slabs_adjust_mem_requested(unsigned int id, size_t old, size_t ntotal)
    {
        pthread_mutex_lock(&slabs_lock);
        slabclass_t *p;
        if (id < POWER_SMALLEST || id > power_largest) {
            fprintf(stderr, "Internal error! Invalid slab class\n");
            abort();
        }
    
        p = &slabclass[id];
        p->requested = p->requested - old + ntotal;
        pthread_mutex_unlock(&slabs_lock);
    }
    
    unsigned int slabs_available_chunks(const unsigned int id, bool *mem_flag,
            uint64_t *total_bytes, unsigned int *chunks_perslab) {
        unsigned int ret;
        slabclass_t *p;
    
        pthread_mutex_lock(&slabs_lock);
        p = &slabclass[id];
        ret = p->sl_curr;
        if (mem_flag != NULL)
            *mem_flag = mem_limit_reached;
        if (total_bytes != NULL)
            *total_bytes = p->requested;
        if (chunks_perslab != NULL)
            *chunks_perslab = p->perslab;
        pthread_mutex_unlock(&slabs_lock);
        return ret;
    }
    
    static pthread_cond_t slab_rebalance_cond = PTHREAD_COND_INITIALIZER;
    static volatile int do_run_slab_thread = 1;
    static volatile int do_run_slab_rebalance_thread = 1;
    
    #define DEFAULT_SLAB_BULK_CHECK 1
    int slab_bulk_check = DEFAULT_SLAB_BULK_CHECK;
    
    static int slab_rebalance_start(void) {
        slabclass_t *s_cls;
        int no_go = 0;
    
        pthread_mutex_lock(&slabs_lock);
    
        if (slab_rebal.s_clsid < POWER_SMALLEST ||
            slab_rebal.s_clsid > power_largest  ||
            slab_rebal.d_clsid < SLAB_GLOBAL_PAGE_POOL ||
            slab_rebal.d_clsid > power_largest  ||
            slab_rebal.s_clsid == slab_rebal.d_clsid)
            no_go = -2;
    
        s_cls = &slabclass[slab_rebal.s_clsid];
    
        if (!grow_slab_list(slab_rebal.d_clsid)) {
            no_go = -1;
        }
    
        if (s_cls->slabs < 2)
            no_go = -3;
    
        if (no_go != 0) {
            pthread_mutex_unlock(&slabs_lock);
            return no_go; /* Should use a wrapper function... */
        }
    
        /* Always kill the first available slab page as it is most likely to
         * contain the oldest items
         */
        slab_rebal.slab_start = s_cls->slab_list[0];
        slab_rebal.slab_end   = (char *)slab_rebal.slab_start +
            (s_cls->size * s_cls->perslab);
        slab_rebal.slab_pos   = slab_rebal.slab_start;
        slab_rebal.done       = 0;
    
        /* Also tells do_item_get to search for items in this slab */
        slab_rebalance_signal = 2;
    
        if (settings.verbose > 1) {
            fprintf(stderr, "Started a slab rebalance\n");
        }
    
        pthread_mutex_unlock(&slabs_lock);
    
        STATS_LOCK();
        stats_state.slab_reassign_running = true;
        STATS_UNLOCK();
    
        return 0;
    }
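
    /* Note: grow_slab_list() is called on the destination class up front so
     * that slab_rebalance_finish() can append the moved page without an
     * allocation that might fail mid-move. */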
    
    /* CALLED WITH slabs_lock HELD */
    static void *slab_rebalance_alloc(const size_t size, unsigned int id) {
        slabclass_t *s_cls;
        s_cls = &slabclass[slab_rebal.s_clsid];
        int x;
        item *new_it = NULL;
    
        for (x = 0; x < s_cls->perslab; x++) {
            new_it = do_slabs_alloc(size, id, NULL, SLABS_ALLOC_NO_NEWPAGE);
            /* check that memory isn't within the range to clear */
            if (new_it == NULL) {
                break;
            }
            if ((void *)new_it >= slab_rebal.slab_start
                && (void *)new_it < slab_rebal.slab_end) {
                /* Pulled something we intend to free. Mark it as freed since
                 * we've already done the work of unlinking it from the freelist.
                 */
                s_cls->requested -= size;
                new_it->refcount = 0;
                new_it->it_flags = ITEM_SLABBED|ITEM_FETCHED;
    #ifdef DEBUG_SLAB_MOVER
                memcpy(ITEM_key(new_it), "deadbeef", 8);
    #endif
                new_it = NULL;
                slab_rebal.inline_reclaim++;
            } else {
                break;
            }
        }
        return new_it;
    }
    
    /* CALLED WITH slabs_lock HELD */
    /* detaches item/chunk from freelist. */
    static void slab_rebalance_cut_free(slabclass_t *s_cls, item *it) {
        /* Ensure this was on the freelist and nothing else. */
        assert(it->it_flags == ITEM_SLABBED);
        if (s_cls->slots == it) {
            s_cls->slots = it->next;
        }
        if (it->next) it->next->prev = it->prev;
        if (it->prev) it->prev->next = it->next;
        s_cls->sl_curr--;
    }
    
    enum move_status {
        MOVE_PASS=0, MOVE_FROM_SLAB, MOVE_FROM_LRU, MOVE_BUSY, MOVE_LOCKED
    };
    
    /* refcount == 0 is safe since nobody can incr while item_lock is held.
     * refcount != 0 is impossible since flags/etc can be modified in other
     * threads. instead, note we found a busy one and bail. logic in do_item_get
     * will prevent busy items from continuing to be busy
     * NOTE: This is checking it_flags outside of an item lock. I believe this
     * works since it_flags is 8 bits, and we're only ever comparing a single bit
     * regardless. ITEM_SLABBED bit will always be correct since we're holding the
     * lock which modifies that bit. ITEM_LINKED won't exist if we're between an
     * item having ITEM_SLABBED removed, and the key hasn't been added to the item
     * yet. The memory barrier from the slabs lock should order the key write and the
     * flags to the item?
     * If ITEM_LINKED did exist and was just removed, but we still see it, that's
     * still safe since it will have a valid key, which we then lock, and then
     * recheck everything.
     * This may not be safe on all platforms; If not, slabs_alloc() will need to
     * seed the item key while holding slabs_lock.
     */
    static int slab_rebalance_move(void) {
        slabclass_t *s_cls;
        int x;
        int was_busy = 0;
        int refcount = 0;
        uint32_t hv;
        void *hold_lock;
        enum move_status status = MOVE_PASS;
    
        pthread_mutex_lock(&slabs_lock);
    
        s_cls = &slabclass[slab_rebal.s_clsid];
    
        for (x = 0; x < slab_bulk_check; x++) {
            hv = 0;
            hold_lock = NULL;
            item *it = slab_rebal.slab_pos;
            item_chunk *ch = NULL;
            status = MOVE_PASS;
            if (it->it_flags & ITEM_CHUNK) {
                /* This chunk is a chained part of a larger item. */
                ch = (item_chunk *) it;
                /* Instead, we use the head chunk to find the item and effectively
                 * lock the entire structure. If a chunk has ITEM_CHUNK flag, its
                 * head cannot be slabbed, so the normal routine is safe. */
                it = ch->head;
                assert(it->it_flags & ITEM_CHUNKED);
            }
    
            /* ITEM_FETCHED when ITEM_SLABBED is overloaded to mean we've cleared
             * the chunk for move. Only these two flags should exist.
             */
            if (it->it_flags != (ITEM_SLABBED|ITEM_FETCHED)) {
                /* ITEM_SLABBED can only be added/removed under the slabs_lock */
                if (it->it_flags & ITEM_SLABBED) {
                    assert(ch == NULL);
                    slab_rebalance_cut_free(s_cls, it);
                    status = MOVE_FROM_SLAB;
                } else if ((it->it_flags & ITEM_LINKED) != 0) {
                    /* If it doesn't have ITEM_SLABBED, the item could be in any
                     * state on its way to being freed or written to. If no
                     * ITEM_SLABBED, but it's had ITEM_LINKED, it must be active
                     * and have the key written to it already.
                     */
                    hv = hash(ITEM_key(it), it->nkey);
                    if ((hold_lock = item_trylock(hv)) == NULL) {
                        status = MOVE_LOCKED;
                    } else {
                        refcount = refcount_incr(&it->refcount);
                        if (refcount == 2) { /* item is linked but not busy */
                            /* Double check ITEM_LINKED flag here, since we're
                             * past a memory barrier from the mutex. */
                            if ((it->it_flags & ITEM_LINKED) != 0) {
                                status = MOVE_FROM_LRU;
                            } else {
                                /* refcount == 1 + !ITEM_LINKED means the item is being
                                 * uploaded to, or was just unlinked but hasn't been freed
                                 * yet. Let it bleed off on its own and try again later */
                                status = MOVE_BUSY;
                            }
                        } else {
                            if (settings.verbose > 2) {
                            fprintf(stderr, "Slab reassign hit a busy item: refcount: %d (%d -> %d)\n",
                                    it->refcount, slab_rebal.s_clsid, slab_rebal.d_clsid);
                            }
                            status = MOVE_BUSY;
                        }
                        /* Item lock must be held while modifying refcount */
                        if (status == MOVE_BUSY) {
                            refcount_decr(&it->refcount);
                            item_trylock_unlock(hold_lock);
                        }
                    }
                } else {
                    /* See above comment. No ITEM_SLABBED or ITEM_LINKED. Mark
                     * busy and wait for item to complete its upload. */
                    status = MOVE_BUSY;
                }
            }
    
            int save_item = 0;
            item *new_it = NULL;
            size_t ntotal = 0;
            switch (status) {
                case MOVE_FROM_LRU:
                    /* Lock order is LRU locks -> slabs_lock. unlink uses LRU lock.
                     * We only need to hold the slabs_lock while initially looking
                     * at an item, and at this point we have an exclusive refcount
                     * (2) + the item is locked. Drop slabs lock, drop item to
                     * refcount 1 (just our own), then fall through and wipe it
                     */
                    /* Check if expired or flushed */
                    ntotal = ITEM_ntotal(it);
                    /* REQUIRES slabs_lock: CHECK FOR cls->sl_curr > 0 */
                    if (ch == NULL && (it->it_flags & ITEM_CHUNKED)) {
                        /* Chunked should be identical to non-chunked, except we need
                         * to swap out ntotal for the head-chunk-total. */
                        ntotal = s_cls->size;
                    }
                    if ((it->exptime != 0 && it->exptime < current_time)
                        || item_is_flushed(it)) {
                        /* Expired, don't save. */
                        save_item = 0;
                    } else if (ch == NULL &&
                            (new_it = slab_rebalance_alloc(ntotal, slab_rebal.s_clsid)) == NULL) {
                        /* Not a chunk of an item, and nomem. */
                        save_item = 0;
                        slab_rebal.evictions_nomem++;
                    } else if (ch != NULL &&
                            (new_it = slab_rebalance_alloc(s_cls->size, slab_rebal.s_clsid)) == NULL) {
                        /* Is a chunk of an item, and nomem. */
                        save_item = 0;
                        slab_rebal.evictions_nomem++;
                    } else {
                        /* Was whatever it was, and we have memory for it. */
                        save_item = 1;
                    }
                    pthread_mutex_unlock(&slabs_lock);
                    unsigned int requested_adjust = 0;
                    if (save_item) {
                        if (ch == NULL) {
                            assert((new_it->it_flags & ITEM_CHUNKED) == 0);
                            /* if free memory, memcpy. clear prev/next/h_bucket */
                            memcpy(new_it, it, ntotal);
                            new_it->prev = 0;
                            new_it->next = 0;
                            new_it->h_next = 0;
                            /* These are definitely required. else fails assert */
                            new_it->it_flags &= ~ITEM_LINKED;
                            new_it->refcount = 0;
                            do_item_replace(it, new_it, hv);
                            /* Need to walk the chunks and repoint head  */
                            if (new_it->it_flags & ITEM_CHUNKED) {
                                item_chunk *fch = (item_chunk *) ITEM_data(new_it);
                                fch->next->prev = fch;
                                while (fch) {
                                    fch->head = new_it;
                                    fch = fch->next;
                                }
                            }
                            it->refcount = 0;
                            it->it_flags = ITEM_SLABBED|ITEM_FETCHED;
    #ifdef DEBUG_SLAB_MOVER
                            memcpy(ITEM_key(it), "deadbeef", 8);
    #endif
                            slab_rebal.rescues++;
                            requested_adjust = ntotal;
                        } else {
                            item_chunk *nch = (item_chunk *) new_it;
                            /* Chunks always have head chunk (the main it) */
                            ch->prev->next = nch;
                            if (ch->next)
                                ch->next->prev = nch;
                            memcpy(nch, ch, ch->used + sizeof(item_chunk));
                            ch->refcount = 0;
                            ch->it_flags = ITEM_SLABBED|ITEM_FETCHED;
                            slab_rebal.chunk_rescues++;
    #ifdef DEBUG_SLAB_MOVER
                            memcpy(ITEM_key((item *)ch), "deadbeef", 8);
    #endif
                            refcount_decr(&it->refcount);
                            requested_adjust = s_cls->size;
                        }
                    } else {
                        /* restore ntotal in case we tried saving a head chunk. */
                        ntotal = ITEM_ntotal(it);
                        do_item_unlink(it, hv);
                        slabs_free(it, ntotal, slab_rebal.s_clsid);
                        /* Swing around again later to remove it from the freelist. */
                        slab_rebal.busy_items++;
                        was_busy++;
                    }
                    item_trylock_unlock(hold_lock);
                    pthread_mutex_lock(&slabs_lock);
                    /* Always remove the ntotal, as we added it in during
                     * do_slabs_alloc() when copying the item.
                     */
                    s_cls->requested -= requested_adjust;
                    break;
                case MOVE_FROM_SLAB:
                    it->refcount = 0;
                    it->it_flags = ITEM_SLABBED|ITEM_FETCHED;
    #ifdef DEBUG_SLAB_MOVER
                    memcpy(ITEM_key(it), "deadbeef", 8);
    #endif
                    break;
                case MOVE_BUSY:
                case MOVE_LOCKED:
                    slab_rebal.busy_items++;
                    was_busy++;
                    break;
                case MOVE_PASS:
                    break;
            }
    
            slab_rebal.slab_pos = (char *)slab_rebal.slab_pos + s_cls->size;
            if (slab_rebal.slab_pos >= slab_rebal.slab_end)
                break;
        }
    
        if (slab_rebal.slab_pos >= slab_rebal.slab_end) {
            /* Some items were busy, start again from the top */
            if (slab_rebal.busy_items) {
                slab_rebal.slab_pos = slab_rebal.slab_start;
                STATS_LOCK();
                stats.slab_reassign_busy_items += slab_rebal.busy_items;
                STATS_UNLOCK();
                slab_rebal.busy_items = 0;
            } else {
                slab_rebal.done++;
            }
        }
    
        pthread_mutex_unlock(&slabs_lock);
    
        return was_busy;
    }
    
    static void slab_rebalance_finish(void) {
        slabclass_t *s_cls;
        slabclass_t *d_cls;
        int x;
        uint32_t rescues;
        uint32_t evictions_nomem;
        uint32_t inline_reclaim;
        uint32_t chunk_rescues;
    
        pthread_mutex_lock(&slabs_lock);
    
        s_cls = &slabclass[slab_rebal.s_clsid];
        d_cls = &slabclass[slab_rebal.d_clsid];
    
    #ifdef DEBUG_SLAB_MOVER
        /* If the algorithm is broken, live items can sneak in. */
        slab_rebal.slab_pos = slab_rebal.slab_start;
        while (1) {
            item *it = slab_rebal.slab_pos;
            assert(it->it_flags == (ITEM_SLABBED|ITEM_FETCHED));
            assert(memcmp(ITEM_key(it), "deadbeef", 8) == 0);
            it->it_flags = ITEM_SLABBED|ITEM_FETCHED;
            slab_rebal.slab_pos = (char *)slab_rebal.slab_pos + s_cls->size;
            if (slab_rebal.slab_pos >= slab_rebal.slab_end)
                break;
        }
    #endif
    
        /* At this point the stolen slab is completely clear.
         * We always kill the "first"/"oldest" slab page in the slab_list, so
         * shuffle the page list backwards and decrement.
         */
        s_cls->slabs--;
        for (x = 0; x < s_cls->slabs; x++) {
            s_cls->slab_list[x] = s_cls->slab_list[x+1];
        }
    
        d_cls->slab_list[d_cls->slabs++] = slab_rebal.slab_start;
        /* Don't need to split the page into chunks if we're just storing it */
        if (slab_rebal.d_clsid > SLAB_GLOBAL_PAGE_POOL) {
            memset(slab_rebal.slab_start, 0, (size_t)settings.item_size_max);
            split_slab_page_into_freelist(slab_rebal.slab_start,
                slab_rebal.d_clsid);
        } else if (slab_rebal.d_clsid == SLAB_GLOBAL_PAGE_POOL) {
            /* mem_malloc'ed might be higher than mem_limit. */
            memory_release();
        }
    
        slab_rebal.done       = 0;
        slab_rebal.s_clsid    = 0;
        slab_rebal.d_clsid    = 0;
        slab_rebal.slab_start = NULL;
        slab_rebal.slab_end   = NULL;
        slab_rebal.slab_pos   = NULL;
        evictions_nomem    = slab_rebal.evictions_nomem;
        inline_reclaim = slab_rebal.inline_reclaim;
        rescues   = slab_rebal.rescues;
        chunk_rescues = slab_rebal.chunk_rescues;
        slab_rebal.evictions_nomem    = 0;
        slab_rebal.inline_reclaim = 0;
        slab_rebal.rescues  = 0;
        slab_rebal.chunk_rescues = 0;
    
        slab_rebalance_signal = 0;
    
        pthread_mutex_unlock(&slabs_lock);
    
        STATS_LOCK();
        stats.slabs_moved++;
        stats.slab_reassign_rescues += rescues;
        stats.slab_reassign_evictions_nomem += evictions_nomem;
        stats.slab_reassign_inline_reclaim += inline_reclaim;
        stats.slab_reassign_chunk_rescues += chunk_rescues;
        stats_state.slab_reassign_running = false;
        STATS_UNLOCK();
    
        if (settings.verbose > 1) {
        fprintf(stderr, "finished a slab move\n");
        }
    }
    
    /* Slab mover thread.
     * Sits waiting for a condition to jump off and shovel some memory about
     */
    static void *slab_rebalance_thread(void *arg) {
        int was_busy = 0;
        /* So we first pass into cond_wait with the mutex held */
        mutex_lock(&slabs_rebalance_lock);
    
        while (do_run_slab_rebalance_thread) {
            if (slab_rebalance_signal == 1) {
                if (slab_rebalance_start() < 0) {
                /* Handle errors with more specificity as required. */
                    slab_rebalance_signal = 0;
                }
    
                was_busy = 0;
            } else if (slab_rebalance_signal && slab_rebal.slab_start != NULL) {
                was_busy = slab_rebalance_move();
            }
    
            if (slab_rebal.done) {
                slab_rebalance_finish();
            } else if (was_busy) {
                /* Stuck waiting for some items to unlock, so slow down a bit
                 * to give them a chance to free up */
                usleep(50);
            }
    
            if (slab_rebalance_signal == 0) {
                /* always hold this lock while we're running */
                pthread_cond_wait(&slab_rebalance_cond, &slabs_rebalance_lock);
            }
        }
        return NULL;
    }
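
    /* Signal protocol used above: slab_rebalance_signal == 1 means a move was
     * requested, 2 (set by slab_rebalance_start) means a move is in progress,
     * and 0 parks the thread on the condition variable. */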
    
    /* Iterate at most once through the slab classes and pick a "random" source.
     * I like this better than calling rand() since rand() is slow enough that we
     * can just check all of the classes once instead.
     */
    static int slabs_reassign_pick_any(int dst) {
        static int cur = POWER_SMALLEST - 1;
        int tries = power_largest - POWER_SMALLEST + 1;
        for (; tries > 0; tries--) {
            cur++;
            if (cur > power_largest)
                cur = POWER_SMALLEST;
            if (cur == dst)
                continue;
            if (slabclass[cur].slabs > 1) {
                return cur;
            }
        }
        return -1;
    }
    
    static enum reassign_result_type do_slabs_reassign(int src, int dst) {
        if (slab_rebalance_signal != 0)
            return REASSIGN_RUNNING;
    
        if (src == dst)
            return REASSIGN_SRC_DST_SAME;
    
        /* Special indicator to choose ourselves. */
        if (src == -1) {
            src = slabs_reassign_pick_any(dst);
            /* TODO: If we end up back at -1, return a new error type */
        }
    
        if (src < POWER_SMALLEST        || src > power_largest ||
            dst < SLAB_GLOBAL_PAGE_POOL || dst > power_largest)
            return REASSIGN_BADCLASS;
    
        if (slabclass[src].slabs < 2)
            return REASSIGN_NOSPARE;
    
        slab_rebal.s_clsid = src;
        slab_rebal.d_clsid = dst;
    
        slab_rebalance_signal = 1;
        pthread_cond_signal(&slab_rebalance_cond);
    
        return REASSIGN_OK;
    }
    
    enum reassign_result_type slabs_reassign(int src, int dst) {
        enum reassign_result_type ret;
        if (pthread_mutex_trylock(&slabs_rebalance_lock) != 0) {
            return REASSIGN_RUNNING;
        }
        ret = do_slabs_reassign(src, dst);
        pthread_mutex_unlock(&slabs_rebalance_lock);
        return ret;
    }
    
    /* If we hold this lock, rebalancer can't wake up or move */
    void slabs_rebalancer_pause(void) {
        pthread_mutex_lock(&slabs_rebalance_lock);
    }
    
    void slabs_rebalancer_resume(void) {
        pthread_mutex_unlock(&slabs_rebalance_lock);
    }
    
    static pthread_t rebalance_tid;
    
    int start_slab_maintenance_thread(void) {
        int ret;
        slab_rebalance_signal = 0;
        slab_rebal.slab_start = NULL;
        char *env = getenv("MEMCACHED_SLAB_BULK_CHECK");
        if (env != NULL) {
            slab_bulk_check = atoi(env);
            if (slab_bulk_check == 0) {
                slab_bulk_check = DEFAULT_SLAB_BULK_CHECK;
            }
        }
    
        if (pthread_cond_init(&slab_rebalance_cond, NULL) != 0) {
        fprintf(stderr, "Can't initialize rebalance condition\n");
            return -1;
        }
        pthread_mutex_init(&slabs_rebalance_lock, NULL);
    
        if ((ret = pthread_create(&rebalance_tid, NULL,
                                  slab_rebalance_thread, NULL)) != 0) {
        fprintf(stderr, "Can't create rebal thread: %s\n", strerror(ret));
            return -1;
        }
        return 0;
    }
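
    /* Usage sketch (hypothetical value): exporting MEMCACHED_SLAB_BULK_CHECK=16
     * before startup makes slab_rebalance_move() examine 16 chunks per
     * slabs_lock acquisition instead of the default 1. */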
    
    /* The maintenance thread is on a sleep/loop cycle, so it should join after a
     * short wait */
    void stop_slab_maintenance_thread(void) {
        mutex_lock(&slabs_rebalance_lock);
        do_run_slab_thread = 0;
        do_run_slab_rebalance_thread = 0;
        pthread_cond_signal(&slab_rebalance_cond);
        pthread_mutex_unlock(&slabs_rebalance_lock);
    
        /* Wait for the maintenance thread to stop */
        pthread_join(rebalance_tid, NULL);
    }