zoukankan      html  css  js  c++  java
  • memcached(十)动态扩容

    HASH的扩容

      一般来说,hash容器都会在初始化的时候创建一块空间用于存放对象,对象的数量到达临界值后再对整个空间进行扩容,而不是一边增加/删除对象一边分配空间。本文根据扩容过程是否阻塞,把扩容分为静态扩容和动态扩容。

    静态扩容

      参考java.util.HashMap的实现。在put方法中,

        /**
         * Stores {@code value} under {@code key}.
         *
         * A null key is handed off to {@code putForNullKey}. Otherwise the
         * bucket chain selected by the key's hash is scanned; if an entry with
         * the same hash and an identical or equal key is found, its value is
         * overwritten in place. If no such entry exists, {@code modCount} is
         * incremented and a new entry is added through {@code addEntry}.
         *
         * @param key   key of the mapping
         * @param value value to associate with the key
         * @return the value that was replaced, or null when a new entry was added
         */
        public V put(K key, V value) {
            if (key == null) {
                return putForNullKey(value);
            }

            final int keyHash = hash(key.hashCode());
            final int bucket = indexFor(keyHash, table.length);

            Entry<K,V> node = table[bucket];
            while (node != null) {
                // Compare the cached hash first so equals() is only reached
                // for entries whose hash already matches.
                if (node.hash == keyHash) {
                    Object existing = node.key;
                    if (existing == key || key.equals(existing)) {
                        V previous = node.value;
                        node.value = value;
                        node.recordAccess(this);
                        return previous;
                    }
                }
                node = node.next;
            }

            // No matching entry in the chain: this is a structural change.
            modCount++;
            addEntry(keyHash, key, value, bucket);
            return null;
        }

      addEntry:

        /**
         * Prepends a new entry to the chain at {@code bucketIndex} and grows
         * the table when the size before this insertion had already reached
         * {@code threshold}.
         *
         * @param hash        hash stored in the new entry
         * @param key         key stored in the new entry
         * @param value       value stored in the new entry
         * @param bucketIndex index of the bucket that receives the entry
         */
        void addEntry(int hash, K key, V value, int bucketIndex) {
            Entry<K,V> previousHead = table[bucketIndex];
            table[bucketIndex] = new Entry<K,V>(hash, key, value, previousHead);

            // The comparison uses the size as it was before this insertion.
            int sizeBeforeInsert = size;
            size = sizeBeforeInsert + 1;
            if (sizeBeforeInsert >= threshold) {
                resize(2 * table.length);
            }
        }

      具体的resize动作:

        /**
         * Moves every entry of this map into a freshly allocated array of
         * {@code newCapacity} slots and recomputes the threshold from the
         * load factor.
         *
         * When the current table already has MAXIMUM_CAPACITY slots nothing
         * is moved; the threshold is set to Integer.MAX_VALUE instead, which
         * keeps this method from being triggered again.
         *
         * @param newCapacity size of the replacement table; MUST be a power
         *        of two and larger than the current capacity, unless the
         *        current capacity is MAXIMUM_CAPACITY (then it is ignored).
         */
        void resize(int newCapacity) {
            final Entry[] current = table;
            if (current.length == MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return;
            }

            final Entry[] replacement = new Entry[newCapacity];
            transfer(replacement);
            table = replacement;
            threshold = (int)(newCapacity * loadFactor);
        }
    
        /**
         * Relinks every entry of the current table into {@code newTable}.
         *
         * Each source bucket is cleared and its nodes are pushed, one by one,
         * onto the head of the destination chain chosen by
         * {@code indexFor(hash, newTable.length)}. Existing nodes are reused;
         * none are allocated.
         *
         * @param newTable destination array that receives all entries
         */
        void transfer(Entry[] newTable) {
            final Entry[] source = table;
            final int targetLength = newTable.length;

            for (int slot = 0; slot < source.length; slot++) {
                Entry<K,V> node = source[slot];
                if (node == null) {
                    continue;
                }

                // Detach the chain from the old table before relinking it.
                source[slot] = null;
                while (node != null) {
                    Entry<K,V> following = node.next;
                    int destination = indexFor(node.hash, targetLength);
                    node.next = newTable[destination];
                    newTable[destination] = node;
                    node = following;
                }
            }
        }

      HashMap 的 put 操作之后,如果元素数量达到阈值(threshold),就会执行一次完整的 resize:分配新数组,并把所有元素重新散列到新数组中。这个动作非常耗性能,而且上面的代码没有任何同步措施,并发高的时候容易引起数据一致性问题。

      ConcurrentHashMap 会把 hash 桶划分到不同的 segment 中,每次只对单个 segment 扩容,从而缩小扩容的范围,减少性能损耗。为了保证并发安全,put 和扩容都在排它锁(lock)的保护下进行,因此它依然是一个阻塞的、很损耗性能的动作。

            /**
             * Inserts or updates the mapping for {@code key}; the whole
             * operation runs between lock() and unlock(), with unlock() in a
             * finally block.
             *
             * @param key          key to look up or insert
             * @param hash         hash of the key; its low bits select the bucket
             * @param value        value to store
             * @param onlyIfAbsent when true, an existing entry keeps its value
             * @return the value previously stored for the key, or null if no
             *         matching entry was found
             */
            V put(K key, int hash, V value, boolean onlyIfAbsent) {
                lock();
                try {
                    int c = count;
                    // The capacity check happens before the lookup, so rehash()
                    // may run even when the call only updates an existing entry.
                    if (c++ > threshold) // ensure capacity
                        rehash();
                    // Read the table after the possible rehash so the new array is used.
                    HashEntry<K,V>[] tab = table;
                    int index = hash & (tab.length - 1);
                    HashEntry<K,V> first = tab[index];
                    HashEntry<K,V> e = first;
                    // Walk the chain until an entry with matching hash and equal key is found.
                    while (e != null && (e.hash != hash || !key.equals(e.key)))
                        e = e.next;
    
                    V oldValue;
                    if (e != null) {
                        oldValue = e.value;
                        if (!onlyIfAbsent)
                            e.value = value;
                    }
                    else {
                        oldValue = null;
                        ++modCount;
                        // The new entry becomes the head of the chain, linking to the old head.
                        tab[index] = new HashEntry<K,V>(key, hash, first, value);
                        // count is only updated when an entry was actually added.
                        count = c; // write-volatile
                    }
                    return oldValue;
                } finally {
                    unlock();
                }
            }
    
            /**
             * Replaces the table with one of twice the capacity.
             *
             * Returns without doing anything when the current capacity is
             * already at or above MAXIMUM_CAPACITY. Otherwise a new array is
             * allocated, threshold is recomputed from its length and
             * loadFactor, every chain of the old table is redistributed, and
             * finally the new array is assigned to {@code table}.
             *
             * Nodes of the old table are never modified here: a node is either
             * placed into the new table as-is (together with everything it
             * already links to) or copied into a new HashEntry.
             */
            void rehash() {
                HashEntry<K,V>[] oldTable = table;
                int oldCapacity = oldTable.length;
                if (oldCapacity >= MAXIMUM_CAPACITY)
                    return;
    
                /*
                 * Reclassify nodes in each list to new Map.  Because we are
                 * using power-of-two expansion, the elements from each bin
                 * must either stay at same index, or move with a power of two
                 * offset. We eliminate unnecessary node creation by catching
                 * cases where old nodes can be reused because their next
                 * fields won't change. Statistically, at the default
                 * threshold, only about one-sixth of them need cloning when
                 * a table doubles. The nodes they replace will be garbage
                 * collectable as soon as they are no longer referenced by any
                 * reader thread that may be in the midst of traversing table
                 * right now.
                 */
    
                HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
                threshold = (int)(newTable.length * loadFactor);
                int sizeMask = newTable.length - 1;
                for (int i = 0; i < oldCapacity ; i++) {
                    // We need to guarantee that any existing reads of old Map can
                    //  proceed. So we cannot yet null out each bin.
                    HashEntry<K,V> e = oldTable[i];
    
                    if (e != null) {
                        HashEntry<K,V> next = e.next;
                        int idx = e.hash & sizeMask;
    
                        //  Single node on list
                        if (next == null)
                            newTable[idx] = e;
    
                        else {
                            // Reuse trailing consecutive sequence at same slot
                            // lastRun ends up as the first node of the longest
                            // tail whose nodes all map to the same new index.
                            HashEntry<K,V> lastRun = e;
                            int lastIdx = idx;
                            for (HashEntry<K,V> last = next;
                                 last != null;
                                 last = last.next) {
                                int k = last.hash & sizeMask;
                                if (k != lastIdx) {
                                    lastIdx = k;
                                    lastRun = last;
                                }
                            }
                            newTable[lastIdx] = lastRun;
    
                            // Clone all remaining nodes
                            // Each copy is pushed onto the head of its new chain.
                            for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                                int k = p.hash & sizeMask;
                                HashEntry<K,V> n = newTable[k];
                                newTable[k] = new HashEntry<K,V>(p.key, p.hash,
                                                                 n, p.value);
                            }
                        }
                    }
                }
                // Publish the fully built table in a single assignment.
                table = newTable;
            }

    可见,静态扩容是一个阻塞的、很耗性能的动作。

    动态扩容

      与静态扩容相比,动态扩容要解决的正是扩容过程中阻塞请求、影响性能的问题。先看看memcached源码中的assoc_insert方法:

    /*
     * Link `it` at the head of the bucket chain selected by hash value `hv`
     * and increment the global item counter.
     *
     * While `expanding` is set and the item's bucket index under
     * hashmask(hashpower - 1) is >= expand_bucket, the item is linked into
     * old_hashtable; in every other case it is linked into primary_hashtable
     * using hashmask(hashpower).
     *
     * After the insert, under hash_items_counter_lock, an expansion is started
     * when none is running and hash_items exceeds 1.5 * hashsize(hashpower).
     *
     * Duplicate keys are not checked for here. Always returns 1.
     */
    int assoc_insert(item *it, const uint32_t hv) {
        unsigned int old_idx = 0;
        int goes_to_old_table = 0;

        /* Expansion in progress: decide which table currently holds this bucket. */
        if (expanding) {
            old_idx = hv & hashmask(hashpower - 1);
            goes_to_old_table = (old_idx >= expand_bucket);
        }

        if (goes_to_old_table) {
            it->h_next = old_hashtable[old_idx];
            old_hashtable[old_idx] = it;
        } else {
            /* Not expanding, or not routed to the old table: use the primary table. */
            it->h_next = primary_hashtable[hv & hashmask(hashpower)];
            primary_hashtable[hv & hashmask(hashpower)] = it;
        }

        pthread_mutex_lock(&hash_items_counter_lock);
        ++hash_items;
        /* Over the load threshold and no expansion running yet: start one. */
        if (!expanding) {
            if (hash_items > (hashsize(hashpower) * 3) / 2) {
                assoc_start_expand();
            }
        }
        pthread_mutex_unlock(&hash_items_counter_lock);

        MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
        return 1;
    }

      扩容由一个独立的维护线程(assoc_maintenance_thread)完成,旧表中的数据是逐个桶迁移的,因此memcached的扩容不会像静态扩容那样一次性阻塞所有请求。

      扩容的逻辑见:

    /* Default upper bound on the number of iterations of the maintenance
     * thread's inner migration loop per pass; each iteration attempts one
     * old-table bucket. */
    #define DEFAULT_HASH_BULK_MOVE 1
    /* Value actually used as the loop bound; initialised to the default.
     * Not declared static, so presumably adjustable from elsewhere - TODO confirm. */
    int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
    
    /*
     * Thread entry point that migrates items from old_hashtable into
     * primary_hashtable while `expanding` is set.
     *
     * Each pass of the outer loop attempts up to hash_bulk_move buckets. For
     * every bucket it tries to take the item lock for expand_bucket; on
     * success the whole chain is rehashed into primary_hashtable and
     * expand_bucket is advanced, on failure the thread sleeps 10 ms. When the
     * last old bucket has been moved, the old table is freed and the stats
     * are updated.
     *
     * When no expansion is in progress the thread waits on maintenance_cond;
     * once woken it pauses all threads, calls assoc_expand() and resumes them.
     *
     * `arg` is unused. Returns NULL once do_run_maintenance_thread is cleared.
     *
     * NOTE(review): maintenance_lock is taken on entry and no unlock is
     * visible before the return - confirm whether that is intended.
     */
    static void *assoc_maintenance_thread(void *arg) {

        mutex_lock(&maintenance_lock);
        while (do_run_maintenance_thread) {
            int ii = 0;

            /* There is only one expansion thread, so no need to global lock. */
            for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
                item *it, *next;
                int bucket;
                void *item_lock = NULL;

                /* bucket = hv & hashmask(hashpower) =>the bucket of hash table
                 * is the lowest N bits of the hv, and the bucket of item_locks is
                 *  also the lowest M bits of hv, and N is greater than M.
                 *  So we can process expanding with only one item_lock. cool! */
                if ((item_lock = item_trylock(expand_bucket))) {
                    /* Relink every item of this old bucket at the head of its
                     * chain in the primary table. */
                    for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                        next = it->h_next;
                        bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
                        it->h_next = primary_hashtable[bucket];
                        primary_hashtable[bucket] = it;
                    }

                    old_hashtable[expand_bucket] = NULL;

                    expand_bucket++;
                    /* All buckets of the old table have been migrated. */
                    if (expand_bucket == hashsize(hashpower - 1)) {
                        expanding = false;
                        free(old_hashtable);
                        STATS_LOCK();
                        stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
                        stats.hash_is_expanding = 0;
                        STATS_UNLOCK();
                        if (settings.verbose > 1) {
                            /* The message must be a single-line literal ending
                             * in an escaped newline. */
                            fprintf(stderr, "Hash table expansion done\n");
                        }
                    }

                } else {
                    /* Bucket is locked by someone else: back off for 10 ms. */
                    usleep(10*1000);
                }

                if (item_lock) {
                    item_trylock_unlock(item_lock);
                    item_lock = NULL;
                }
            }

            if (!expanding) {
                /* We are done expanding.. just wait for next invocation */
                started_expanding = false;
                pthread_cond_wait(&maintenance_cond, &maintenance_lock);
                /* assoc_expand() swaps out the hash table entirely, so we need
                 * all threads to not hold any references related to the hash
                 * table while this happens.
                 * This is instead of a more complex, possibly slower algorithm to
                 * allow dynamic hash table expansion without causing significant
                 * wait times.
                 */
                pause_threads(PAUSE_ALL_THREADS);
                assoc_expand();
                pause_threads(RESUME_ALL_THREADS);
            }
        }
        return NULL;
    }

      其实就是由一个后台线程把旧hash表中的数据逐个桶地迁移到新的hash表中,每次只需要锁住正在迁移的那个桶,从而避免了长时间阻塞。

      memcached 执行 get 操作时,如果正在扩容,并且该 key 对应的旧桶尚未被迁移(与 assoc_insert 中的判断一致:oldbucket >= expand_bucket),就从旧的hash表(old_hashtable)中获取数据;否则从 primary_hashtable 中获取数据。

  • 相关阅读:
    nextSibling VS nextElementSibling
    线程实现连续启动停,并在某一时间段内运行
    线程:安全终止与重启
    监控知识体系
    后台服务变慢解决方案
    Java泛型类型擦除以及类型擦除带来的问题
    常见的 CSRF、XSS、sql注入、DDOS流量攻击
    Spring对象类型——单例和多例
    一次线上OOM过程的排查
    深入浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列
  • 原文地址:https://www.cnblogs.com/ELMND/p/4631191.html
Copyright © 2011-2022 走看看