18.1 相关命令介绍
Redis的Stream命令都以x开头。
1.xadd命令
作用: 将指定消息数据追加到指定的Stream队列中或裁减列中数据长度。
格式:xadd key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ...
说明: 每条消息由一或多个阈值对组成,消息插入队列中后会返回唯一的消息ID。
xadd是唯一可以向Stream队列添加数据的命令。
1) MAXLEN:当Stream中数据量过大时,通过此关键字裁剪长度,删除旧数据至指定的值;当 数据量小于等于指定值时,不进行剪切。其中裁剪模式有两种。
·~:模糊裁剪,优化精确裁剪,一般用此模式,效率更高。
·=:精确裁剪,数据存储的listpack结构体中,裁剪长度的所有阈值是依照数据从老到新的方式,依次把listpack释放掉,此模式下删除最后一个listpack中的数据比较费时,所以推荐用模糊裁剪。
2)ID:添加消息可指定具体值或用"*"代替,指定的值必须大于当前队列中最大的消息ID,为*时则默认生成一个最新的ID,ID值取的是当前时间+序列号。
示例:
①添加一条数据,不指定ID值。 xadd mytopic * name tom age 20
②添加一条数据,指定ID值: xadd mytopic 1547127055889-0 name jim age 21
③修改长度,如果发现添加新元素后的超过100万条消息,则删除旧消息使长度大约缩减至100万个元素。
2.xrange命令
用于读取给定ID范围内的消息数据,并可以设置返回数据的条数。
格式:xrange key start end [COUNT count]
说明: 将返回两个ID之间(闭区间)的所有消息,消息排序为ID递增排序。
·start:开始消息ID,指定具体值或通过"-"特殊符号来表示最小ID。
·end:结束消息ID,指定具体值或通过"+"特殊符号来表示最大ID。
·COUNT:设定返回的消息数量。
示例 : xrange mytopic - + COUNT 2
3.xrevrange命令
说明: 与xrange用法唯一区别是返回数据的顺序为消息ID的递减序,正好与xrange返回的数据顺序相反。
4.xdel命令
用于删除Stream队列中指定的一或多个消息ID对应的数据。
格式:
xdel key ID [ID ...]
说明:
·key:类型必须为OBJ_STREAM,否则报错。
·ID:为指定的一或多个消息ID。注意:ID不可为特殊符号"-"和"+",不支持范围删除。
示例: xdel mytopic 1547127055879-0
5.xgroup命令
用于队列的消费组管理,包含对消费组的创建、删除、修改等操作。
格式:xgroup [CREATE key groupname id-or-$]
[SETID key id-or-$]
[DESTROY key groupname]
[DELCONSUMER key groupname consumername]
[HELP]
说明:S tream队列可以被多个消费组订阅,每个消费组都会记录最近一次消费的消息last_id,一个消费组可以拥有多个消费者去消费,组内消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_id往前移动,每个消费者有一个组内唯一名称。而xgroup命令就是用于消费组管理。
·CREATE:创建一个新消费组。该选项末尾设置了MKSTREAM参数,当创建消费组的键值对不存在时,则会创建一个新的消费组。
·SETID:修改某个消费组消费的消息last_id。
·DESTROY:删除指定消费组。
·DELCONSUMER:删除指定消费组中某个消费者。
·HELP:查看使用帮助。
示例:
1)创建一个消费组:xgroup CREATE mytopic cg1 1547127055879-0
//创建一个消费组cg1,从消息id为1547127055879-0的消息开始消费
最后一个参数是指定该消费组开始消费的消息ID,其中"0"或"0-
0",表示从头开始消费,如果使用特殊符"$",则表示队列中最后一
项ID,只读取消息队列中新到的消息。
2)修改消费组的last_id:
xgroup SETID mytopic cg1 1547127055888-0
//修改消费组cg1,从消息id为1547127055888-0的消息开始消费
6.xreadgroup命令
用于从消费组中可靠地消费n条消息,如果指定的消费者不存在,则创建之。
格式:
xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
说明:
·group:消费组名称。
·consumer:消费者名称。
·COUNT:消费多少条数据。
·BLOCK:是否为阻塞模式,milliseconds为阻塞多少毫秒。
·STREAMS:Stream队列名称,可指定多个。若指定多个,则ID也要对应指定相同个数。
·ID:读取只大于指定消息ID后未确认的消息;特殊符号">",读取未传递给其他任何消费者的消息,也就是新消息。
·NOACK:该消息不需要确认。
示例:
从Stream队列的消费组cg1中新建一个消费者c1,并消费一条数据。
XREADGROUP GROUP cg1 c1 COUNT 1 STREAMS mytopic >
7.xread命令
用于从Stream队列中读取N条消息,一般用作遍历队列中的消息。
格式:
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
说明: 此命令读取消息后无须通过XACK确认,也不需要强制指定消费组名称与消费者名称。
·COUNT:读取多少条数据;
·BLOCK:是否为阻塞模式,milliseconds为阻塞多少毫秒;
·STREAMS:Stream队列名称;
·ID:指定从消息ID开始读取,即消息ID大于指定的ID的消息,可为"$"特殊符号,代表从最后一条开始读取。
示例: xread COUNT 10 STREAMS mytopic 0
8.xack命令
用于确认一或多个指定ID的消息,使其从待确认列表中删除。
格式:xack key group ID [ID ...]
说明: 为了确保每个消息能被消费者消费到,通过xreadgroup消费的消息会存储在该消费组的未确认列表中,直到客户端确认该消息,才会从未确认列表中删除。
·group:消费组名称;
·ID:确认的消息ID。
示例:xack mytopic cg1 1547127055889-0
9.xpending命令
读取某消费组或者某个消费者的未确认消息,返回未确认的消息ID、空闲时间、被读取次数。
格式:xpending key group [start end count] [consumer]
说明:
·group:指定的消费组;
·start:范围开始ID,可以为特殊符"-"表示开始或指定ID;
·end:范围结束ID,可以为特殊符"+"标识结尾或指定ID;
·count:读取条数;
·consumer:指定的消费者。
示例:
①读取消费组cg1中消费者c1的所有待确认消息。
xpending mytopic cg1 - + 2 c1
1) 1) "1547127055889-0" //消息ID
2) "c1" //消费者名称
3) (integer) 653752 //间隔多久未确认
4) (integer) 11 //已被读取次数
②读取消费组cg1的所有待确认消息。
127.0.0.1:6379> xpending mytopic CG1 - + 10
10.xclaim命令
用于改变一或多个未确认消息的所有权,新的所有者是在命令参数中指定。
格式:
xclaim key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]
说明:
·consumer:指定新的消费者;
·min-idle-time:指定消息最小空闲数;
·ID:指定消息ID;
·IDLE:将该消息空闲时间设置为指定毫秒数,默认IDLE值为0;
·TIME:将该消息空闲时间设置为指定UNIX时间;
·RETRYCOUNT:被读取次数重置为指定次数,默认不去修改,避免丢失真实被读取次数;
·force:在待处理条目列表(PEL)中创建待处理消息条目,即使某些指定的ID尚未在分配给不同客户端的待处理条目列表(PEL)中;
·justid:只返回成功认领的消息ID数组。
示例: 认领ID为"1547294557195-0"的消息,仅当消息闲置至少1小时时,将所有权分配给消费者c2,并将该消息的空闲时间置为0,被交付读取次数也改为0。
xclaim mytopic1 cg1 c2 3600000 1547294557195-0 IDLE 0 RETRY-COUNT 0
11.xinfo命令
xinfo命令用于读取消息队列、消费组、消费者等的信息。
格式:
xinfo [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
说明:
·CONSUMERS:用于查看某个消费组下的消费者信息;
·GROUPS:用于查看某个Stream队列下的消费组信息;
·STREAM:用于查看某个Stream队列的整体组信息。
示例:
1)查看消费组c1中消费者消费信息:
xinfo CONSUMERS mytopic cg1
2)查看Stream队列信息:
xinfo STREAM mytopic
3)查看Stream队列中消费组信息:
xinfo GROUPS mytopic
12.xtrim命令
作用是缩减消息队列。
格式:xtrim key MAXLEN [~] count
说明: 当Stream中数据量过大时,可通过此命令字来缩减Stream队列长度,删除Stream中旧数据直到长度减少至指定的值;当数据量小于等于指定值时,不做剪切,此命令与xadd中通过MAXLEN字段实现裁剪的逻辑是一致的。其中count为指定的长度。
其中裁剪模式有两种:
·~:模糊裁剪,优化精确裁剪一般用此模式,效率更高。
·=:精确裁剪。
示例: xadd mytopic * name tom age 20
13.xlen命令
xlen命令用于获取Stream队列的数据长度。
格式:xlen key ID [ID ...]
示例: xlen mytopic
18.2 基本操作命令原理分析
添消息、删除消息、范围查找、遍历消息、获取队列信息、长度统计、裁剪消息;
18.2.1 添加消息
xaddCommand函数。
void xaddCommand(client *c) {
streamID id;
int id_given = 0; /* Was an ID different than "*" specified? */
long long maxlen = -1; /* If left to -1 no trimming is performed. */
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
the maxium length is not applied verbatim. */
int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
/* 1)解析参数,主要对MAXLEN/ID/[field value]对的解析,参数是否合法,并赋值给不同的变量 */
int i = 2;
for (; i < c->argc; i++) {
int moreargs = (c->argc-1) - i; /*添加的[field value]对参数长度 */
char *opt = c->argv[i]->ptr;
if (opt[0] == '*' && opt[1] == '