本章将介绍键相关命令的源码实现。命令实现的过程中不是直接操作这些数据结构,我们将在10.1节讲解这两个结构。
在理解了redisDb和redisObject对象之后,我们按照查看键信息、设置键信息、查找和操作键将本章命令进行分类讲解,10.2节讲解查看键信息相关命令,其中object和type命令是获取redisObject对象相关属性的操作,过期时间读取和修改相关命令是对redisDb的expires字典的操作,除此之外本章的命令都是对redisDb的dict字典的操作。
10.1 对象结构体和数据库结构体回顾
对象的操作离不开redisObject结构体,数据库的操作离不开redisDb结构体;
10.1.1 对象结构体redisObject
redisObject对象,其定义在server.h文件,redisObject根据type的不同可以分为字符串对象、列表对象、集合对象、有序集合对象、散列表对象、模块对象和流对象,在执行命令前可以根据对象类型判断是否可以执行当前命令;
此外对象保存了数据底层存储所使用的编码,在存储数据时,Redis会自动选择合适的编码。对象还实现了引用计数,当程序不再使用对象时,对象会自动释放。
redisObject结构如图10-1所示,该结构体保存了长度为4bit的Redis对象类型、长度为4bit的内部存储编码、长度为24bit的lru、长度为4byte的引用计数以及8byte数据指针,redisObject是对基础数据结构的封装,命令object、type就是对此对象的encoding和type的读取操作。
图10-1 redisObject结构
1)type:type表示Redis对象的类型,占用4位,目前包含如下类型。
/* The actual Redis Object */
#define OBJ_STRING 0 /* String object. */
#define OBJ_LIST 1 /* 列表List object. */
#define OBJ_SET 2 /* 集合Set object. */
#define OBJ_ZSET 3 /* 有序集合Sorted set object. */
#define OBJ_HASH 4 /* 散列表Hash object. */
#define OBJ_MODULE 5 /* 模块对象*/
#define OBJ_STREAM 6 /* 流对象. */
2)encoding:encoding表示对象内部存储的编码,在一定条件下,对象的编码可以在多个编码之间转化,长度占用4位,包含如下编码。
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* 编码为整数 */
#define OBJ_ENCODING_HT 2 /* 编码为散列表 */
#define OBJ_ENCODING_ZIPMAP 3 /* 编码为zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* 不再使用:旧列表编码*/
#define OBJ_ENCODING_ZIPLIST 5 /* 编码为压缩列表 */
#define OBJ_ENCODING_INTSET 6 /* 编码为整数集合*/
#define OBJ_ENCODING_SKIPLIST 7 /* 编码为跳表*/
#define OBJ_ENCODING_EMBSTR 8 /* 编码为简短字符串*/
#define OBJ_ENCODING_Quicklist 9 /* 编码为快速链表*/
#define OBJ_ENCODING_STREAM 10 /* 编码为listpacks的基数树*/
3)lru:lru占用24位,当用于LRU时表示最后一次访问时间,当用于LFU时,高16位记录分钟级别的访问时间,低8位记录访问频率0到255,默认配置8位可表示最大100万访问频次,详细参见object命令。
4)refcount:refcount表示对象被引用的计数,类型为整型,实际应用中参考意义不大。
5)ptr:ptr是指向具体数据的指针,比如一个字符串对象,该指针指向存放数据sds的地址。
10.1.2 数据库结构体redisDb
redisDb有两个重要属性——dict和expires,分别是键空间散列表、过期时间散列表。dict保存了所有的键值对,expires保存了键的过期时间,像scan、move、sort等命令是对redisDb键空间散列表的操作,expire、persist等命令是对redisDb键的过期时间散列表的操作,具体我们将在后面讲解,redisDb结构体定义如下:
typedef struct redisDb {
dict *dict; /* 键空间字典, 存放所有键值对*/
dict *expires; /* key的超时时间字典,过期时间散列表,存放键的过期时间,注意dict和expires中的键都指向同一个键的sds*/
dict *blocking_keys; /* 阻塞的key,处于阻塞状态的键和对应的client, (BLPOP)*/
dict *ready_keys; /* 准备好的key ,解除阻塞状态的键和对应的client,与blocking_keys属性相对,为了实现需要阻塞的命令设计 */
dict *watched_keys; /* 执行事务的key,watch的键和对应的client,主要用于事务*/
int id; /* 数据库ID */
long long avg_ttl; /*数据库内所有键的平均生存时间 ,用于统计*/
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* 逐渐尝试逐个碎片整理的key列表*/
} redisDb;
10.2 查看键信息
object、type命令用于获取redisObject对象属性,ttl命令用于获取redisObject对象的过期时间;
10.2.1 查看键属性
object命令用于检查Redis对象的内部属性,一般用于排查问题。格式:
object subcommand [arguments [arguments]]
通过读取redisObject对象的refcount、encoding、lru属性实现。
object有5个子命令。
·help :帮助命令,object命令使用手册。
·refcount :获得指定键关联的值的引用数,即redisObject对象refcount属性。
·encoding :获得指定键关联的值的内部存储使用的编码,即redisObject对象encoding属性的字符串表达,对象类型与底层编码对应关系见表10-1。
·idletime :返回键的空闲时间,即自上次读写键以来经过的近似秒数。
·freq :返回键的对数访问频率计数器。当maxmemory-policy设置为LFU策略时,此子命令可用。
表10-1 对象类型与编码关系
源码分析 :
首先匹配子命令,如果子命令等于help,则返回帮助信息,
反之调用函数objectCommandLookupOrReply,此函数是object命令的辅助函数,可在不修改LRU和其他参数的情况下查找对象,并带有回复功能。如果没有找到对象则返回空;找到对象则读取相应属性,这里主要介绍下idletime和freq子命令。
idletime子命令获取key对应value的空转时间(单位:秒),注意是value的空转时间,而不是key的空转时间,因为Redis对象共享机制(0~10000的int对象会共享,这个区间可配),value对象属性可能会被相互影响,例如如果key1和key2的值都是1,那么value属性会相互影响。代码如下:
void objectCommand(client *c) {
robj *o;
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = {
" ....help信息",NULL };
addReplyHelp(c, help);
} else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
// 获取对象
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
addReplyLongLong(c,o->refcount);
} else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
addReplyBulkCString(c,strEncoding(o->encoding));
} else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
// 获取对象
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
// 如果内存驱逐策略是LFU则返回错误
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
addReplyError(c,"error info.....");
return;
}
// estimateObjectIdleTime返回空转的毫秒时间
addReplyLongLong(c,estimateObjectIdleTime(o)/1000);
}
// 计算频率
addReplyLongLong(c,LFUDecrAndReturn(o));
...
}
robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply) {
robj *o = objectCommandLookup(c,key);
if (!o) addReply(c, reply);
return o;
}
// 这是OBJECT命令的辅助功能。我们需要查找键无需修改LRU或其他参数
robj *objectCommandLookup(client *c, robj *key) {
dictEntry *de;
// 注意这里获取的是value对象(dictGetVal),而非key对象(dictGetKey)
if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL;
return (robj*) dictGetVal(de);
}
当maxmemory-policy设置为LFU策略时,freq子命令调用函数LFUDecrAndReturn获取value对象的访问频率,同样该命令也有因共享对象而导致的问题,LFUDecrAndReturn函数中num_periods表示衰减数量,根据配置lfu_decay_time计算(单位:分钟),与此对应的函数是LFULogIncr :
//获取对象访问频率
unsigned long LFUDecrAndReturn(robj *o) {
unsigned long ldt = o->lru >> 8;
unsigned long counter = o->lru & 255;
// 衰变算法,lfu_decay_time为可配置的衰减因子,默认为1(分钟)
unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods)
counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}
// 获取已经过去的分钟数
unsigned long LFUTimeElapsed(unsigned long ldt) {
unsigned long now = LFUGetTimeInMinutes(); // 获取当前时间分钟数,最大65 535
if (now >= ldt) return now-ldt;
return 65535-ldt+now;
}
/* 增加访问频率, 对数递增计数器。当前计数器值越大*真正实现它的可能性就越小。在255饱和. */
uint8_t LFULogIncr(uint8_t counter) {
if (counter == 255) return 255;
double r = (double)rand()/RAND_MAX;
double baseval = counter - LFU_INIT_VAL;
if (baseval < 0) baseval = 0;
// 访问频率算法,lfu_log_factor为可配置的概率因子,默认为10
double p = 1.0/(baseval*server.lfu_log_factor+1);
if (r < p) counter++;
return counter;
}
10.2.2 查看键类型
type命令用于查看key的类型,一般用于排查问题。格式: type key
返回key对应存储的值的类型,根据Redis对象type属性,可以是none(key不存在),string ,list,set,zset,hash。通过读取redisObject对象的type属性实现。
源码分析 :如果没有查找到对象,返回none,反之,读取type属性并返回他的字符串表达"o->type"即redisObject的type属性。
示例: set redis v5 -> type redis
10.2.3 查看键过期时间
ttl命令返回key剩余的生存时间,单位秒。一般用于根据key生存时间进行业务逻辑判断处理等,也可用于排查问题。类似命令还有pttl返回以毫秒为单位的生存时间,它们调用函数都相同,此处以ttl命令为例。
格式: ttl key
源码分析:
第1个参数是Redis客户端对象,client对象属性很多,这里的db表示选择的数据库,argv表示命令行参数,
第2个参数output_ms表示是否以毫秒为输出单位,首先以不修改查找对象(最后访问时间,LOOKUP_NOTOUCH)方式查找key,若不存在则返回-2,存在则获取其过期时间,若已过期则返回0,没有过期则返回未过期的时间,其他情况返回默认值-1。
/* Implements TTL and PTTL */
void ttlGenericCommand(client *c, int output_ms) {
long long expire, ttl = -1;
/* 在过期字典里查找key对应的过期时间. */
expire = getExpire(c->db,c->argv[1]);
if (expire != -1) { // 表示有过期时间
ttl = expire-mstime();
if (ttl < 0) ttl = 0;
}
if (ttl == -1) {
addReplyLongLong(c,-1);
} else { // 毫秒转秒+500四舍五入
addReplyLongLong(c,output_ms ? ttl : ((ttl+500)/1000));
}
}
示例: expire redis 20 -> ttl redis . -> pttl redis
10.3 设置键信息
本节有 设置过期时间系列命令(expire)、删除过期时间命令(persist)和重命名命令(rename/renamenx);
10.3.1 设置键过期时间
expire命令, 命令底层调用函数为expireGenericCommand,其原理是在redisDb过期字典里面添加或覆盖对应键值对,如果使用set/getset命令覆写会导致原来的过期时间被移除(参考set/getset命令源码分析),但incr/rename/lpush等非覆写命令不会修改key的过期时间。仅移除key的过期时间时可使用persist命令。
源码分析 :
basetime为基准时间,unit为时间单位,首先将秒转化为毫秒,然后再将毫秒时间转化为毫秒时间戳,统一参数之后进行对比,如果小于当前时间则删除,反之执行setExpire设置过期时间,setExpire函数将key加入redisDb对象的expires字典,值为该key的过期时间。代码如下:
void expireGenericCommand(client *c, long long basetime, int unit) {
....
if (unit == UNIT_SECONDS) when *= 1000; //单位转换
when += basetime; // 加上基准时间
// 非loading状态的主库,如果过期时间小于等于当前时间,则删除该过期key
if (checkAlreadyExpired(when)) {
robj *aux;
// 根据配置不同,有同步和异步删除key
int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
dbSyncDelete(c->db,key);
..... return;
} else {
setExpire(c,c->db,key,when);
..... return;
}
}
10.3.2 删除键过期时间
persist命令用于移除key的过期时间,如果有。有时候我们需要将临时key变成永久key,那么可以使用persist命令处理。
格式: persist key
说明: persist用于删除key的过期时间,使key永久有效,通过将key从redisDb对象的expires字典里删除实现。
源码分析 :
调用lookupKeyWrite函数,在查找前先查询过期字典,如果ttl到期则使键过期,如果键存在,则返回键的值对象,并从数据库的过期字典中删除指定key的对象。
/* PERSIST key */
void persistCommand(client *c) {
if (lookupKeyWrite(c->db,c->argv[1])) { // 为写操作查找key的对象
if (removeExpire(c->db,c->argv[1])) { // 从过期字典里删除key的过期时间
notifyKeyspaceEvent(NOTIFY_GENERIC,"persist",c->argv[1],c->db->id);
addReply(c,shared.cone);
server.dirty++;
} else {
addReply(c,shared.czero);
}
} else {
addReply(c,shared.czero);
}
}
10.3.3 重命名键
rename命令将key重命名,使用频率较低。同样类似命令还有renamenx,表示重命名后的key不存在时才能执行成功,因底层调用同一函数,所以将两个命令合并介绍。
格式:rename key new_key
重命名key,key不存在时返回错误,存在时,将被new_key覆盖;
源码实现:
命令为renamenx时nx参数等于1,先校验旧key名是否相同、是否存在,如果新key也存在,当nx等于1时返回0,反之删除新key。如果旧key有过期时间则给新key也加上过期时间,最后删除旧key。
void renameGenericCommand(client *c, int nx) {
...
expire = getExpire(c->db,c->argv[1]); //将key的过期时间保存到expire变量
if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { //新key存在则删除
if (nx) { // 如果是RENAMENX命令则不操作直接返回0
...
}
dbDelete(c->db,c->argv[2]);
}
dbAdd(c->db,c->argv[2],o); //将旧key的值对象和新的key添加到redis字典
// 如果原key有过期时间则对新key保留
if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
dbDelete(c->db,c->argv[1]); // 删除旧key
...
}
10.3.4 修改键最后访问
touch命令用于更新key的访问时间,避免被lru策略淘汰,使用频率较低。
格式:touch key [key ...]
说明: 改变key的最后访问时间。如果key不存在,则忽略该key。返回成功修改的数量。
源码分析:
在入口函数touchCommand中,循环调用lookupKeyRead函数去修改key的最后访问时间,当然前提条件是没有rbd或者aof进程在运行。关键函数为lookupKey,关键代码如下:
/* 实际上不是直接从命令中调用*应该改为依赖lookupKeyRead(),* lookupKeyWrite()和lookupKeyReadWithFlags()的实现*/
robj *lookupKey(redisDb *db, robj *key, int flags) {
...
/* 更新老化算法的访问时间。 如果有一个要保存的child,请不要这样做,因为这将导致疯狂写的副本。*/
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
}
return val;
...
}
示例:
127.0.0.1:6379> set touchkey 1
127.0.0.1:6379> touch touchkey
10.4 查找键
exists命令查询键是否存在,keys命令查找符合模式的键,scan命令遍历键,以及randomkey命令随机取键。其中exists命令和randomkey命令比较常用,keys命令由于其特性,一般被禁止在线上环境使用,scan命令在遍历过程中数据可以被修改,可能会造成不严谨的返回结果,但都有其合适的使用场景。
10.4.1 判断键是否存在
判断指定的key是否存在,并返回key存在的数量。
格式:exists key1 key2 ... key_N
源码分析:
for循环调用expireIfNeeded函数,此函数在前面章节讲过,表示尝试删除已过期的key。然后调用dbExists,并根据返回结果累加数量。
void existsCommand(client *c) {
long long count = 0;
int j;
for (j = 1; j < c->argc; j++) {
if (lookupKeyRead(c->db,c->argv[j])) count++;
}
addReplyLongLong(c,count);
}
10.4.2 查找符合模式的键
keys命令匹配合适的key并一次性返回,如果匹配的键较多,则可能阻塞服务器,因此该命令一般禁止在线上使用。
格式:keys pattern
说明: keys命令的作用是查找所有符合给定模式"pattern"的key,使用该命令处理大数据库时,可能会造成服务器长时间阻塞(秒级)。
源码分析:
初始化安全迭代器(迭代过程中允许修改数据),如果传入pattern为'*'则allkeys等于true,迭代过程中判断allkeys或者字符串匹配为true并且没有过期则记录该key。
void keysCommand(client *c) {
... // di为迭代器
di = dictGetSafeIterator(c->db->dict); //将数据库键空间作为参数,初始化安全迭代器
allkeys = (pattern[0] == '*' && plen == 1); // keys *
while((de = dictNext(di)) != NULL) { // 遍历数据库键空间
sds key = dictGetKey(de);
robj *keyobj;
// 判断key是否与正则表达式匹配,若匹配且key没有过期则在回复给客户端的内容中记录
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
keyobj = createStringObject(key,sdslen(key));
if (!keyIsExpired(c->db,keyobj)) { // key未过期,过期则删除
addReplyBulk(c,keyobj);
numkeys++;
}
decrRefCount(keyobj);
}
}
dictReleaseIterator(di);
setDeferredArrayLen(c,replylen,numkeys);
}
10.4.3 遍历键
scan命令可以遍历数据库中几乎所有的键,并且不用担心阻塞服务器。使用频率较低。
格式:scan cursor [MATCH pattern] [COUNT count]
说明: scan命令和hscan、sscan、zscan命令都用于增量迭代,每次只返回少量数据,不会有像keys命令堵塞服务器的隐患。
源码分析:
scan、sscan、hscan、zsan分别有自己的命令入口,入口中会进行参数检测和游标赋值,然后进入统一的入口函数:dictscan,具体细节详见dict章节。需要注意的是迭代都是以"桶"为单位的,所以有时候因为Hash冲突的原因,scan会多返回一些数据。代码如下:
/* The SCAN command completely relies on scanGenericCommand. */
void scanCommand(client *c) {
unsigned long cursor;
// 解析命令行游标参数
if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
scanGenericCommand(c,NULL,cursor); // scan,sscan,hscan,zsan统一入口函数
}
scanGenericCommand主要分5步。
1)解析count和match参数,如果没有指定count,默认返回10条数据。
2)开始迭代集合,如果key保存为ziplist或者intset,则一次性返回所有数据,游标为0(scan命令的游标参数为0时表示新一轮迭代开始,命令返回的游标值为0时表示迭代结束)。由于Redis设计只有数据量比较小的时候才会保存为ziplist或者intset,所以此处不会影响性能。
3)游标在保存为Hash的时候发挥作用,具体入口函数为dictScan,具体细节详见dict章节。
4)根据match参数过滤返回值,并且如果这个键已经过期也会直接过滤掉(Redis中键过期之后并不会立即删除)。
5)返回结果到客户端,是一个数组,第1个值是游标,第2个值是具体的键值对。
10.4.4 随机取键
randomkey命令随机返回数据库中的key。
格式:randomkey
说明: 在当前数据库中随机返回一个尚未过期的key(不删除)。
源码分析:
命令核心函数为dictGetRandomKey,如果Redis正在rehash,那么将1号散列表也作为随机查找的目标,否则只从0号散列表中查找节点,d->rehashidx表示rehash当前的索引。
/* Return a random entry from the hash table. */
dictEntry *dictGetRandomKey(dict *d)
{
if (dictIsRehashing(d)) { // 正在rehash
do {
/* We are sure there are no elements in indexes from 0 to rehashidx-1 */
h = d->rehashidx + (random() % (d->ht[0].size +
d->ht[1].size - d->rehashidx));
he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[0].size] :
d->ht[0].table[h];
} while(he == NULL);
} else {
do {
h = random() & d->ht[0].sizemask;
he = d->ht[0].table[h];
} while(he == NULL);
}
//上面对散列索引进行随机处理 ,下面这段代码是对散列冲突导致的链表再次进行random处理
/*唯一明智的方法是对元素进行计数并选择一个随机索引。. */
listlen = 0;
orighe = he;
while(he) {
he = he->next;
listlen++; //记录散列冲突链表的长度
}
listele = random() % listlen; // 在链表中随机选取一个
he = orighe;
while(listele--) he = he->next;
return he;
}
10.5 操作键
解删除键、序列化/反序列化键、移动键和键排序操作,其中del是比较常用的删除键命令,unlink是Redis 4.0为了弥补del删除大值时阻塞服务器而加入的异步删除键命令;
10.5.1 删除键
(1)del命令
同步删除一个或多个key,因为是同步删除,所以在删除大key时可能会阻塞服务器。
格式: del key [key ...]
说明: 以阻塞方式删除key。
源码分析:
del调用函数delGenericCommand,再循环调用dbSyncDelete函数,同步删除key、value、过期字典里对应的key(如果有),如果是集群还会删除key与slot(槽位)的对应关系。
void delGenericCommand(client *c, int lazy) ;
//lazy 表示惰性删除,redis 5固定传0,redis 6里面是server的配置属性读取,
int dbSyncDelete(redisDb *db, robj *key) {
/*从expires字典删除entry不会释放键的sds,因为它与主字典共享*/
// 首先从 expires 队列删除,然后再从 db->dict 中删除
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else { return 0; }
}
(2)unlink命令
以异步方式删除key,这可以避免del删除大key的问题,unlink在删除时会判断删除所需的工作量,以此决定使用同步还是异步删除(另一个线程中进行内存回收,不会阻塞当前线程),通常建议使用unlink代替DEL命令,但需Redis版本在4.0及以上。
格式 : unlink key [key ...]
说明: 根据删除key需要的工作量来选择以阻塞或非阻塞方式删除key。
源码分析:
同del一样,unlink也是调用同一个命令执行函数delGenericCommand,根据传参不同,unlink循环调用dbAsyncDelete,先删除过期字典里的key(如果有),然后调用dictUnlink从键空间删除key的关联关系并返回被删除的实例,根据实例计算删除需要的工作量和是否被引用来决定是否使用惰性删除,最后再使用dictFreeUnlinkedEntry删除dictUnlink返回的实例,同样,如果该Redis是集群模式,还会删除key与slot的对应关系。
int dbAsyncDelete(redisDb *db, robj *key) {
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
/* 如果该值由一些分配组成,那么以懒惰的方式释放*实际上会更慢...因此,在一定限制下,我们只是同步地释放该对象。 */
dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
// 返回释放对象需要的工作量,字符串对象始终返回1
size_t free_effort = lazyfreeGetFreeEffort(val);
/* 如果释放对象的工作太多,请在后台通过将对象添加到延迟空闲列表中来完成,
工作量大于阈值 并且没有被别的对象引用 LAZYFREE_THRESHOLD为64 */
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
atomicIncr(lazyfree_objects,1);
// 创建后台job,将val加入异步删除队列
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->dict,de,NULL);
}
}
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
}
}
//,lazyfreeGetFreeEffort是计算删除需要的工作量,计算了几个可能会有大value出现的对象
size_t lazyfreeGetFreeEffort(robj *obj) {
if (obj->type == OBJ_LIST) { //列表对象
quicklist *ql = obj->ptr;
return ql->len;
} else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
//集合对象且编码为散列表
dict *ht = obj->ptr;
return dictSize(ht);
} else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
//有序集合且编码为跳表
zset *zs = obj->ptr;
return zs->zsl->length;
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
//散列对象且编码为散列表
dict *ht = obj->ptr;
return dictSize(ht);
} else {
return 1; /* 其他情况返回1. */
}
}
bioCreateBackgroundJob函数创建一个bio任务;
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]); // 线程互斥锁
listAddNodeTail(bio_jobs[type],job); // 追加任务到对应类型的链表尾部
bio_pending[type]++; // 标记未处理数据量
pthread_cond_signal(&bio_newjob_cond[type]); // 唤醒一个异步处理线程
pthread_mutex_unlock(&bio_mutex[type]); // 解锁
}
type表示任务类型,arg1、arg2、arg3为参数,在处理任务时使用。
创建的bio_job结构体包含当前时间和传入的3个参数,将bio_job结构体追加到对应类型的双向链表尾部,这个过程是线程互斥的。添加完成后调用pthread_cond_signal通知异步线程处理。在使用initServer时会调用bioInit来初始化bio,并生成3个异步处理线程,分别对应3个类型(BIO_CLOSE_FILE、BIO_AOF_FSYNC、BIO_LAZY_FREE)的双向链表,处理函数为bioProcessBackgroundJobs,该函数从任务链表头部获取数据,根据类型和参数调用相关释放函数。
void *bioProcessBackgroundJobs(void *arg) {
pthread_mutex_lock(&bio_mutex[type]); // 加上互斥锁
...
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) { // 链表为空,继续等待
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
ln = listFirst(bio_jobs[type]); // 从链表头部获取元素
job = ln->value;
pthread_mutex_unlock(&bio_mutex[type]); //解锁
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
redis_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
if (job->arg1) // 释放对象
lazyfreeFreeObjectFromBioThread(job->arg1);
else if (job->arg2 && job->arg3 // 释放数据库
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
else if (job->arg3) //释放集群slot与key的映射关系
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
pthread_mutex_lock(&bio_mutex[type]); // 加锁
listDelNode(bio_jobs[type],ln); // 删除任务
bio_pending[type]--; // 任务计数减1
// 广播消息
pthread_cond_broadcast(&bio_step_cond[type]);
}
}
下面主要讲一下函数
lazyfreeFreeObjectFromBioThread如何释放对象的。
void lazyfreeFreeObjectFromBioThread(robj *o) {
decrRefCount(o);
atomicDecr(lazyfree_objects,1);
}
void decrRefCount(robj *o) {
if (o->refcount == 1) {
switch(o->type) {
case OBJ_STRING: freeStringObject(o); break;
case OBJ_LIST: freeListObject(o); break;
case OBJ_SET: freeSetObject(o); break;
case OBJ_ZSET: freeZsetObject(o); break;
case OBJ_HASH: freeHashObject(o); break;
case OBJ_MODULE: freeModuleObject(o); break;
case OBJ_STREAM: freeStreamObject(o); break;
default: serverPanic("Unknown object type"); break;
}
zfree(o);
} else {
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
}
}
在函数decrRefCount中,如果对象的refcount为1表示没有别的引用,可以释放内存,switch里面对应的是各个类型对象的释放函数。在unlink命令出现之前,Redis对象的refcount是有实际意义的,为了实现具有良好性能的惰性删除,Redis对象共享只对0到10000(可配置)的整数进行共享(refcount=2147483647),其他对象都不再共享,以此降低惰性删除时频繁加解锁竞争导致的性能下降。
小结:
1.释放key比较简单,释放value稍微复杂,因为key是string格式的,而value 有多重格式,需要根据value的类型分别进行释放;
2.删除方法的 异步与同步差别就在要求删除的元素影响须大于某阀值(64) ;
3. del和unlink具体实现原理可以参考一下文章:
参考: https://blog.csdn.net/sinat_34560749/article/details/104092505
10.5.2 序列化/反序列化键
(1)dump命令
格式:dump key
说明: 序列化key并返回序列化后的数据。
源码分析:
createDumpPayload是dump命令的关键函数,即组装序列化数据。序列化格式采用以类似RDB的格式序列化对象,它由对象类型字节和序列化对象组成,具体参考第20章。序列化尾部数据如图10-2所示,由RDB数据和2字节的RDB版本号以及8字节的CRC64校验码,注意对象类型和RDB版本号都参与了CRC64的校验,并且RDB版本和CRC都是小端储存。
图10-2 序列化格式
void createDumpPayload(rio *payload, robj *o, robj *key) {
unsigned char buf[2];
uint64_t crc;
/* 以类似RDB的格式序列化对象。它由对象类型和序列化对象组成. */
rioInitWithBuffer(payload,sdsempty()); //初始化payload
serverAssert(rdbSaveObjectType(payload,o)); //序列化对象类型并以此开头
serverAssert(rdbSaveObject(payload,o,key)); //序列化对象
/* RDB version */
buf[0] = RDB_VERSION & 0xff; //保存version的低8位
buf[1] = (RDB_VERSION >> 8) & 0xff; //保存version的高8位
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
/* CRC64 */
crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
sdslen(payload->io.buffer.ptr));
//对于目标机是大端字节序的机器,进行字节码的转换,使得不同字节序机器生成的rdb文件格式都是统一的
memrev64ifbe(&crc);
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
}
其中rdbSaveObjectType和rdbSaveObject函数的底层实现均为rioWrite函数。最终把内存中的对象数据直接写入rio(Redis I/O)的io中。
示例 : dump redis
--> "x00xc0x01x00x9fUx0b xx18x9bxc4"
(2)restore命令
该命令使用dump命令序列化后的数据进行反序列化
格式:restore key ttl serialized-value [replace]
说明: 反序列化给定的序列化值,并与key关联。
源码分析:
查找key是否存在,并根据参数replace决定返回与否,然后校验RDB版本号和CRC64校验码,如果没问题,则将序列化数据还原为对象。
void restoreCommand(client *c) {
/* Parse additional options */
....
/* Make sure this key does not already exist here... */
....
/* Check if the TTL value makes sense, 检查TTL值是否合理 */
....
/* 验证RDB版本和数据校验和。 */
....
// 将序列化数据还原为对象
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,key->ptr)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}
/* Remove the old key if needed. */
int deleted = 0;
if (replace) //如果覆盖,则删除原对象
deleted = dbDelete(c->db,key);
....
/* Create the key and set the TTL if any */
dbAdd(c->db,key,obj);
if (ttl) { // 设置ttl参数
setExpire(c,c->db,key,ttl);
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
addReply(c,shared.ok);
server.dirty++;
}
示例: restore redis 0 "x00x01zx00x8cO}$x14uxbf+"
10.5.3 移动键
(1)move命令
格式 move key db
说明: 将key移动到另一个数据库。
源码分析:
注意此命令不能工作在集群模式下,在非集群模式下检查目的数据库号,以及目的数据库key是否存在,如果都正常,将键值对添加到目的数据库,并保留原key的ttl(如果有),然后删除原数据库的key。
void moveCommand(client *c) {
...
dbAdd(dst,c->argv[1],o); // 向目的数据库添加原库的键值对
if (expire != -1) setExpire(c,dst,c->argv[1],expire); // 保留ttl,如果有
dbDelete(src,c->argv[1]); // 删除原key
...
}
示例: move redis 1
(2)migrate命令
格式:
migrate host port key|"" destination-db timeout [copy] [replace] [keys key [key ...]]
说明: 将key原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功,key保证会出现在目标实例上,并且当前实例上的key会被删除。它在执行的时候会阻塞进行迁移的两个实例,直到迁移成功,迁移失败或等待超时。
该命令的实现函数是migrateCommand()函数,其原理是在当前实例对给定key执行dump命令,将对象序列化,然后通过socket传送到目标实例,目标实例再使用restore命令对数据进行反序列化,并将反序列化所得的数据添加到数据库中。
选项copy表示保留当前实例的key,选项replace表示覆盖目标实例上的key,keys表示需要迁移的key,如果为空表示迁移所有key。
示例:
127.0.0.1:6379>migrate 127.0.0.1 6380 "" 0 3000 KEYS redis redix
10.5.4 键排序
sort命令对列表,集合或有序集合中的元素进行排序,使用方法相对复杂。
格式:
sort key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC|DESC] [ALPHA] [STORE destination]
说明: 返回或保存List、Set、Zset类型的key中排序后的元素。
参数:
·BY: 使用其他键的值作为权重进行排序,如果其他键不存在则跳过排序。
·LIMIT: 限定排序返回的元素。
·GET: 跟BY作用相反,将排序结果作为权重来排序匹配的其他键,可多次使用。
·ASC|DESC: 正序倒序排序。
·ALPHA: 对字符串进行排序,默认使用数字排序。
·STORE: 将排序后的结果保存到指定的键。
实现步骤如下。
①先查找key,并判断对象类型是否为List、Set或Zset其中之一,然后将命令行参数asc、desc、alpha、limit、store、by或get解析为变量后,根据这些变量进行以下步骤(不一定全部执行)。
②初始化vectorlen长度的排序数组,每个数组都是一个redisSortObject结构,其中obj为排序键的值,score为排序数字值时使用,cmpobj为按照ALPHA排序且有BY选项时使用。
③遍历键对应的值对象,将对象加入redisSortObject的obj属性。
④遍历vector数组,给redisSortBoject的u属性赋值(如果需要)。
⑤快速排序(如果需要)。
⑥遍历数组返回。
⑦释放vector数组。
redisSortObject结构体定义如下:
typedef struct _redisSortObject {
robj *obj;
union {
double score;
robj *cmpobj;
} u;
} redisSortObject;
源码分析:
第1步,根据key找到对应的值对象,根据对象类型调用相应的长度计算方法,根据长度(vectorlen)初始化redisSortObject排序对象。
switch(sortval->type) {
case OBJ_LIST: vectorlen = listTypeLength(sortval); break;
case OBJ_SET: vectorlen = setTypeSize(sortval); break;
case OBJ_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
default: vectorlen = 0; serverPanic("Bad SORT type"); /* Avoid GCC warning */
}
....
vector = zmalloc(sizeof(redisSortObject)*vectorlen);
第2步,遍历值对象具体实现,按照对象类型分别遍历,下面给出遍历list对象代码,如果不需要排序,则只遍历vectorlen次,否则将全部遍历。将遍历得出的对象添加到Redis排序对象。
while(listTypeNext(li,&entry)) {
vector[j].obj = listTypeGet(&entry);
vector[j].u.score = 0;
vector[j].u.cmpobj = NULL;
j++;
}
第3步,如果需要排序,则遍历vector数组给redisSortObject的u赋值,变量alpha即命令行参数ALPHA,变量sortby即命令行参数BY之后的参数,对应的是代码中变量byval,如果有sortby的情况下其值就是sortby对应的值对象,反之就是redisSortObject对象的obj。
if (alpha) {
if (sortby) vector[j].u.cmpobj = getDecodedObject(byval);
} else {
if (sdsEncodedObject(byval)) { // 如果是字符串
char *eptr;
vector[j].u.score = strtod(byval->ptr,&eptr);
...
} else if (byval->encoding == OBJ_ENCODING_INT) { // 如果是整数
vector[j].u.score = (long)byval->ptr;
}
...
}
第4步,如果需要排序,则执行快速排序,pqsort与qsort都是快速排序,区别在于pqsort有左右边界,用于命令行有limit参数时。
if (!dontsort) {
...
if (sortby && (start != 0 || end != vectorlen-1))
pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
else // 快排
qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
}
第5步,输出时调用lookupKeyByPattern,如果有get参数则匹配出对应key的值对象,否则使用排序key的值对象。
byval = lookupKeyByPattern(c->db,sortby,vector[j].obj,storekey!=NULL);
10.6 本章小结
本章介绍的命令不需要判断具体类型,可以作用于任何类型的键,
需要注意的是:
move命令不能在集群模式下工作;
sort命令(子命令by/get)部分功能受限,
del和unlink在使用上应加以区别,与del一样,
在使用时可能导致服务器阻塞的命令还有hgetall、lrange、smembers、flushall、flushdb、keys等,其中前3个命令和del命令在使用时都是比较容易被忽略的;
flushall、flushdb有参数可以异步操作,具体细节可参考相应章节。
keys命令的使用也需要注意,当数据库较大时,可能会导致阻塞;
了解scan工作原理,合理选择使用场景;
dump命令序列化的数据不包含任何过期时间,所以在使用restore反序列化时需要自己指定过期时间,0表示不过期。