zoukankan      html  css  js  c++  java
  • memcached(十)动态扩容

    HASH的扩容

      一般来说,hash容器都会在初始化的时候创建一块空间用于存放对象,对象的数量到达临界值后扩容整个空间,而不是一边增加/删除对象一边分配空间。本人根据扩容是否阻塞,将其区分为静态扩容和动态扩容两种。

    静态扩容

      参考java.util.HashMap的实现。在put方法中,

        /**
         * Associates {@code value} with {@code key}, returning the previous
         * value if the key was already mapped, or {@code null} otherwise.
         */
        public V put(K key, V value) {
            // Null keys live in a dedicated slot handled separately.
            if (key == null)
                return putForNullKey(value);
            int h = hash(key.hashCode());
            int bucket = indexFor(h, table.length);
            // Scan the bucket's chain for an existing mapping to overwrite.
            for (Entry<K,V> entry = table[bucket]; entry != null; entry = entry.next) {
                Object candidate;
                if (entry.hash == h && ((candidate = entry.key) == key || key.equals(candidate))) {
                    V previous = entry.value;
                    entry.value = value;
                    entry.recordAccess(this);
                    return previous;
                }
            }

            // No existing mapping: record the structural change, then insert.
            modCount++;
            addEntry(h, key, value, bucket);
            return null;
        }

      addEntry:

        /**
         * Prepends a new entry to the chain at {@code bucketIndex}, doubling
         * the table once the element count crosses the threshold.
         */
        void addEntry(int hash, K key, V value, int bucketIndex) {
            // New entries are pushed onto the head of the bucket's chain.
            table[bucketIndex] = new Entry<K,V>(hash, key, value, table[bucketIndex]);
            if (size++ >= threshold)
                resize(2 * table.length);
        }

      具体的resize动作:

        /**
         * Moves the contents of this map into a freshly allocated, larger
         * array. Invoked automatically once the number of keys reaches the
         * threshold.
         *
         * <p>If the table is already at MAXIMUM_CAPACITY, no new array is
         * created; the threshold is pinned to Integer.MAX_VALUE so that no
         * further resize attempts occur.
         *
         * @param newCapacity the new capacity, MUST be a power of two and
         *        greater than the current capacity (ignored when the current
         *        capacity is MAXIMUM_CAPACITY).
         */
        void resize(int newCapacity) {
            int currentCapacity = table.length;
            // Already maxed out: disable all future resize attempts.
            if (currentCapacity == MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return;
            }

            Entry[] replacement = new Entry[newCapacity];
            transfer(replacement);
            table = replacement;
            threshold = (int)(newCapacity * loadFactor);
        }
    
        /**
         * Re-links every entry from the current table into {@code newTable},
         * recomputing each entry's bucket against the new capacity.
         */
        void transfer(Entry[] newTable) {
            int capacity = newTable.length;
            for (int bucket = 0; bucket < table.length; bucket++) {
                Entry<K,V> node = table[bucket];
                // Detach the old chain so entries are not reachable twice.
                table[bucket] = null;
                // Pop each node off the old chain and push it onto the head
                // of its new bucket (chain order is reversed, which is fine).
                while (node != null) {
                    Entry<K,V> following = node.next;
                    int target = indexFor(node.hash, capacity);
                    node.next = newTable[target];
                    newTable[target] = node;
                    node = following;
                }
            }
        }

      HashMap 的 put 操作后如果到达某个阈值,就会执行整个 resize 动作,非常耗性能,而且并发高的时候容易引起数据一致性问题。

      ConcurrentHashMap会把hash桶分为不同的segment,从而减少扩容的范围,减少性能损耗。为了保证并发安全性,使用了lock排它。依然是一个阻塞的,很损耗性能的动作。

            /**
             * Segment-local put. The segment's exclusive lock serializes all
             * writers; rehashing (if needed) happens under the same lock.
             *
             * @param onlyIfAbsent when true, an existing mapping is left untouched
             * @return the previous value mapped to {@code key}, or null if none
             */
            V put(K key, int hash, V value, boolean onlyIfAbsent) {
                lock();
                try {
                    int c = count;
                    if (c++ > threshold) // ensure capacity
                        rehash();
                    HashEntry<K,V>[] tab = table;
                    int index = hash & (tab.length - 1);
                    HashEntry<K,V> first = tab[index];
                    HashEntry<K,V> e = first;
                    // Walk the bucket chain looking for an existing mapping.
                    while (e != null && (e.hash != hash || !key.equals(e.key)))
                        e = e.next;
    
                    V oldValue;
                    if (e != null) {
                        // Key present: overwrite unless caller asked put-if-absent.
                        oldValue = e.value;
                        if (!onlyIfAbsent)
                            e.value = value;
                    }
                    else {
                        // Key absent: link a new entry at the head of the bucket.
                        // The volatile write to count publishes the insertion to
                        // lock-free readers.
                        oldValue = null;
                        ++modCount;
                        tab[index] = new HashEntry<K,V>(key, hash, first, value);
                        count = c; // write-volatile
                    }
                    return oldValue;
                } finally {
                    unlock();
                }
            }
    
            /**
             * Doubles this segment's table and redistributes its entries.
             * Caller must hold the segment lock (invoked from put above).
             */
            void rehash() {
                HashEntry<K,V>[] oldTable = table;
                int oldCapacity = oldTable.length;
                if (oldCapacity >= MAXIMUM_CAPACITY)
                    return;
    
                /*
                 * Reclassify nodes in each list to new Map.  Because we are
                 * using power-of-two expansion, the elements from each bin
                 * must either stay at same index, or move with a power of two
                 * offset. We eliminate unnecessary node creation by catching
                 * cases where old nodes can be reused because their next
                 * fields won't change. Statistically, at the default
                 * threshold, only about one-sixth of them need cloning when
                 * a table doubles. The nodes they replace will be garbage
                 * collectable as soon as they are no longer referenced by any
                 * reader thread that may be in the midst of traversing table
                 * right now.
                 */
    
                HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
                threshold = (int)(newTable.length * loadFactor);
                int sizeMask = newTable.length - 1;
                for (int i = 0; i < oldCapacity ; i++) {
                    // We need to guarantee that any existing reads of old Map can
                    //  proceed. So we cannot yet null out each bin.
                    HashEntry<K,V> e = oldTable[i];
    
                    if (e != null) {
                        HashEntry<K,V> next = e.next;
                        int idx = e.hash & sizeMask;
    
                        //  Single node on list
                        if (next == null)
                            newTable[idx] = e;
    
                        else {
                            // Reuse trailing consecutive sequence at same slot:
                            // find the longest suffix of the chain whose nodes
                            // all land in the same new bucket; that suffix can
                            // be moved wholesale without cloning.
                            HashEntry<K,V> lastRun = e;
                            int lastIdx = idx;
                            for (HashEntry<K,V> last = next;
                                 last != null;
                                 last = last.next) {
                                int k = last.hash & sizeMask;
                                if (k != lastIdx) {
                                    lastIdx = k;
                                    lastRun = last;
                                }
                            }
                            newTable[lastIdx] = lastRun;
    
                            // Clone all remaining nodes (those before lastRun),
                            // since their next fields must change.
                            for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                                int k = p.hash & sizeMask;
                                HashEntry<K,V> n = newTable[k];
                                newTable[k] = new HashEntry<K,V>(p.key, p.hash,
                                                                 n, p.value);
                            }
                        }
                    }
                }
                table = newTable;
            }

    可见,静态扩容是一个阻塞的、很耗性能的动作。

    动态扩容

      对比静态扩容,动态扩容就是为了解决扩容的过程中会阻塞、影响性能的因素。先看看memcached的源码assoc_insert方法:

    /* Inserts item `it` (whose key has been pre-hashed to `hv`) into the hash
     * table, and triggers background expansion once the load crosses 3/2. */
    int assoc_insert(item *it, const uint32_t hv) {
        unsigned int oldbucket;
    
    //    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */
    
        /* Expansion in progress: if this item's bucket (computed against the
         * OLD, smaller mask) has not been migrated yet, chain it into the old
         * table so the migration thread will move it later. */
        if (expanding &&
            (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
        {
            it->h_next = old_hashtable[oldbucket];
            old_hashtable[oldbucket] = it;
        } else {    /* not expanding (or bucket already migrated): chain the
                     * item into the primary table */
            it->h_next = primary_hashtable[hv & hashmask(hashpower)];
            primary_hashtable[hv & hashmask(hashpower)] = it;
        }
    
        pthread_mutex_lock(&hash_items_counter_lock);
        hash_items++;
        /* Past the load threshold (3/2 of the bucket count): start expansion. */
        if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
            assoc_start_expand();
        }
        pthread_mutex_unlock(&hash_items_counter_lock);
    
        MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
        return 1;
    }

      扩容是由单独的维护线程来完成的,因此 memcached 的扩容并不会阻塞正常的读写操作。

      扩容的逻辑见:

    #define DEFAULT_HASH_BULK_MOVE 1
    int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
    
    /* Background expansion thread: migrates up to hash_bulk_move buckets per
     * iteration from old_hashtable into primary_hashtable, then waits on
     * maintenance_cond until the next expansion is requested.
     * Returns NULL when do_run_maintenance_thread is cleared (shutdown).
     * Fix: the fprintf string literal was broken across two physical lines in
     * the original text (its "\n" escape had been lost), which is invalid C;
     * restored to a single literal with an explicit newline escape. */
    static void *assoc_maintenance_thread(void *arg) {
    
        mutex_lock(&maintenance_lock);
        while (do_run_maintenance_thread) {
            int ii = 0;
    
            /* There is only one expansion thread, so no need to global lock. */
            for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
                item *it, *next;
                int bucket;
                void *item_lock = NULL;
    
                /* bucket = hv & hashmask(hashpower) =>the bucket of hash table
                 * is the lowest N bits of the hv, and the bucket of item_locks is
                 *  also the lowest M bits of hv, and N is greater than M.
                 *  So we can process expanding with only one item_lock. cool! */
                if ((item_lock = item_trylock(expand_bucket))) {
                        /* Re-chain every item of the current old bucket into the
                         * (larger) primary table. */
                        for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                            next = it->h_next;
                            bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
                            it->h_next = primary_hashtable[bucket];
                            primary_hashtable[bucket] = it;
                        }
    
                        old_hashtable[expand_bucket] = NULL;
    
                        expand_bucket++;
                        /* Every old bucket migrated: expansion is complete. */
                        if (expand_bucket == hashsize(hashpower - 1)) {
                            expanding = false;
                            free(old_hashtable);
                            STATS_LOCK();
                            stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
                            stats.hash_is_expanding = 0;
                            STATS_UNLOCK();
                            if (settings.verbose > 1)
                                fprintf(stderr, "Hash table expansion done\n");
                        }
    
                } else {
                    /* Bucket currently locked by a worker thread; back off. */
                    usleep(10*1000);
                }
    
                if (item_lock) {
                    item_trylock_unlock(item_lock);
                    item_lock = NULL;
                }
            }
    
            if (!expanding) {
                /* We are done expanding.. just wait for next invocation */
                started_expanding = false;
                pthread_cond_wait(&maintenance_cond, &maintenance_lock);
                /* assoc_expand() swaps out the hash table entirely, so we need
                 * all threads to not hold any references related to the hash
                 * table while this happens.
                 * This is instead of a more complex, possibly slower algorithm to
                 * allow dynamic hash table expansion without causing significant
                 * wait times.
                 */
                pause_threads(PAUSE_ALL_THREADS);
                assoc_expand();
                pause_threads(RESUME_ALL_THREADS);
            }
        }
        return NULL;
    }

      其实就是线程实现的把旧hash桶的数据丢到新hash桶而已。从而避免了阻塞。

      memcached get 操作的时候,如果正在扩容,就从旧的hash桶中获取数据,否则从primary桶中获取数据。

  • 相关阅读:
    LeetCode Single Number
    Leetcode Populating Next Right Pointers in Each Node
    LeetCode Permutations
    Leetcode Sum Root to Leaf Numbers
    LeetCode Candy
    LeetCode Sort List
    LeetCode Remove Duplicates from Sorted List II
    LeetCode Remove Duplicates from Sorted List
    spring MVC HandlerInterceptorAdapter
    yum
  • 原文地址:https://www.cnblogs.com/ELMND/p/4631191.html
Copyright © 2011-2022 走看看