13.1 相关命令介绍
Redis列表对象的底层数据结构是quicklist;
13.1.1 命令列表
表13-1 列表命令
13.1.2 栈和队列命令列表
栈与队列是操作受限制的线性表,
栈只允许在线性表的同一侧执行插入或删除操作,具有先进后出的特性;
队列只允许在一侧插入另一侧删除,具有先进先出的特性。
如表13-2所示,通过这些列表命令可以很容易实现栈与队列。
表13-2 栈和队列相关命令
1.实现栈
如图13-1所示,列表节点为(1,3,4),我们对它进行了两次右侧入栈操作,列表节点变为了(1,3,4,10,9)。然后我们对该列表进行了一次出栈操作,列表节点最终为(1,3,4,10)。
图13-1 栈的操作
栈要求操作在同一侧进行,可以通过lpush和lpop命令组合实现,也可以通过rpush和lpop命令组合实现。(左进左出或者右进右出)
如果需要在pop时等待数据产生,可以使用阻塞的pop,即通过lpush和blpop命令组合实现阻塞的栈,或者通过rpush和brpop命令组合实现阻塞的栈。
2.实现队列
如图13-2所示,列表的节点为(1,3,4)。在右侧进行了入队列操作,节点变为(1,3,4,10)。在左侧进行了出队列操作,列表节点变为了(3,4,10)。
图13-2 队列的操作
队列要求在一侧插入、另一侧删除,可以通过lpush和rpop命令组合实现,或者通过rpush和lpop命令组合实现。(左进右出或者右进左出)
如果需要在pop时等待数据产生,可以使用阻塞的pop,即通过lpush和brpop命令组合实现阻塞的队列,或者通过rpush和blpop命令组合实现阻塞的队列。
13.2 push/pop相关命令
13.2.1 push类命令的实现
lpush命令作用是在列表头部插入元素,并返回列表的总长度。
格式: lpush key value [value...]
入口函数为lpushCommand,内部调用pushGenericCommand实现,参数LIST_HEAD表示列表的头部。
void pushGenericCommand(client *c, int where) {
int j, pushed = 0;
//根据key查找列表对象,如果没有查找到,则创建新的列表对象
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
for (j = 2; j < c->argc; j++) {
if (!lobj) {
lobj = createQuicklistObject(); //创建新的列表对象, 底层为qucklist
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
dbAdd(c->db,c->argv[1],lobj);
}
listTypePush(lobj,c->argv[j],where); //将值value插入列表
pushed++;
}
...
}
函数listTypePush首先会校验列表对象的编码,如果不是OBJ_ENCODING_quicklist则直接返回错误,即列表对象的底层数据结构只有quicklist。函数quicklistPush是数据结构quicklist提供的API,用于将value插入到quicklist指定位置,在第7章已经详细介绍了 。
void listTypePush(robj *subject, robj *value, int where) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
value = getDecodedObject(value);
size_t len = sdslen(value->ptr);
quicklistPush(subject->ptr, value->ptr, len, pos);
decrRefCount(value);
} else {
serverPanic("Unknown list encoding");
}
}
lpushx、rpush、rpushx命令实现与LPUSH非常类似;
13.2.2 pop类命令的实现
rpop命令的作用是从列表尾部弹出元素,并返回给客户端。
格式:rpop key
示例:
127.0.0.1:6379> rpush mylist 1 2 3 4
(integer) 4
127.0.0.1:6379> rpop mylist
"4"
rpop命令的入口函数为rpopCommand,内部调用popGenericCommand实现,参数LIST_TAIL表示列表的尾部。
void popGenericCommand(client *c, int where) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
//函数listTypePop内部主要调用quicklistPopCustom实现quicklist元素的获取,这是数据结构quicklist提供的API
robj *value = listTypePop(o,where);
if (value == NULL) {
addReplyNull(c);
} else {
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
addReplyBulk(c,value);
decrRefCount(value);
if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[1]);
}
}
}
命令lpop的实现与rpop非常类似,都是调用popGenericCommand函数完成的,只是第2个参数传递的是quicklist_HEAD;
13.2.3 阻塞push/pop类命令的实现
blpop是阻塞版本的POP:即当列表对象不存在,会阻塞客户端,直到列表对象不为空或者阻塞时间超过timeout为止,timeout为0表示阻塞时间无限长
格式:blpop key [key ...] timeout
示例:127.0.0.1:6379> blpop list1 list2 list3 0
blpop命令的入口函数为blpopCommand,内部调用blockingPopGenericCommand实现,参数LIST_HEAD表示列表的尾部。
函数blockingPopGenericCommand遍历所有列表key,当列表不为空时,则从列表中弹出元素并返回给客户端,注意这里同时会返回列表key与弹出的元素value。从代码实现中同样可以看出,只要这多个列表中至少有一个列表不为空,命令就会直接返回而不会阻塞客户端。阻塞客户端的逻辑由函数blockForKeys实现。另外需要注意的一点是,这里解析的timeout超时时间是一个绝对时间。
/* Blocking RPOP/LPOP */
void blockingPopGenericCommand(client *c, int where) {
//遍历所有key,只要有一个列表不为空,就从该列表中弹出元素并返回
for (j = 1; j < c->argc-1; j++) {
o = lookupKeyWrite(c->db,c->argv[j]);
if (o != NULL) {
if (o->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
} else {
if (listTypeLength(o) != 0) {
//弹出元素并返回给客户端
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
robj *value = listTypePop(o,where);
serverAssert(value != NULL);
//注意返回的是列表的key和value
addReplyArrayLen(c,2);
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
notifyKeyspaceEvent(NOTIFY_LIST,event,
c->argv[j],c->db->id);
if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[j]);
}
return; //命令结束
}
}
}
}
/* 所有列表都为空,阻塞客户端k */
blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
}
redisDb结构体,其中有一个字段blocking_keys,类型为字典,所有因为某个键阻塞的客户端都会被添加到该字典中,其结构如图13-3所示
图13-3 阻塞键结构
函数blockForKeys主要实现逻辑如下:
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
//c->bpop类型为结构体blockingState,
c->bpop.timeout = timeout; //c->bpop.timeout记录客户端阻塞超时时间
c->bpop.target = target;
if (target != NULL) incrRefCount(target);
for (j = 0; j < numkeys; j++) {
bkinfo *bki = zmalloc(sizeof(*bki));
if (btype == BLOCKED_STREAM)
bki->stream_id = ids[j];
/* c->bpop.keys是一个字典,记录客户端因为哪些key而阻塞. */
if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
zfree(bki);
continue;
}
incrRefCount(keys[j]);
//将客户端添加到数据库对象redisDb的blocking_keys字典
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
//没有则需要创建链表,并添加到字典
l = listCreate();
//同时可以看到客户端对象被添加到c->db->blocking_keys字典中,key为阻塞的键,值为客户端链表
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
listAddNodeTail(l,c); //将客户端追加到链表尾部
bki->listnode = listLast(l);
}
blockClient(c,btype); // 阻塞客户端
}
函数blockClient主要设置客户端标志位为阻塞状态,记录阻塞类型,同时统计阻塞的客户端数目。
void blockClient(client *c, int btype) {
c->flags |= CLIENT_BLOCKED;
c->btype = btype;
server.blocked_clients++;
server.blocked_clients_by_type[btype]++;
addClientToTimeoutTable(c);
}
有两个触发条件会解除客户端的阻塞状态,下面我们详细介绍。
(1)进行push操作
进行push操作时,如果该列表对象不存在,会创建一个空的列表并添加到数据库字典,此时会检测是否有某个客户端因为该列表键阻塞。函数如下:
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK);
if (val->type == OBJ_LIST ||
val->type == OBJ_ZSET ||
val->type == OBJ_STREAM)
signalKeyAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key->ptr);
}
函数signalKeyAsReady会将该键添加到server.ready_keys链表中,节点类型为ready-List:
void signalKeyAsReady(redisDb *db, robj *key) {
readyList *rl;
/* 没有客户端因为该键而阻塞 */
if (dictFind(db->blocking_keys,key) == NULL) return;
/* 已经处理过了 */
if (dictFind(db->ready_keys,key) != NULL) return;
/* 把该键添加到ready_keys字典*/
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = db;
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);
incrRefCount(key);
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
redis的命令处理入口processCommand,在调用完命令后,如果检测到server.ready_keys链表不为空,则说明有客户端可以解除阻塞状态了:
int processCommand(client *c) {
...
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
//处理阻塞键
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
如下代码所示函数handleClientsBlockedOnKeys首先遍历server.ready_keys链表,获取到多个列表键,同时在db->blocking_keys字典中查找是否存在某个因为该列表键阻塞的客户端,如果有则解除客户端阻塞状态。函数unblockClient主要用于去除客户端阻塞状态并向客户端返回数据。
void handleClientsBlockedOnKeys(void) {
while(listLength(server.ready_keys) != 0) {
list *l;
l = server.ready_keys;
server.ready_keys = listCreate();
//遍历server.ready_keys链表
while(listLength(l) != 0) {
listNode *ln = listFirst(l);
readyList *rl = ln->value;
//删除db->ready_keys,signalKeyAsReady处有校验
dictDelete(rl->db->ready_keys,rl->key);
server.fixed_time_expire++;
updateCachedTime(0);
//查找列表对象 .
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL) {
if (o->type == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);
else if (o->type == OBJ_ZSET)
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
serveClientsBlockedOnKeyByModule(rl);
}
server.fixed_time_expire--;
/* Free this item. */
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l); /* We have the new list on place at this point. */
}
}
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
//获取到因为该列表键阻塞的所有客户端 */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
robj *value = listTypePop(o,where);
if (value) { //遍历客户端链表,解除其阻塞状态
if (dstkey) incrRefCount(dstkey);
unblockClient(receiver);
...
} else {
break;
}
}
}
if (listTypeLength(o) == 0) {
dbDelete(rl->db,rl->key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
}
}
(2)阻塞超时
阻塞超时同样会解除客户端阻塞状态,同时向客户端返回nil。服务器如何检测客户端通过时间事件处理函数serverCron检测客户端阻塞超时。入口函数为clientsCron,处理客户端所有定时任务,它会遍历所有客户端,并调用函数clientsCronHandleTimeout实现阻塞超时处理逻辑,注意第2个参数为当前时间:
void clientsCron(void) {
int numclients = listLength(server.clients);
int iterations = numclients/server.hz;
mstime_t now = mstime();
if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
numclients : CLIENTS_CRON_MIN_ITERATIONS;
while(listLength(server.clients) && iterations--) {
client *c;
listNode *head;
listRotateTailToHead(server.clients);
head = listFirst(server.clients);
c = listNodeValue(head);
if (clientsCronHandleTimeout(c,now)) continue;
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronTrackExpansiveClients(c)) continue;
if (clientsCronTrackClientsMemUsage(c)) continue;
}
}
而函数clientsCronHandleTimeout实现就需要比较客户端的超时时间timeout与当前时间即可。
至于命令brpop、brpoplpush都和blpop大同小异;
13.3 获取列表数据
对于列表对象,可以获取指定索引的元素,获取列表的长度,查询指定索引范围内的所有元素。
13.3.1 获取单个元素
lindex命令的作用是获取索引为index的元素。
格式:lindex key
示例:127.0.0.1:6379> lindex mylist 1
命令的入口函数为lindexCommand。函数quicklistIndex是数据结构Quicklist提供的API,用于查找指定索引位置的元素,在第7章已经详细介绍,这里不再赘述。如果查找到元素则返回该元素值,否则返回"$-1 "。
void lindexCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
long index;
robj *value = NULL;
if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
return;
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklistEntry entry;
if (quicklistIndex(o->ptr, index, &entry)) {
if (entry.value) {
value = createStringObject((char*)entry.value,entry.sz);
} else {
value = createStringObjectFromLongLong(entry.longval);
}
addReplyBulk(c,value);
decrRefCount(value);
} else {
addReplyNull(c);
}
} else {
serverPanic("Unknown list encoding");
}
}
13.3.2 获取多个元素
lrange命令的作用是获取指定索引范围内的所有元素:0表示第1个元素,1表示第2个元素,以此类推;-1表示最后一个元素,-2表示倒数第2个元素,以此类推。
格式:lrange key start end
示例:127.0.0.1:6379> lrange mylist 0 -1
入口函数为lrangeCommand
首先需要计算得到正确的索引起始和终止范围。这里生成了一个迭代器list-TypeIterator,并调用方法listTypeNext迭代指定数目的元素。
迭代器底层主要通过数据结构quicklist的迭代器quicklistIter实现。
void lrangeCommand(client *c) {
...
llen = listTypeLength(o);
/*处理范围 转换负索引 */
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
if (end >= llen) end = llen-1; //超过长度则设置为全长
rangelen = (end-start)+1; //范围长度
/* Return the result in form of a multi-bulk reply */
addReplyArrayLen(c,rangelen);
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
//初始化迭代器
listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
//迭代指定数目元素并返回给客户端
while(rangelen--) {
listTypeEntry entry;
listTypeNext(iter, &entry);
quicklistEntry *qe = &entry.entry;
if (qe->value) {
addReplyBulkCBuffer(c,qe->value,qe->sz);
} else {
addReplyBulkLongLong(c,qe->longval);
}
}
listTypeReleaseIterator(iter);
} else {
serverPanic("List encoding is not QUICKLIST!");
}
}
13.3.3 获取列表长度
llen命令的作用是获取列表长度(元素数目)。
格式: llen key
示例:127.0.0.1:6379> llen mylist
入口函数为llenCommand。列表的长度即quicklist列表中元素的数目,通过函数quicklistCount很容易实现。
void llenCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
//listTypeLength调用 quicklistCount(subject->ptr);
addReplyLongLong(c,listTypeLength(o));
}
13.4 操作列表
以向指定位置插入元素,也可以删除指定位置的元素等
13.4.1 设置元素
lset命令的作用是设置指定索引位置的元素值。
格式:lset key index value
示例: 127.0.0.1:6379> lrange mylist2 0 -1
入口函数为lsetCommand。
函数quicklistReplaceAtIndex是数据结构quick-list提供的API,用于修改指定索引位置的值;修改失败会返回错误提示"-ERR index out of range "。
void lsetCommand(client *c) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
long index;
robj *value = c->argv[3]; //第三个参数是修改后的value
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
//指定位置设置元素,返回结果
int replaced = quicklistReplaceAtIndex(ql, index,
value->ptr, sdslen(value->ptr));
if (!replaced) { //修改失败返回错误提示:-ERR index out of range
addReply(c,shared.outofrangeerr);
} else {
addReply(c,shared.ok);
}
}
}
13.4.2 插入元素
linsert命令的作用是将值value插入到列表key,且位于值pivot之前或之后。
格式:linsert key before|after pivot value
示例: linsert mylist after 3 2 //在值3后面插入2
实现函数为linsertCommand
获取列表迭代器listTypeIterator后,会从头开始遍历所有元素,直到找到与值pivot相等的元素,并插入到值pivot的前面或者后面。 (迭代查找并更新)
void linsertCommand(client *c) {
... //确定指定位置前后
if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
where = LIST_TAIL;
} else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
where = LIST_HEAD;
} else {
addReply(c,shared.syntaxerr);
return;
}
/* //遍历quicklist,根据元素找到位置索引, Seek pivot from head to tail */
iter = listTypeInitIterator(subject,0,LIST_TAIL);
while (listTypeNext(iter,&entry)) {
if (listTypeEqual(&entry,c->argv[3])) {
listTypeInsert(&entry,c->argv[4],where); //插入元素
inserted = 1;
break;
}
}
listTypeReleaseIterator(iter);
}
13.4.3 删除元素
lrem命令的作用是移除列表中与参数value相等的元素,并返回被移除的元素数目。当count大于0时,从表头开始向表尾搜索,最大删除count个元素;当count小于0时,从表尾开始向表头搜索,最大删除"count绝对值"个元素;当count等于0时,删除所有与value相等的元素。
格式:lrem key count value
示例: 127.0.0.1:6379> lrem mylist2 2 3
入口函数为lremCommand。
toremove表示count的值,负数时起始索引为-1(即最后一个元素),遍历方向为向前遍历,正数时起始索引为0(即第1个元素),遍历方向为向后遍历。获取迭代器listTypeIterator之后,调用listTypeNext依次遍历所有元素,如果元素与值obj相等则删除元素,toremove等于0时删除所有与值obj相等的元素,否则最多删除"toremove"个元素。
void lremCommand(client *c) {
...
listTypeIterator *li;
if (toremove < 0) { //toremove即为count的值
toremove = -toremove; //负数转正
li = listTypeInitIterator(subject,-1,LIST_HEAD);
} else {
li = listTypeInitIterator(subject,0,LIST_TAIL);
}
listTypeEntry entry;
while (listTypeNext(li,&entry)) {
if (listTypeEqual(&entry,obj)) {
listTypeDelete(li, &entry);
server.dirty++;
removed++;
if (toremove && removed == toremove) break; //删除元素数目控制
}
}
listTypeReleaseIterator(li);
...
}
13.4.4 裁剪列表
ltrim命令的作用是对一个列表进行裁剪,即只保留区间start与stop内的元素,其他将被删除。
格式:ltrim key start stop
示例:127.0.0.1:6379> lpush mylist4 1 2 3 4
127.0.0.1:6379> ltrim mylist4 1 2
入口函数是ltrimCommand。
它调用函数quicklistDelRange删除指定范围内的元素来实现操作,这是数据结构quicklist提供的API。
void ltrimCommand(client *c) {
robj *o;
long start, end, llen, ltrim, rtrim;
/* 转换负数 索引 convert negative indexes */
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
if (start > end || start >= llen) {
/* 如果超过范围怎没有删除的数据*/
ltrim = llen;
rtrim = 0;
} else {
if (end >= llen) end = llen-1; //结束位置最多到列表末尾
ltrim = start; //左边索引位置
rtrim = llen-end-1; //右边索引位置
}
/* Remove list elements to perform the trim */
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklistDelRange(o->ptr,0,ltrim); //删除左边索引部分
quicklistDelRange(o->ptr,-rtrim,rtrim); //删除右边索引部分
} else {
serverPanic("Unknown list encoding");
}
....'
}
13.5 本章小结
Redis中列表的命令实现,列表底层的数据结构采用的是quicklist。
栈与队列的基本概念,如何通过push/pop实现栈与队列;
列表阻塞命令的实现,通过blpop命令讲解了客户端阻塞流程,以及解除客户端阻塞流程;
常见的列表操作和查询的命令。