zoukankan      html  css  js  c++  java
  • LRU工程实现源码(一):Redis 内存淘汰策略

    源码版本 Redis 6.0.0

    内存淘汰是什么?什么时候内存淘汰

    我们知道,当某个key被设置了过期时间之后,客户端每次对该key的访问(读写)都会事先检测该key是否过期,如果过期就直接删除;但有一些键只访问一次,因此需要主动删除,默认情况下redis每秒检测10次,检测的对象是所有设置了过期时间的键集合,每次从这个集合中随机检测20个键查看他们是否过期,如果过期就直接删除。

    内存淘汰虽然也是删除key,但是与我们设置的过期key删除不同。

    内存淘汰是说,如果redis已使用内存达到预设的limit值(你可以当做内存满了),如果想要继续存储新的key,那么必然要淘汰一些key来释放内存空间。具体淘汰哪些key由不同的淘汰策来决定。

    由于需要淘汰掉key,所以此时redis不适合用作key-value数据库了,开启了内存淘汰的redis一般用于缓存中,因此很多人也把他叫缓存淘汰和缓存淘汰策略。

    又因为内存淘汰的触发时机是内存使用达到阈值,为了存储新的key然后淘汰旧的key,因此也有人把内存淘汰策略叫做缓存替换策略

    内存淘汰策略

    Redis 4.0 之前一共实现了 6 种内存淘汰策略,在 4.0 之后,又增加了 2 种策略。

    我们可以按照是否会进行数据淘汰把它们分成两类:

    • 不进行数据淘汰的策略,只有 noeviction 这一种。
    • 会进行淘汰的 7 种其他策略。

    会进行淘汰的 7 种策略,我们可以再进一步根据淘汰候选数据集的范围把它们分成两类:

    • 在设置了过期时间的数据中进行淘汰,包括 volatile-random、volatile-ttl、volatile-lru、volatile-lfu(Redis 4.0 后新增)四种。
    • 在所有数据范围内进行淘汰,包括 allkeys-lru、allkeys-random、allkeys-lfu(Redis 4.0 后新增)三种。我把这 8 种策略的分类,画到了一张图里:

    这8种策略的简单描述都可以在redis.conf文件中找到

    # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
    # is reached. You can select one from the following behaviors:
    #
    # volatile-lru -> Evict using approximated LRU, only keys with an expire set.
    # allkeys-lru -> Evict any key using approximated LRU.
    # volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
    # allkeys-lfu -> Evict any key using approximated LFU.
    # volatile-random -> Remove a random key having an expire set.
    # allkeys-random -> Remove a random key, any key.
    # volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
    # noeviction -> Don't evict anything, just return an error on write operations.
    

    翻译下:

    # MAXMEMORY POLICY:当达到最大内存时,Redis 将如何选择要删除的内容
    # 到达了。 您可以从以下行为中选择一种:
    #
    # volatile-lru -> Evict 使用近似 LRU,只有设置了过期时间的键。
    # allkeys-lru -> 使用近似 LRU 驱逐所有键。
    # volatile-lfu -> 使用近似 LFU 驱逐,只有设置了过期时间的键。
    # allkeys-lfu -> 使用近似 LFU 驱逐所有键。
    # volatile-random -> 随机删除设置了过期时间的key。
    # allkeys-random -> 随机删除一个键,任何键。
    # volatile-ttl -> 删除过期时间最近的key(次TTL)
    # noeviction -> 不要驱逐任何东西,只是在写操作时返回一个错误。
    

    通过redis.conf文件中的内容我们可以知道默认策略是noeviction

    # The default is:
    #
    # maxmemory-policy noeviction
    

    Redis中的LRU淘汰算法

    限于篇幅,本文主要以LRU淘汰算法展开讲解,其他算法流程大同小异,可在我的redis源码中文注释项目中查看

    不了解LRU算法基础的可以先去百度了解下,很简单

    这里就不讲LRU算法的基本知识了,我们来看Redis的LRU算法的原理是怎么样的。

    其实,LRU 算法背后的想法非常朴素:它认为刚刚被访问的数据,肯定还会被再次访问,所以就把它放在 MRU 端;长久不访问的数据,肯定就不会再被访问了,所以就让它逐渐后移到 LRU 端,在缓存满时,就优先删除它。

    不过,LRU 算法在实际实现时,需要用链表管理所有的缓存数据,这会带来额外的空间开销。而且,当有数据被访问时,需要在链表上把该数据移动到 MRU 端,如果有大量数据被访问,就会带来很多链表移动操作,会很耗时,进而会降低 Redis 缓存性能。

    所以,在 Redis 中,LRU 算法被做了简化,以减轻数据淘汰对缓存性能的影响。具体来说,Redis 默认会记录每个数据的最近一次访问的时间戳(由键值对数据结构 RedisObject 中的 lru 字段记录)。然后,Redis 在决定淘汰的数据时,第一次会随机选出 N 个数据,把它们作为一个候选集合。接下来,Redis 会比较这 N 个数据的 lru 字段,把 lru 字段值最小的数据从缓存中淘汰出去。

    Redis 提供了一个配置参数 maxmemory-samples,这个参数就是 Redis 选出的数据个数 N。例如,我们执行如下命令,可以让 Redis 选出 100 个数据作为候选数据集:

    CONFIG SET maxmemory-samples 100
    

    redis.conf文件中默认配置是5

    
    # The default of 5 produces good enough results. 10 Approximates very closely
    # true LRU but costs more CPU. 3 is faster but not very accurate.
    #
    # maxmemory-samples 5
    

    当需要再次淘汰数据时,Redis 需要挑选数据进入第一次淘汰时创建的候选集合。这儿的挑选标准是:能进入候选集合的数据的 lru 字段值必须小于候选集合中最小的 lru 值。当有新数据进入候选数据集后,如果候选数据集中的数据个数达到了 maxmemory-samples,Redis 就把候选数据集中 lru 字段值最小的数据淘汰出去。

    这样一来,Redis 缓存不用为所有的数据维护一个大链表,也不用在每次数据访问时都移动链表项,提升了缓存的性能。

    源码剖析

    源码中文注释可以在我的 github 上 的 Redis 6.0 中文注释项目里看到,欢迎访问

    第一步:什么时候开始淘汰key

    配置读取
    struct redisServer {
        // 可使用的最大内存字节数
        unsigned long long maxmemory;   /* Max number of memory bytes to use */
        // key 淘汰策略
        int maxmemory_policy;           /* Policy for key eviction */
        // 随机抽样精度
        int maxmemory_samples;          /* Precision of random sampling */
    }
    
    检查时机

    执行内存淘汰的方法是freeMemoryIfNeeded,实现在evict.c中.

    注意redis 3.0版本中该方法在redis.c

    这个方法主要的作用是检查内存状态,如果内存达到阈值就进行内存淘汰,释放内存空间直至到阈值以下。

    打开redis源码可以很轻易搜索到该方法是被freeMemoryIfNeededAndSafe调用的

    
    /* 
     * 这是 freeMemoryIfNeeded() 的包装器,只有在现在有条件安全地执行时才真正调用该函数:
     * - 超时条件下不能有脚本。
     * - 现在没有加载数据。
     */
    int freeMemoryIfNeededAndSafe(void) {
        if (server.lua_timedout || server.loading) return C_OK;
        return freeMemoryIfNeeded();
    }
    

    那么这个方法又是被谁调用的呢,有两个地方

    • updateMaxmemory
    • processCommand

    第一个看名字是在更新Maxmemory时调用的,本文不讲,我们主要来看第二个方法

    在处理命令的时候,会调用server.c中的processCommand方法

    /* If this function gets called we already read a whole
     * command, arguments are in the client argv/argc fields.
     * 
     * 如果这个函数被调用,我们已经读取了整个命令,参数在客户端 argv/argc 字段中。
     * 
     * processCommand() execute the command or prepare the
     * server for a bulk read from the client.
     *  
     * processCommand() 执行命令或准备服务器以从客户端进行批量读取。
     */
    int processCommand(client *c) {
        // 省略...
        // 如果配置了 server.maxmemory 并且 lua脚本没有在超时条件下运行
        if (server.maxmemory && !server.lua_timedout) {
            int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
            // 省略...
        }
        // 省略...
    }
    
    getMaxmemoryState

    紧接着会在freeMemoryIfNeeded方法中检查内存状态来判断是否需要内存淘汰

    /* 
     * 根据当前的“maxmemory”设置,定期调用此函数以查看是否有可用内存。 如果我们超过内存限制,该函数将尝试释放一些内存以返回低于限制。
     *
     * 如果我们低于内存限制或超过限制但尝试释放内存成功,该函数将返回 C_OK。
     * 否则,如果我们超过了内存限制,但没有足够的内存被释放以返回低于限制,则该函数返回 C_ERR。
     */
    int freeMemoryIfNeeded(void) {
        // 默认情况下,从节点应该忽略maxmemory指令,仅仅做从节点该做的事情就好
        if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
        // 当客户端暂停时,数据集应该是静态的,不仅来自客户端的 POV 无法写入,还有来自POV过期和驱逐key也无法执行。
        if (clientsArePaused()) return C_OK;
        // 检查内存状态,有没有超出限制,如果有,会计算需要释放的内存和已使用内存
        if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
            return C_OK;
    }
    

    getMaxmemoryState 方法也在evict.c中:

    /* 从maxmemory指令的角度获取内存状态:
     * 如果使用的内存低于 maxmemory 设置,则返回 C_OK。
     * 否则,如果超过内存限制,函数返回
     * C_ERR。
     *
     * 该函数可能会通过引用返回附加信息,仅当
     * 指向相应参数的指针不为 NULL。某些字段是
     * 仅在返回 C_ERR 时填充:
     *
     * 'total' 使用的总字节数。
     *(为 C_ERR 和 C_OK 填充)
     *
     * 'logical' 使用的内存量减去从/AOF缓冲区。
     *(返回 C_ERR 时填充)
     *
     * 'tofree' 为了回到内存限制应该释放的内存量
     * 
     *(返回 C_ERR 时填充)
     *
     * 'level' 这通常范围从 0 到 1,并报告数量
     * 当前使用的内存。如果我们超过了内存限制可能会 > 1。
     *(为 C_ERR 和 C_OK 填充)
     */
    int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level)
    

    通过以上源码,我们能够知道什么时候会内存淘汰。

    下一步我们来看看会淘汰哪些key

    第二步:淘汰哪些key

    freeMemoryIfNeeded

    文章正文中我们提到过内存淘汰策略,但是第一步并没有涉及到具体的策略。那么不同的策略是在哪里实现的呢?这是在freeMemoryIfNeeded方法中判断的,顺便这里先带你了解下freeMemoryIfNeeded方法的骨架

    
    int freeMemoryIfNeeded(void) {
        
        /*
         * 
         * mem_reported 已使用内存
         * mem_tofree 需要释放的内存
         * mem_freed 已释放内存
         */
        size_t mem_reported, mem_tofree, mem_freed;
        
        // 检查内存状态,有没有超出限制,如果有,会计算需要释放的内存和已使用内存
        // 这个方法不但会计算内存,还会赋值 mem_reported mem_freed
        if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
            return C_OK;
        // 初始化已释放内存的字节数为 0
        mem_freed = 0;
        
        // 不进行内存淘汰
        if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
            goto cant_free; /* We need to free memory, but policy forbids. */
            
        // 根据 maxmemory 策略,
        // 遍历字典,释放内存并记录被释放内存的字节数
        while (mem_freed < mem_tofree) {
            // 最佳淘汰key
            sds bestkey = NULL;
            // LRU策略或者LFU策略或者VOLATILE_TTL策略
            if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
                server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
            {
                // 不同的策略找bestKey
            }
            /* volatile-random and allkeys-random policy */
            else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                     server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
            {
                // 不同的策略找bestKey
            }
            
            // 最后选定的要删除的key
            if (bestkey) {
                // 在这里删除key
            }
        }
        
    }
    

    上面的源码省略了很多东西,但我相信你已经知道这个方法在做什么了,也应该知道不同的策略其实就是在找不同的key。我们重点分析LRU策略下的bestKey选择。

    redis索引

    这个时候要去redis数据库拿东西了,查找的话还是需要索引的,因此我先向你简单罗列下Redis索引相关的数据结构,很简单,不做文字表述了

    server.h
    
    // Redis 数据库。 有多个数据库由从 0(默认数据库)到最大配置数据库的整数标识。 数据库编号是结构中的“id”字段。
    typedef struct redisDb {
        /* 数据库键空间,保存着数据库中的所有键值对 */
        dict *dict;               /* The keyspace for this DB */
        // 设置超时时间的key集合
        dict *expires;              /* Timeout of keys with a timeout set */
        // 客户端等待数据的key (BLPOP)
        dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
        // 收到 PUSH 被阻塞的key
        dict *ready_keys;           /* Blocked keys that received a PUSH */
        dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
        int id;                     /* Database ID */
        // 数据库的键的平均 TTL ,统计信息
        long long avg_ttl;          /* Average TTL, just for stats */
        // 
        unsigned long expires_cursor; /* Cursor of the active expire cycle. */
        // 尝试逐一进行碎片整理的键名列表。
        list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
    } redisDb;
    
    

    注意 redisDb 有两个重要的dict,*dict是主字典,存储所有的键集合, *expires存储设置了超时时间的集合,后面根据不同的淘汰策略就可以从不同的数据集拿数据

    dict.h
    
    /*
     * 字典
     */
    typedef struct dict {
        // 类型特定函数
        dictType *type;
        // 私有数据
        void *privdata;
        // 哈希表
        dictht ht[2];
        // rehash 索引
        // 当 rehash 不在进行时,值为 -1
        long rehashidx; /* rehashing not in progress if rehashidx == -1 */
        // 当前运行的迭代器数量
        unsigned long iterators; /* number of iterators currently running */
    } dict;
    

    每个dict又存储了2个dictht,为什么是两个呢?用来实现渐进式 rehash

    /* 哈希表
     * 每个字典都使用两个哈希表,从而实现渐进式 rehash 。
     */
    typedef struct dictht {
        // 哈希表数组
        dictEntry **table;
        // 哈希表大小
        unsigned long size;
        // 哈希表大小掩码,用于计算索引值
        // 总是等于 size - 1
        unsigned long sizemask;
        // 该哈希表已有节点的数量
        unsigned long used;
    } dictht;
    

    到了哈希表了,这个是真正存储元素的地方了,再来看哈希表元素长什么样

    /*
     * 哈希表节点
     */
    typedef struct dictEntry {
        // 键
        void *key;
        // 值
        union {
            void *val;
            uint64_t u64;
            int64_t s64;
            double d;
        } v;
        // 指向下个哈希表节点,形成链表
        struct dictEntry *next;
    } dictEntry;
    

    以上就是redis索引相关的数据结构,我们可以看到redis索引是通过哈希表来实现的。上面说的是索引的存储,那么我们放入redis的value存储在哪呢?

    server.h
    
    typedef struct redisObject {
        // 类型
        unsigned type:4;
    
        // 编码
        unsigned encoding:4;
    
        // 对象最后一次被访问的时间
        unsigned lru:LRU_BITS;
    
        // 引用计数
        int refcount;
    
        // 指向实际值的指针
        void *ptr;
    } robj;
    

    这是我们的值对象数据结构定义,type就是具体的值的数据类型了,lru是对象最后一次被访问的时间,由于只有24位,无法记录完整的时间,因此只记录了unix time的低24位,24 bits数据要溢出的话需要194天,而缓存的数据更新非常频繁,已经足够了。详细的关于lru精度的问题可以查看源码。到这里你应该对redis的结构有了基本了解。下面在来看如何挑选bestkey更简单一些。

    淘汰池
    // 最佳淘汰key
    sds bestkey = NULL;
    // key所属db
    int bestdbid;
    // Redis数据库
    redisDb *db;
    // 字典
    dict *dict;
    // 哈希表节点
    dictEntry *de;
    // LRU策略或者LFU策略或者VOLATILE_TTL策略
    if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
        server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
    {
        // 淘汰池
        struct evictionPoolEntry *pool = EvictionPoolLRU;
    
        while(bestkey == NULL) {
            unsigned long total_keys = 0, keys;
    
            // 从每个数据库抽样key填充淘汰池
            for (i = 0; i < server.dbnum; i++) {
                db = server.db+i;
                // db->dict: 数据库所有key集合
                // db->expires: 数据中设置过期时间的key集合
                // 判断淘汰策略是否是针对所有键的
                dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                        db->dict : db->expires;
                // 计算字典元素数量,不为0才可以挑选key
                if ((keys = dictSize(dict)) != 0) {
                    // 填充淘汰池,四个参数分别为 dbid,候选集合,主字典集合,淘汰池
                    // 填充完的淘汰池内部是有序的,按空闲时间升序
                    evictionPoolPopulate(i, dict, db->dict, pool);
                    // 已遍历检测过的key数量
                    total_keys += keys;
                }
            }
            // 如果 total_keys = 0,没有要淘汰的key(redis没有key或者没有设置过期时间的key),break
            if (!total_keys) break; /* No keys to evict. */
    
            // 遍历淘汰池,从淘汰池末尾(空闲时间最长)开始向前迭代
            for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                if (pool[k].key == NULL) continue;
                // 获取当前key所属的dbid
                bestdbid = pool[k].dbid;
    
                if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                    // 如果淘汰策略针对所有key,从 redisDb.dict 中取数据,redisDb.dict 指向所有的键值集合
                    de = dictFind(server.db[pool[k].dbid].dict,
                        pool[k].key);
                } else { // 如果淘汰策略不是针对所有key,从 redisDb.expires 中取数据,redisDb.expires 指向已过期键值集合
                    de = dictFind(server.db[pool[k].dbid].expires,
                        pool[k].key);
                }
    
               
                if (pool[k].key != pool[k].cached)
                    sdsfree(pool[k].key);
                // 从池中删除这个key,不管这个key还在不在
                pool[k].key = NULL;
                pool[k].idle = 0;
    
                // 如果这个节点存在,就跳出这个循环,否则尝试下一个元素。
                // 这个节点可能已经不存在了,比如到了过期时间被删除了
                if (de) {
                    // de是key所在哈希表节点,bestkey是 key 名
                    bestkey = dictGetKey(de);
                    break;
                } else {
                    /* Ghost... Iterate again. */
                }
            }
        }
    }
    

    上面的源码注释已经很清晰了,大概有如下几个步骤:

    1. 初始化淘汰池
    2. 遍历数据库
    3. 根据淘汰策略是所有key还是过期key,从而选择不同的数据集(redisDb.expires or redisDb.dict)
    4. 调用evictionPoolPopulate方法,传入上一步选择的数据集,填充淘汰池数据并排好序
    5. 从淘汰池中选取要淘汰的key(空闲时间最长的key),并且删除淘汰池中该key的引用。如果该key已经没了,那么选取淘汰池中次优的key,直到找到一个还存在的key

    核心步骤当然是evictionPoolPopulate方法,我们来来看看淘汰池的元素结构。

    evict.c
    
    // 淘汰池大小
    #define EVPOOL_SIZE 16
    // 淘汰池缓存的最大sds大小
    #define EVPOOL_CACHED_SDS_SIZE 255
    
    struct evictionPoolEntry {
        // 对象空闲时间
        // 这被称为空闲只是因为代码最初处理 LRU,但实际上只是一个分数,其中更高的分数意味着更好的候选者。
        unsigned long long idle;    /* Object idle time (inverse frequency for LFU) */
        sds key;                    /* Key name. */
        // 用来存储一个sds对象留待复用,注意我们要复用的是sds的内存空间,只需关注cached的长度(决定是否可以复用),无需关注他的内容
        sds cached;                 /* Cached SDS object for key name. */
        int dbid;                   /* Key DB number. */
    };
    

    淘汰池不只可以给LRU使用,你可以在后面的源码中看到,LFU以及TTL也会使用淘汰池

    知道了淘汰池长什么样子我相信下面的代码你就好理解了,无非是按idle字段升序排序

    evict.c
    
    /* 
     * 这是 freeMemoryIfNeeded() 的辅助函数,用于在每次我们想要key过期时用一些条目填充 evictionPool。
     * 
     * 添加空闲时间小于当前所有key空闲时间的key,如果池是空的则key会一直被添加
     * 
     * 我们按升序将键依次插入,因此空闲时间较小的键在左侧,而空闲时间较长的键在右侧。
     */
    void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
        int j, k, count;
        // 初始化抽样集合,大小为 server.maxmemory_samples
        dictEntry *samples[server.maxmemory_samples];
        // 此函数对字典进行采样以从随机位置返回一些键
        count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
        for (j = 0; j < count; j++) {
            // 这被称为空闲只是因为代码最初处理 LRU,但实际上只是一个分数,其中更高的分数意味着更好的候选者。
            unsigned long long idle;
            // key
            sds key;
            // 值对象
            robj *o;
            // 哈希表节点
            dictEntry *de;
    
            de = samples[j];
            key = dictGetKey(de);
    
            /* 如果我们采样的字典不是主字典(而是过期的字典),我们需要在键字典中再次查找键以获得值对象。*/
            if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
                if (sampledict != keydict) de = dictFind(keydict, key);
                o = dictGetVal(de);
            }
    
            /* 根据策略计算空闲时间。 这被称为空闲只是因为代码最初处理 LRU,但实际上只是一个分数,其中更高的分数意味着更好的候选者。*/
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
                // 给定一个对象,使用近似的 LRU 算法返回未请求过该对象的最小毫秒数
                idle = estimateObjectIdleTime(o);
            } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { // LFU 策略也是用这个池子
                idle = 255-LFUDecrAndReturn(o);
            } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
                // 在这种情况下,越早过期越好。
                idle = ULLONG_MAX - (long)dictGetVal(de);
            } else {
                serverPanic("Unknown eviction policy in evictionPoolPopulate()");
            }
    
            /* 将元素插入池中*/
            k = 0;
            // 遍历淘汰池,从左边开始,找到第一个空桶或者第一个空闲时间大于等于待选元素的桶,k是该元素的坐标
            while (k < EVPOOL_SIZE &&
                   pool[k].key &&
                   pool[k].idle < idle) k++;
            if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
                /* 如果元素小于我们拥有的最差元素并且没有空桶,则无法插入。
                 * 
                 * key == 0 说明上面的while循环一次也没有进入
                 * 要么第一个元素就是空的,要么所有已有元素的空闲时间都大于等于待插入元素的空闲时间(待插入元素比已有所有元素都优质)
                 * 又因为数组最后一个key不为空,因为是从左边开始插入的,所以排除了第一个元素是空的
                 */
                continue;
            } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
                /* 插入空桶,插入前无需设置 */
            } else {
                /* 插入中间,现在 k 指向比要插入的元素空闲时间大的第一个元素 */
                if (pool[EVPOOL_SIZE-1].key == NULL) {
                    /* 数组末尾有空桶,将所有元素从 k 向右移动到末尾。*/
                    /* 覆盖前保存 SDS */
                    sds cached = pool[EVPOOL_SIZE-1].cached;
                    // 注意这里不设置 pool[k], 只是给 pool[k] 腾位置
                    memmove(pool+k+1,pool+k,
                        sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                    // 转移 cached (sds对象)
                    pool[k].cached = cached;
                } else {
                    
                    /* 右边没有可用空间? 在 k-1 处插入 */
                    k--;
                    /*
                     * 将k(包含)左侧的所有元素向左移动,因此我们丢弃空闲时间较小的元素。
                     */
                    sds cached = pool[0].cached;
                    if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                    memmove(pool,pool+1,sizeof(pool[0])*k);
                    pool[k].cached = cached;
                }
            }
            /*
             * 尝试重用在池条目中分配的缓存 SDS 字符串,因为分配和释放此对象的成本很高
             * 注意真正要复用的sds内存空间,避免重新申请内存,而不是他的值
             */
            int klen = sdslen(key);
            // 判断字符串长度来决定是否复用sds
            if (klen > EVPOOL_CACHED_SDS_SIZE) {
                // 复制一个新的 sds 字符串并赋值
                pool[k].key = sdsdup(key);
            } else {
                /*
                 * 内存拷贝函数,从数据源拷贝num个字节的数据到目标数组
                 * 
                 * destination:指向目标数组的指针 
                 * source:指向数据源的指针 
                 * num:要拷贝的字节数
                 *
                 */
                // 复用sds对象
                memcpy(pool[k].cached,key,klen+1);
                // 重新设置sds长度
                sdssetlen(pool[k].cached,klen);
                // 真正设置key
                pool[k].key = pool[k].cached;
            }
            // 设置空闲时间
            pool[k].idle = idle;
            // 设置key所在db
            pool[k].dbid = dbid;
        }
    }
    
    

    至此,经过以上步骤,我们能够得到一个bestkey(LRU策略下即最长空闲时间的key),还有一个有了部分数据的淘汰池。

    下一步,我们就要删除这个key来释放内存空间了。

    第三步:如何删除key

    // 不断循环删除key,直至释放足够的内存
    while (mem_freed < mem_tofree) {
        // 最佳淘汰key
        sds bestkey = NULL;
        // LRU策略或者LFU策略或者VOLATILE_TTL策略
        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            // 不同的策略找bestKey
        }
        // 最后选定的要删除的key
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            // 处理过期key到从节点和 AOF 文件
            // 当 master 中的 key 过期时,则将此 key 的 DEL 操作发送到所有 slaves 和 AOF 文件(如果启用)。
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
           
            // 获取已使用内存
            delta = (long long) zmalloc_used_memory();
            
            // 是否开启lazyfree机制
            // lazyfree的原理就是在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞。
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            
            // 计算删除key后的内存变化量
            delta -= (long long) zmalloc_used_memory();
            
            // 计算已释放内存
            mem_freed += delta;
            
        }
    }
    
    

    第四步:什么时候停止淘汰key

    通过上面的源码我们能看到while循环条件是 while (mem_freed < mem_tofree),而在每一次删除key的时候,都会累加已释放key所占有的内存:

    // 不断循环删除key,直至释放足够的内存
    while (mem_freed < mem_tofree) {
        if (bestkey) {
            // 获取已使用内存
            delta = (long long) zmalloc_used_memory();
            
            // 是否开启lazyfree机制
            // lazyfree的原理就是在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞。
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            
            // 计算删除key后的内存变化量
            delta -= (long long) zmalloc_used_memory();
            
            // 计算已释放内存
            mem_freed += delta;
            
        }
    }
    

    除此之外,如果配置开启了server.lazyfree_lazy_eviction(异步淘汰),那么每淘汰一些key后还会检查一下内存状态,如果内存已经达到期望了,那么就可以手动满足循环终止条件。

    // 不断循环删除key,直至释放足够的内存
    while (mem_freed < mem_tofree) {
        if (bestkey) {
            // 获取已使用内存
            delta = (long long) zmalloc_used_memory();
            
            // 是否开启lazyfree机制
            // lazyfree的原理就是在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞。
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            
            // 计算删除key后的内存变化量
            delta -= (long long) zmalloc_used_memory();
            
            // 计算已释放内存
            mem_freed += delta;
            
            /*
             * 通常我们的停止条件是能够释放固定的、预先计算的内存量。 
             * 然而,当我们在另一个线程中删除对象时,最好不时检查我们是否已经到达我们的目标内存,因为“mem_freed”数量仅在 dbAsyncDelete() 调用中计算,而线程可以无时无刻释放内存
             */
            if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
                if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                    /* Let's satisfy our stop condition. */
                    // 手动满足停止条件
                    mem_freed = mem_tofree;
                }
            }
            
        }
        
    }
    

    最后

    限于篇幅,尽可能多的放了源码,没有连贯的文字描述,如果你觉得这样看起来比较累的话,可以去我的Github看看源码注释

  • 相关阅读:
    vm12序列号
    三星手机官方固件下载
    MSTP故障处理手册
    分享一个高清壁纸网站
    ThinkPad X220 完美黑苹果 Hackintosh OS X 10.11 El Capitan
    一句命令激活windows/office
    Win10+VMplayer12中U盘无法挂载解决
    记一次金士顿DT100 G3 32G修复
    飘雪代码2枚
    禁用安全模式
  • 原文地址:https://www.cnblogs.com/lifan1998/p/15001630.html
Copyright © 2011-2022 走看看