zoukankan      html  css  js  c++  java
  • Redis学习--渐进式rehash实现原理

    哈希冲突问题

    Redis使用哈希表来存放键值对数据,在插入新键值对数据时,会先按照”key“来计算哈希值,再根据哈希值和哈希表的sizemask来计算出该”key“在对于哈希数组中的索引值,然后将键值对数据封装成dictEntry对象并放入到索引值对应的哈希数组中。

    不同的Key经过相同哈希函数计算后可能得到相同的哈希值和索引值,当两个或多个Key被分配到哈希数组相同的索引位置时,成为哈希冲突,Redis使用单向链表的方式来解决哈希冲突。最后增加的键值对数据会被放置在单向链表的头部位置。

    /* Add an element to the target hash table */
    int dictAdd(dict *d, void *key, void *val)
    {
        /* 使用dictAddRaw来创建一个新的entry */
        dictEntry *entry = dictAddRaw(d,key,NULL);
        if (!entry) return DICT_ERR;
        /* 设置新entry的value数据 */
        dictSetVal(d, entry, val);
        return DICT_OK;
    }
    
    /* Low level add or find:
     * This function adds the entry but instead of setting a value returns the
     * dictEntry structure to the user, that will make sure to fill the value
     * field as he wishes.
     *
     * This function is also directly exposed to the user API to be called
     * mainly in order to store non-pointers inside the hash value, example:
     *
     * entry = dictAddRaw(dict,mykey,NULL);
     * if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
     *
     * Return values:
     *
     * If key already exists NULL is returned, and "*existing" is populated
     * with the existing entry if existing is not NULL.
     *
     * If key was added, the hash entry is returned to be manipulated by the caller.
     */
    dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
    {
        long index;
        dictEntry *entry;
        dictht *ht;
    
        if (dictIsRehashing(d)) _dictRehashStep(d);
    
        /* Get the index of the new element, or -1 if
         * the element already exists. */
        /* 获取到新增key在哈希数组中的索引位置 */
        if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
            return NULL;
    
        /* Allocate the memory and store the new entry.
         * Insert the element in top, with the assumption that in a database
         * system it is more likely that recently added entries are accessed
         * more frequently. */
        /* 找到当前使用的哈希表,
         * 如果处于rehash状态则新增的键值对数据放入到ht[1]中
         * 否则将新增的键值对数据放入到ht[0]中
         */
        ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
        /* 创建新的entry对象 */
        entry = zmalloc(sizeof(*entry));
        /* 将当前位置上数据赋值给新entry的next指针 */
        entry->next = ht->table[index];
        /* 将心得entry对象赋值给当前位置,即将新enrty放到单向链表的头部 */
        ht->table[index] = entry;
        /* 增加哈希表的已使用值 */
        ht->used++;
    
        /* Set the hash entry fields. */
        /* 设置新entry的key数据 */
        dictSetKey(d, entry, key);
        return entry;
    }
    

    在按照key进行哈希查找时,会先按照key来计算出对应的哈希值和索引值,再按照索引值找到哈希数组对应位置上的单向链表,再依次遍历单向链表中的每个entry对象并使用查找的key和entry的key值进行对比,从而找到特定key的entry对象。

    dictEntry *dictFind(dict *d, const void *key)
    {
        dictEntry *he;
        uint64_t h, idx, table;
    	
        if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
        /* 如果当前字典处于rehash状态,则辅助完成1条数据迁移工作 */
        if (dictIsRehashing(d)) _dictRehashStep(d);
        /* 计算key对应的hash值 */
        h = dictHashKey(d, key);
        for (table = 0; table <= 1; table++) {
            /* 计算key对应的索引值 */
            /* 在rehash过程中,ht[0]和ht[1]的sizemask不同,因此需要分别计算 */
            idx = h & d->ht[table].sizemask;
            /* 获取到哈希数组上指定索引位置上的单向链表 */
            he = d->ht[table].table[idx];
            
            /* 遍历单向链表 */
            while(he) {
                /* 针对单向链表上每个entry,对比entry存储的key和要查找的key */
                if (key==he->key || dictCompareKeys(d, key, he->key))
                    /* 找到后立即返回,同一个key只会存在一份数据 */
                    return he;
                he = he->next;
            }
            /* 如果未处于rehas状态,处理完ht[0]后无需再处理ht[1] */
            if (!dictIsRehashing(d)) return NULL;
        }
        return NULL;
    }
    

    解决哈希冲突

    在每个哈希表对象中,使用size字段来存放该哈希表的长度,使用used字段来存放哈希表的已使用量:

    /* This is our hash table structure. Every dictionary has two of this as we
     * implement incremental rehashing, for the old to the new table. */
    typedef struct dictht {
        /* 使用数组方式来存放哈希表数据 */
        dictEntry **table;
        /* 哈希表长度 */
        unsigned long size;
        /* 哈希表长度掩码,用来计算哈希值对应的数组下标 */
        unsigned long sizemask;
        /* 哈希表上已使用量,即当前存储的entry总数量 */
        unsigned long used;
    } dictht;
    

    used * 100 / size < HASHTABLE_MIN_FILL(10)时,哈希数组资源浪费严重,需要对哈希表进行缩容,将哈希数表长度扩展为第一个大于等于used*2的2的n次幂。

    used / size > dict_force_resize_ratio(5) 时,哈希数组上哈希冲突严重,需要对哈希表进行扩容,将哈希数表长度缩小为第一个大于等于used的2的n次幂。

    哈希扩容操作

    在Redis执行命令请求增加新键时,会调用函数dictAddRaw或dictAdd-->dictAddRaw来进行插入,在函数dictAddRaw中会使用_dictKeyIndex --> _dictExpandIfNeeded 来判断当前哈希表是否需要扩容。

    static int dict_can_resize = 1;
    static unsigned int dict_force_resize_ratio = 5;
    
    /* Expand the hash table if needed */
    static int _dictExpandIfNeeded(dict *d)
    {
        /* Incremental rehashing already in progress. Return. */
        if (dictIsRehashing(d)) return DICT_OK;
    
        /* If the hash table is empty expand it to the initial size. */
        if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
    
        /* If we reached the 1:1 ratio, and we are allowed to resize the hash
         * table (global setting) or we should avoid it but the ratio between
         * elements/buckets is over the "safe" threshold, we resize doubling
         * the number of buckets. */
        if (d->ht[0].used >= d->ht[0].size &&
            (dict_can_resize ||
             d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
        {
            return dictExpand(d, d->ht[0].used*2);
        }
        return DICT_OK;
    }
    

    哈希表的扩容操作主要由Redis插入新键值对数据的命令来触发,如:

    • set类型的增加新键值对时调用setTypeAdd-->dictAddRaw来触发。
    • zset类型的增加新键值对时调用zunionInterGenericCommand-->dictAddRaw来触发。
    • string类型的增加新键值对时调用setGenericCommand-->setKey-->dbAdd-->dictAdd-->dictAddRaw来触发。

    哈希缩容操作

    在判断哈希表是否需要缩容时,通过函数tryResizeHashTables来:

    • 调用htNeedsResize判断当前dict是否需要缩容
    • 对需要缩容的dict调用dictResize来进行缩容,按照当前哈希表的已使用量(used)作为缩容标准。
    /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
     * we resize the hash table to save memory */
    void tryResizeHashTables(int dbid) {
        if (htNeedsResize(server.db[dbid].dict))
            dictResize(server.db[dbid].dict);
        if (htNeedsResize(server.db[dbid].expires))
            dictResize(server.db[dbid].expires);
    }
    
    /* This is the initial size of every hash table */
    #define DICT_HT_INITIAL_SIZE     4
    /* Hash table parameters */
    /* Minimal hash table fill 10% */
    #define HASHTABLE_MIN_FILL        10      
    int htNeedsResize(dict *dict) {
        long long size, used;
    
        size = dictSlots(dict);
        used = dictSize(dict);
        /* 当前使用量小于当前长度的10%时,进行缩容 */
        return (size > DICT_HT_INITIAL_SIZE &&
                (used*100/size < HASHTABLE_MIN_FILL));
    }
    
    
    /* Resize the table to the minimal size that contains all the elements,
     * but with the invariant of a USED/BUCKETS ratio near to <= 1 */
    int dictResize(dict *d)
    {
        int minimal;
    
        if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
        /* 按照当前使用量进行缩容 */
        minimal = d->ht[0].used;
        if (minimal < DICT_HT_INITIAL_SIZE)
            minimal = DICT_HT_INITIAL_SIZE;
        return dictExpand(d, minimal);
    }
    

    Redis按照redisServer.hz参数控制的频率执行定时任务执行过程中,会调用serverCron-->databasesCron函数,在databasesCron函数执行过程中,如果当前未执行AOF重写或RDB备份操作,则:

    • 判断当前实例上所有redisDB是否需要缩容,需要则调用dictResize进行缩容操作。
    • 判断当前实例上是否存在rehash操作,存在则调用incrementallyRehash函数进行增量处理。
    /* This function handles 'background' operations we are required to do
     * incrementally in Redis databases, such as active key expiring, resizing,
     * rehashing. */
    void databasesCron(void) {
        /* Expire keys by random sampling. Not required for slaves
         * as master will synthesize DELs for us. */
        if (server.active_expire_enabled) {
            if (server.masterhost == NULL) {
                activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
            } else {
                expireSlaveKeys();
            }
        }
    
        /* Defrag keys gradually. */
        if (server.active_defrag_enabled)
            activeDefragCycle();
    
        /* Perform hash tables rehashing if needed, but only if there are no
         * other processes saving the DB on disk. Otherwise rehashing is bad
         * as will cause a lot of copy-on-write of memory pages. */
        /* 在没有RDB备份或AOF日志重写时,才会触发rehash操作 */
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
            /* We use global counters so if we stop the computation at a given
             * DB we'll be able to start from the successive in the next
             * cron loop iteration. */
            static unsigned int resize_db = 0;
            static unsigned int rehash_db = 0;
            int dbs_per_call = CRON_DBS_PER_CALL;
            int j;
    
            /* Don't test more DBs than we have. */
            if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
    
            /* Resize */
            for (j = 0; j < dbs_per_call; j++) {
                tryResizeHashTables(resize_db % server.dbnum);
                resize_db++;
            }
    
            /* Rehash */
            if (server.activerehashing) {
                for (j = 0; j < dbs_per_call; j++) {
                    int work_done = incrementallyRehash(rehash_db);
                    if (work_done) {
                        /* If the function did some work, stop here, we'll do
                         * more at the next cron loop. */
                        break;
                    } else {
                        /* If this db didn't need rehash, we'll try the next one. */
                        rehash_db++;
                        rehash_db %= server.dbnum;
                    }
                }
            }
        }
    }
    

    渐进式rehash操作

    无论时缩容还是扩容,都会调用dictExpand函数来处理,按照新的hash表容量(unsigned long size)计算出一个接近且大于size的2^N的值,并以此值来初始化dictht对象,再赋值给d->ht[1],同时设置rehash状态值d->rehashidx =0。

    /* Expand or create the hash table */
    int dictExpand(dict *d, unsigned long size)
    {
        /* the size is invalid if it is smaller than the number of
         * elements already inside the hash table */
        if (dictIsRehashing(d) || d->ht[0].used > size)
            return DICT_ERR;
    
        dictht n; /* the new hash table */
        unsigned long realsize = _dictNextPower(size);
    
        /* Rehashing to the same table size is not useful. */
        if (realsize == d->ht[0].size) return DICT_ERR;
    
        /* Allocate the new hash table and initialize all pointers to NULL */
        n.size = realsize;
        n.sizemask = realsize-1;
        n.table = zcalloc(realsize*sizeof(dictEntry*));
        n.used = 0;
    
        /* Is this the first initialization? If so it's not really a rehashing
         * we just set the first hash table so that it can accept keys. */
        if (d->ht[0].table == NULL) {
            d->ht[0] = n;
            return DICT_OK;
        }
    
        /* Prepare a second hash table for incremental rehashing */
        d->ht[1] = n;
        d->rehashidx = 0;
        return DICT_OK;
    }
    
    /* Our hash table capability is a power of two */
    /* 哈希表的长度必须是2的N次方,因此必须选择一个接近且大于指定长度的2^N值 */
    static unsigned long _dictNextPower(unsigned long size)
    {
        unsigned long i = DICT_HT_INITIAL_SIZE;
    
        if (size >= LONG_MAX) return LONG_MAX + 1LU;
        while(1) {
            if (i >= size)
                return i;
            i *= 2;
        }
    }
    

    函数dictExpand仅仅是初始化新哈希表ht[1]并设置dict的状态为rehash状态(rehashidx = 0),为不严重阻塞正常命令请求,采用渐进式哈希(rehashing)的机制来完成数据从ht[0]到ht[1]的迁移。rehash操作以哈希桶为基本单位调用dictRehash函数来完成。

    rehash操作按照触发方式可以分为:

    • active rehashing(主动rehash操作),Redis实例的定时任务通过serverCron-->databasesCron-->incrementallyRehash来触发,按照每次处理100个哈希桶+循环执行1秒的方式来处理。
    • lazy rehashing(惰性rehah操作),在对dict做增删改查的过程中,如dictFind、dictAdd、dictDelete等操作时触发,按照每次处理1个哈希桶的方式进行处理。

    PS1: 虽然databasesCron中采用for循环遍历dbs_per_call个RedisDB,但只有在incrementallyRehash函数中成功调用1次dictRehashMilliseconds(dict,1),就会跳出循环,因此能严格保证databasesCron函数中rehash操作最多消耗1ms

    主动Rehash操作

    在函数incrementallyRehash中,当哈希表处于rehash状态(rehashidx不等于-1)时,会调用dictRehashMilliseconds来进行增量处理,按照每批次出100个键的方式循环执行1ms(硬编码在程序中),避免rehash操作执行时间过长阻塞其他客户请求。

    /* Our hash table implementation performs rehashing incrementally while
     * we write/read from the hash table. Still if the server is idle, the hash
     * table will use two tables for a long time. So we try to use 1 millisecond
     * of CPU time at every call of this function to perform some rehahsing.
     *
     * The function returns 1 if some rehashing was performed, otherwise 0
     * is returned. */
    int incrementallyRehash(int dbid) {
        /* Keys dictionary */
        if (dictIsRehashing(server.db[dbid].dict)) {
            dictRehashMilliseconds(server.db[dbid].dict,1);
            return 1; /* already used our millisecond for this loop... */
        }
        /* Expires */
        if (dictIsRehashing(server.db[dbid].expires)) {
            dictRehashMilliseconds(server.db[dbid].expires,1);
            return 1; /* already used our millisecond for this loop... */
        }
        return 0;
    }
    
    /* 按照rehashidx是否不等于-1来判断当前dict对象是否处于rehas状态 */
    #define dictIsRehashing(d) ((d)->rehashidx != -1)
    
    /* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds */
    int dictRehashMilliseconds(dict *d, int ms) {
        long long start = timeInMilliseconds();
        int rehashes = 0;
    
        while(dictRehash(d,100)) {
            rehashes += 100;
            if (timeInMilliseconds()-start > ms) break;
        }
        return rehashes;
    }
    

    惰性rehah操作

    在dictFind等对dict的增删改查的操作过程中,如果当前dict处于rehash状态,则调用_dictRehashStep(d)来迁移1个哈希桶的数据。

    dictEntry *dictFind(dict *d, const void *key)
    {
        if (dictIsRehashing(d)) _dictRehashStep(d);
    }
    
    /* This function performs just a step of rehashing, and only if there are
     * no safe iterators bound to our hash table. When we have iterators in the
     * middle of a rehashing we can't mess with the two hash tables otherwise
     * some element can be missed or duplicated.
     *
     * This function is called by common lookup or update operations in the
     * dictionary so that the hash table automatically migrates from H1 to H2
     * while it is actively used. */
    static void _dictRehashStep(dict *d) {
        if (d->iterators == 0) dictRehash(d,1);
    }
    
    

    由于惰性rehash的操作仅迁移1个哈希桶的数据,因此不会对操作产生明显的性能影响。

    Rehash操作阻塞问题

    无论时主动rehash还是惰性rehash操作,都会严格控制rehash操作的耗时,不会阻塞正常命令请求,但:

    • 在rehash的初始阶段,需要按照zcalloc(realsize*sizeof(dictEntry*))来为新的哈希表ht[1]申请内存。
    • 在rehash的结尾阶段,需要对老的哈希表ht[0]进行回收并释放内存。

    当涉及到的哈希表size较大时,需要申请或释放较大的内存资源,造成Redis服务器的内存使用量明显变化,同时影响到客户端命令请求的执行时间。如果请求执行过程中触发,则可能导致简单命令耗时较长被记录到慢日志中。

    参考资料

    Redis的内部扩容机制

    Redis关键点(rehash)

    浅谈Redis中的Rehash机制

  • 相关阅读:
    SpringCloud教程第10篇:高可用的服务注册中心(F版本)
    SpringCloud教程第9篇:Sleuth(F版本)
    requests.session保持会话
    Jmeter Constant Throughput Timer 使用
    Jmeter提取响应数据的结果保存到本地的一个文件
    练习2
    练习1
    一道简单的练习题
    Maven下org.junit.Test无法使用
    [转]解决pycharm无法导入本地包的问题(Unresolved reference 'tutorial')
  • 原文地址:https://www.cnblogs.com/gaogao67/p/15126535.html
Copyright © 2011-2022 走看看