zoukankan      html  css  js  c++  java
  • Redis5设计与源码分析 (第18章 数据流相关命令的实现)

    18.1 相关命令介绍

    Redis的Stream命令都以x开头。

    1.xadd命令

    作用: 将指定消息数据追加到指定的Stream队列中或裁减列中数据长度。

    格式:xadd key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ...

    说明: 每条消息由一或多个阈值对组成,消息插入队列中后会返回唯一的消息ID。

    xadd是唯一可以向Stream队列添加数据的命令。

    1) MAXLEN:当Stream中数据量过大时,通过此关键字裁剪长度,删除旧数据至指定的值;当 数据量小于等于指定值时,不进行剪切。其中裁剪模式有两种。

    ·~:模糊裁剪,优化精确裁剪,一般用此模式,效率更高。

    ·=:精确裁剪,数据存储的listpack结构体中,裁剪长度的所有阈值是依照数据从老到新的方式,依次把listpack释放掉,此模式下删除最后一个listpack中的数据比较费时,所以推荐用模糊裁剪。

    2)ID:添加消息可指定具体值或用"*"代替,指定的值必须大于当前队列中最大的消息ID,为*时则默认生成一个最新的ID,ID值取的是当前时间+序列号。

    示例:

    ①添加一条数据,不指定ID值。 xadd mytopic * name tom age 20

    ②添加一条数据,指定ID值: xadd mytopic 1547127055889-0 name jim age 21

    ③修改长度,如果发现添加新元素后的超过100万条消息,则删除旧消息使长度大约缩减至100万个元素。

     

    2.xrange命令

    用于读取给定ID范围内的消息数据,并可以设置返回数据的条数。

    格式:xrange key start end [COUNT count]

    说明: 将返回两个ID之间(闭区间)的所有消息,消息排序为ID递增排序。

    ·start:开始消息ID,指定具体值或通过"-"特殊符号来表示最小ID。

    ·end:结束消息ID,指定具体值或通过"+"特殊符号来表示最大ID。

    ·COUNT:设定返回的消息数量。

    示例 : xrange mytopic - + COUNT 2

    3.xrevrange命令

    说明: 与xrange用法唯一区别是返回数据的顺序为消息ID的递减序,正好与xrange返回的数据顺序相反。

    4.xdel命令

    用于删除Stream队列中指定的一或多个消息ID对应的数据。

    格式:

    xdel key ID [ID ...]

    说明

    ·key:类型必须为OBJ_STREAM,否则报错。

    ·ID:为指定的一或多个消息ID。注意:ID不可为特殊符号"-"和"+",不支持范围删除

    示例: xdel mytopic 1547127055879-0

     

     

    5.xgroup命令

    用于队列的消费组管理,包含对消费组的创建、删除、修改等操作。

    格式:xgroup [CREATE key groupname id-or-$]

             [SETID key id-or-$]

    [DESTROY key groupname]

    [DELCONSUMER key groupname consumername]

    [HELP]

    说明:S tream队列可以被多个消费组订阅,每个消费组都会记录最近一次消费的消息last_id,一个消费组可以拥有多个消费者去消费,组内消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_id往前移动,每个消费者有一个组内唯一名称。而xgroup命令就是用于消费组管理。

    ·CREATE:创建一个新消费组。该选项末尾设置了MKSTREAM参数,当创建消费组的键值对不存在时,则会创建一个新的消费组。

    ·SETID:修改某个消费组消费的消息last_id。

    ·DESTROY:删除指定消费组。

    ·DELCONSUMER:删除指定消费组中某个消费者。

    ·HELP:查看使用帮助。

    示例

    1)创建一个消费组:xgroup CREATE mytopic cg1 1547127055879-0

    //创建一个消费组cg1,从消息id为1547127055879-0的消息开始消费

    最后一个参数是指定该消费组开始消费的消息ID,其中"0"或"0-

    0",表示从头开始消费,如果使用特殊符"$",则表示队列中最后一

    项ID,只读取消息队列中新到的消息。

    2)修改消费组的last_id:

    xgroup SETID mytopic cg1 1547127055888-0

    //修改消费组cg1,从消息id为1547127055888-0的消息开始消费

    6.xreadgroup命令

    用于从消费组中可靠地消费n条消息,如果指定的消费者不存在,则创建之。

    格式

    xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

    说明

    ·group:消费组名称。

    ·consumer:消费者名称。

    ·COUNT:消费多少条数据。

    ·BLOCK:是否为阻塞模式,milliseconds为阻塞多少毫秒。

    ·STREAMS:Stream队列名称,可指定多个。若指定多个,则ID也要对应指定相同个数。

    ·ID:读取只大于指定消息ID后未确认的消息;特殊符号">",读取未传递给其他任何消费者的消息,也就是新消息。

    ·NOACK:该消息不需要确认。

    示例:

    从Stream队列的消费组cg1中新建一个消费者c1,并消费一条数据。

    XREADGROUP GROUP cg1 c1 COUNT 1 STREAMS mytopic >

    7.xread命令

    用于从Stream队列中读取N条消息,一般用作遍历队列中的消息。

    格式

    xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

    说明: 此命令读取消息后无须通过XACK确认,也不需要强制指定消费组名称与消费者名称。

    ·COUNT:读取多少条数据;

    ·BLOCK:是否为阻塞模式,milliseconds为阻塞多少毫秒;

    ·STREAMS:Stream队列名称;

    ·ID:指定从消息ID开始读取,即消息ID大于指定的ID的消息,可为"$"特殊符号,代表从最后一条开始读取。

    示例: xread COUNT 10 STREAMS mytopic 0

    8.xack命令

    用于确认一或多个指定ID的消息,使其从待确认列表中删除

    格式:xack key group ID [ID ...]

    说明: 为了确保每个消息能被消费者消费到,通过xreadgroup消费的消息会存储在该消费组的未确认列表中,直到客户端确认该消息,才会从未确认列表中删除。

    ·group:消费组名称;

    ·ID:确认的消息ID。

    示例:xack mytopic cg1 1547127055889-0

    9.xpending命令

    读取某消费组或者某个消费者的未确认消息,返回未确认的消息ID、空闲时间、被读取次数。

    格式:xpending key group [start end count] [consumer]

    说明

    ·group:指定的消费组;

    ·start:范围开始ID,可以为特殊符"-"表示开始或指定ID;

    ·end:范围结束ID,可以为特殊符"+"标识结尾或指定ID;

    ·count:读取条数;

    ·consumer:指定的消费者。

    示例:

    ①读取消费组cg1中消费者c1的所有待确认消息

    xpending mytopic cg1 - + 2 c1

    1) 1) "1547127055889-0" //消息ID

    2) "c1" //消费者名称

    3) (integer) 653752 //间隔多久未确认

    4) (integer) 11 //已被读取次数

    ②读取消费组cg1的所有待确认消息。

    127.0.0.1:6379> xpending mytopic CG1 - + 10

    10.xclaim命令

    用于改变一或多个未确认消息的所有权,新的所有者是在命令参数中指定。

    格式:

    xclaim key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]

    说明:

    ·consumer:指定新的消费者

    ·min-idle-time:指定消息最小空闲数;

    ·ID:指定消息ID;

    ·IDLE:将该消息空闲时间设置为指定毫秒数,默认IDLE值为0;

    ·TIME:将该消息空闲时间设置为指定UNIX时间;

    ·RETRYCOUNT:被读取次数重置为指定次数,默认不去修改,避免丢失真实被读取次数;

    ·force:在待处理条目列表(PEL)中创建待处理消息条目,即使某些指定的ID尚未在分配给不同客户端的待处理条目列表(PEL)中;

    ·justid:只返回成功认领的消息ID数组。

    示例: 认领ID为"1547294557195-0"的消息,仅当消息闲置至少1小时时,将所有权分配给消费者c2,并将该消息的空闲时间置为0,被交付读取次数也改为0。

    xclaim mytopic1 cg1 c2 3600000 1547294557195-0 IDLE 0 RETRY-COUNT 0

    11.xinfo命令

    xinfo命令用于读取消息队列、消费组、消费者等的信息。

    格式

    xinfo [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

    说明

    ·CONSUMERS:用于查看某个消费组下的消费者信息;

    ·GROUPS:用于查看某个Stream队列下的消费组信息;

    ·STREAM:用于查看某个Stream队列的整体组信息。

    示例

    1)查看消费组c1中消费者消费信息:

    xinfo CONSUMERS mytopic cg1

    2)查看Stream队列信息:

    xinfo STREAM mytopic

    3)查看Stream队列中消费组信息:

    xinfo GROUPS mytopic

    12.xtrim命令

    作用是缩减消息队列。

    格式:xtrim key MAXLEN [~] count

    说明: 当Stream中数据量过大时,可通过此命令字来缩减Stream队列长度,删除Stream中旧数据直到长度减少至指定的值;当数据量小于等于指定值时,不做剪切,此命令与xadd中通过MAXLEN字段实现裁剪的逻辑是一致的。其中count为指定的长度。

    其中裁剪模式有两种:

    ·~:模糊裁剪,优化精确裁剪一般用此模式,效率更高。

    ·=:精确裁剪。

    示例: xadd mytopic * name tom age 20

    13.xlen命令

    xlen命令用于获取Stream队列的数据长度。

    格式:xlen key ID [ID ...]

    示例: xlen mytopic

    18.2 基本操作命令原理分析

    添消息、删除消息、范围查找、遍历消息、获取队列信息、长度统计、裁剪消息;

    18.2.1 添加消息

    xaddCommand函数。

    void xaddCommand(client *c) {

    streamID id;

    int id_given = 0; /* Was an ID different than "*" specified? */

    long long maxlen = -1; /* If left to -1 no trimming is performed. */

    int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so

    the maxium length is not applied verbatim. */

    int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */

     

    /* 1)解析参数,主要对MAXLEN/ID/[field value]对的解析,参数是否合法,并赋值给不同的变量 */

    int i = 2;

    for (; i < c->argc; i++) {

    int moreargs = (c->argc-1) - i; /*添加的[field value]对参数长度 */

    char *opt = c->argv[i]->ptr;

    if (opt[0] == '*' && opt[1] == '') { //遇到符号"*" 则跳出循环

    break;

    } else if (!strcasecmp(opt,"maxlen") && moreargs) {

    // 解析maxlen参数

    if (moreargs >= 2 && next[0] == '~' && next[1] == '') {

    approx_maxlen = 1; /*maxlen后的"~"approx_maxlen = 1标识出来*/

    i++;

    //之后读取maxlen的值,并转换成long long类型

    if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)

    != C_OK) return;

     

    } else {

    /* 指定ID时读取id值,并用id_given = 1; 标识*/ */

    if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;

    id_given = 1;

    break;

    }

    }

    int field_pos = i+1;

     

    /* 校验[field value]对的数据是否合法 */

    if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {

    addReplyError(c,"wrong number of arguments for XADD");

    return;

    }

     

    if (id_given && id.ms == 0 && id.seq == 0) {

    addReplyError(c,"The ID specified in XADD must be greater than 0-0");

    return;

    }

     

    2)校验key对应的值是否为Stream类型:如果存在且类型为Stream,则获取对应的值。如果值存在,但不为Stream类型,则报错。如果不存对应键值对,则调用createStream-Object函数初始化一个空的Stream类型对象,写入db的字典中

    robj *o;

    stream *s;

    if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;

    s = o->ptr;

     

    /* Return ASAP if the stream has reached the last possible ID */

    if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {

    addReplyError(c,"The stream has exhausted the last possible ID, "

    "unable to add more items");

    return;

    }

     

    3)调用streamAppendItem,往Stream中写入消息ID及内容数据。其中消息ID会当作key存储在Rax树中,每个ID并非独占一个节点,插入时会找到Rax树的最大节点,判断该节点中存储数据的data字段是否达极限

    if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,

    &id, id_given ? &id : NULL)

    == C_ERR)

    {

    addReplyError(c,"The ID specified in XADD is equal or smaller than the "

    "target stream top item");

    return;

    }

     

    4)消息添加完后,则把新插入的消息ID返回给客户端

    addReplyStreamID(c,&id);

     

    signalModifiedKey(c,c->db,c->argv[1]);

     

    5)如果传入了maxlen参数,则会调用streamTrimByLength函数剪切队列中数据,实现见18.2.6

    notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);

    server.dirty++;

     

    if (maxlen >= 0) {

    /* Notify xtrim event if needed. */

    if (streamTrimByLength(s,maxlen,approx_maxlen)) {

    notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);

    }

    if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);

    }

     

    /* Let's rewrite the ID argument with the one actually generated for

    * AOF/replication propagation. */

    robj *idarg = createObjectFromStreamID(&id);

    rewriteClientCommandArgument(c,i,idarg);

    decrRefCount(idarg);

     

    /* We need to signal to blocked clients that there is new data on this

    * stream. */

    if (server.blocked_clients_by_type[BLOCKED_STREAM])

    signalKeyAsReady(c->db, c->argv[1]);

    }

     

    //上面几个步骤的内部函数

    robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {

    // 检查key ,查找db中是否已经存在该键值对

    robj *o = lookupKeyWrite(c->db,key);

    if (o == NULL) { // 不存在则初始化一个空的stream类型的对象

    o = createStreamObject();

    dbAdd(c->db,key,o); // 写入db

    } else {

    if (o->type != OBJ_STREAM) { //db中已存在,类型不对则报错

    addReply(c,shared.wrongtypeerr);

    return NULL;

    }

    }

    return o; //返回key对应Stream类型的值

    }

     

     

    int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {

    ...

    if (server.stream_node_max_bytes &&

    lp_bytes >= server.stream_node_max_bytes)

    {

    lp = NULL;

    } else if (server.stream_node_max_entries) {

    int64_t count = lpGetInteger(lpFirst(lp));

    if (count >= server.stream_node_max_entries) lp = NULL;

    }

    }

    其默认配置:

    stream-node-max-bytes 4096

    stream-node-max-entries 100

     

    如节点中data为空或达到存储上限,则重新创建一个新节点,把对应的消息ID及内容(field-value对)插入。

    data字段存储数据用的是listpack表,会把不同的消息分成不同的entryentry中存储偏移量+消息内容(field value对)。

    entry节点消息存储也分为两种:

    1 是消息内容的fieldvalue两个值都存储,

    2 是只存储field-value对中的value值。

    区分用哪种方式存储的办法是和该队列中第1条消息做对比,如果结构一致,则采用第2种方式,更省内存,结构不一致则用第一种方式存储消息内容。所以在创建消息队列首次添加的数据时,一定要采用更通用的结构,避免浪费内存

     

    18.2.2 删除消息

    void xdelCommand(client *c) {

    robj *o;

    //1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错

    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL

    || checkType(c,o,OBJ_STREAM)) return;

    stream *s = o->ptr;

     

    /* 2)参数检验,循环校验参数,判断ID值格式是否正确,一个格式不正确则报错*/*/

    streamID id;

    for (int j = 2; j < c->argc; j++) { //检验id格式是否正确

    if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;

    }

     

    /* 3)删除消息,根据消息ID先从Rax树中查找到其内容所属的节点,从节点遍历找到该消息对应的地址,把头部的flag标识为删除态,直到该listpack删除到最后一个消息时才真正释放整块内存,并从Rax树中摘除该节点*/

    int deleted = 0;

    for (int j = 2; j < c->argc; j++) {

    streamParseStrictIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */

    deleted += streamDeleteItem(s,&id); //内部调用 streamIteratorRemoveEntry

    }

     

    /* 4)返回消息,调用addReplyLongLong函数,把删除的消息数量写入输出缓冲 */

    ...

    addReplyLongLong(c,deleted);

    }

    18.2.3 范围查找

    xrange与xrevrange两个命令的分别调用xrangeCommand与xrevrangeCommand函数,底层统一再调用的是xrangeGenericCommand函数。会通过参数rev区分,对应的值分别为0和1;

    void xrangeGenericCommand(client *c, int rev) {

    robj *o;

    stream *s;

    streamID startid, endid;

    long long count = -1;

    robj *startarg = rev ? c->argv[3] : c->argv[2];

    robj *endarg = rev ? c->argv[2] : c->argv[3];

    1)解析参数与参数校验

    //判断startarg格式是否正确

    if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return;

    // 判断endarg格式是否正确

    if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return;

     

    /* 传入了COUNT,则校验之后的参数是否正确. */

    if (c->argc > 4) {

    for (int j = 4; j < c->argc; j++) {

    int additional = c->argc-j-1;

    if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {

    //读取count的值,并转换为longlong类型

    if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL)

    != C_OK) return;

    if (count < 0) count = 0;

    j++; /* Consume additional arg. */

    } else {

    addReply(c,shared.syntaxerr);

    return;

    }

    }

    }

     

    /* 2)校验key对应的值是否为Stream类型:如果存在且类型为Stream,则获取对应的值。

    如果值存在但不为stream类型,则报错。 */

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL ||

    checkType(c,o,OBJ_STREAM)) return;

     

    s = o->ptr;

     

    if (count == 0) {

    addReplyNullArray(c);

    } else {

    if (count == -1) count = 0;

    // 3)调用streamReplyWithRange函数进行范围匹配查找

    streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);

    }

    }

     

    正序范围查找主要分如下两步。

    反序范围查找:和正序查找类似,根据end指定的消息ID找到位置后,遍历顺序相反即可。

    size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {

    ...

    ①根据start参数中指定的ID,从Rax树中查找到最后一个比该ID小的节点并往后遍历,直到找到该ID为止

    if (!(flags & STREAM_RWR_RAWENTRIES))

    arraylen_ptr = addReplyDeferredLen(c);

    streamIteratorStart(&si,s,start,end,rev);

    while(streamIteratorGetID(&si,&id,&numfields)) {

    ②从这个位置往后遍历,每读取一条消息则往输出缓冲区写入一条,当本节点遍历完则继续遍历Rax树的下一个节点,直到所有节点遍历完或者遇到一个比end参数所指定的消息ID大的值则结束。

    /* Update the group last_id if needed. */

    if (group && streamCompareID(&id,&group->last_id) > 0) {

    group->last_id = id;

    if (noack) propagate_last_id = 1;

    }

     

    /* Emit a two elements array for each item. The first is

    * the ID, the second is an array of field-value pairs. */

    addReplyArrayLen(c,2);

    addReplyStreamID(c,&id);

    addReplyArrayLen(c,numfields*2);

     

    /* Emit the field-value pairs. */

    while(numfields--) {

    unsigned char *key, *value;

    int64_t key_len, value_len;

    streamIteratorGetField(&si,&key,&value,&key_len,&value_len);

    addReplyBulkCBuffer(c,key,key_len);

    addReplyBulkCBuffer(c,value,value_len);

    }

     

    if (group && !noack) {

    unsigned char buf[sizeof(streamID)];

    streamEncodeID(buf,&id);

     

    streamNACK *nack = streamCreateNACK(consumer);

    int group_inserted =

    raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);

    int consumer_inserted =

    raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);

     

    if (group_inserted == 0) {

    streamFreeNACK(nack);

    nack = raxFind(group->pel,buf,sizeof(buf));

    serverAssert(nack != raxNotFound);

    raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);

    /* Update the consumer and NACK metadata. */

    nack->consumer = consumer;

    nack->delivery_time = mstime();

    nack->delivery_count = 1;

    /* Add the entry in the new consumer local PEL. */

    raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);

    } else if (group_inserted == 1 && consumer_inserted == 0) {

    serverPanic("NACK half-created. Should not be possible.");

    }

     

    /* Propagate as XCLAIM. */

    if (spi) {

    robj *idarg = createObjectFromStreamID(&id);

    streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);

    decrRefCount(idarg);

    }

    }

     

    arraylen++;

    if (count && count == arraylen) break; //遍历结束

    }

     

    if (spi && propagate_last_id)

    streamPropagateGroupID(c,spi->keyname,group,spi->groupname);

     

    streamIteratorStop(&si);

    if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);

    return arraylen;

    }

    18.2.4 获取队列信息

    读取队列、消费组、消费者信息的xinfo命令,对应的函数是xinfoCommand函数,xinfoCommand函数查询基本信息主要分为如下几步;

    void xinfoCommand(client *c) {

    ...

    opt = c->argv[1]->ptr;

    key = c->argv[2];

     

    /* 1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错. */

    robj *o = lookupKeyWriteOrReply(c,key,shared.nokeyerr);

    if (o == NULL || checkType(c,o,OBJ_STREAM)) return;

    s = o->ptr;

     

    /*2)根据传入的第2个参数做判断,区分去获取哪些信息,

    如果为"CONSUMERS",则用于查看某个消费组下的消费者信息 */

    if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {

    /* XINFO CONSUMERS <key> <group>. */

    streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);

    if (cg == NULL) {

    addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "

    "for key name '%s'",

    (char*)c->argv[3]->ptr, (char*)key->ptr);

    return;

    }

     

    addReplyArrayLen(c,raxSize(cg->consumers));

    raxIterator ri;

    raxStart(&ri,cg->consumers);

    raxSeek(&ri,"^",NULL,0);

    mstime_t now = mstime();

    while(raxNext(&ri)) {

    streamConsumer *consumer = ri.data;

    mstime_t idle = now - consumer->seen_time;

    if (idle < 0) idle = 0;

     

    addReplyMapLen(c,3);

    addReplyBulkCString(c,"name");

    addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));

    addReplyBulkCString(c,"pending");

    addReplyLongLong(c,raxSize(consumer->pel));

    addReplyBulkCString(c,"idle");

    addReplyLongLong(c,idle);

    }

    raxStop(&ri);

    //·GROUPS:用于查看某个Stream队列下的消费组信息;

    } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {

    if (s->cgroups == NULL) {

    addReplyArrayLen(c,0);

    return;

    }

     

    addReplyArrayLen(c,raxSize(s->cgroups));

    raxIterator ri;

    raxStart(&ri,s->cgroups);

    raxSeek(&ri,"^",NULL,0);

    while(raxNext(&ri)) {

    streamCG *cg = ri.data;

    addReplyMapLen(c,4);

    addReplyBulkCString(c,"name");

    addReplyBulkCBuffer(c,ri.key,ri.key_len);

    addReplyBulkCString(c,"consumers");

    addReplyLongLong(c,raxSize(cg->consumers));

    addReplyBulkCString(c,"pending");

    addReplyLongLong(c,raxSize(cg->pel));

    addReplyBulkCString(c,"last-delivered-id");

    addReplyStreamID(c,&cg->last_id);

    }

    raxStop(&ri);

    // STREAM:用于查看某个Stream队列的整体组信息

    } else if (!strcasecmp(opt,"STREAM")) {

    /* XINFO STREAM <key> [FULL [COUNT <count>]]. */

    xinfoReplyWithStreamInfo(c,s);

    } else {

    addReplySubcommandSyntaxError(c);

    }

    }

    18.2.5 长度统计

    读取队列长度的xlen命令,底层调用xlenCommand函数;

    void xlenCommand(client *c) {

    robj *o;

    // 1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL

    || checkType(c,o,OBJ_STREAM)) return;

    stream *s = o->ptr;

    //2)返回值:根据读取的Stream,直接调用addReplyLongLong输出其长度

    addReplyLongLong(c,s->length);

    }

    18.2.6 剪切消息

    底层调用对应的函数是xtrimCommand函数;

    void xtrimCommand(client *c) {

    robj *o;

    /*1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错. */

    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL

    || checkType(c,o,OBJ_STREAM)) return;

    stream *s = o->ptr;

     

    /* 2)解析与校验参数。maxlen必传,如果模糊缩减则把参数approx_maxlen标识为1*/

    int trim_strategy = TRIM_STRATEGY_NONE;

    long long maxlen = -1;

    int approx_maxlen = 0;

    int maxlen_arg_idx = 0;

    /* Parse options. */

    int i = 2; /* Start of options. */

    for (; i < c->argc; i++) {

    int moreargs = (c->argc-1) - i; /* Number of additional arguments. */

    char *opt = c->argv[i]->ptr;

    if (!strcasecmp(opt,"maxlen") && moreargs) {

    approx_maxlen = 0;

    trim_strategy = TRIM_STRATEGY_MAXLEN;

    char *next = c->argv[i+1]->ptr;

    /* Check for the form MAXLEN ~ <count>. */

    if (moreargs >= 2 && next[0] == '~' && next[1] == '') {

    approx_maxlen = 1;

    i++;

    } else if (moreargs >= 2 && next[0] == '=' && next[1] == '') {

    i++;

    }

    if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)

    != C_OK) return;

     

    if (maxlen < 0) {

    addReplyError(c,"The MAXLEN argument must be >= 0.");

    return;

    }

    i++;

    maxlen_arg_idx = i;

    } else {

    addReply(c,shared.syntaxerr);

    return;

    }

    }

     

    /* 3)调用streamTrimByLength函数缩减队列*/

    int64_t deleted = 0;

    if (trim_strategy == TRIM_STRATEGY_MAXLEN) {

    deleted = streamTrimByLength(s,maxlen,approx_maxlen);

    } else {

    addReplyError(c,"XTRIM called without an option to trim the stream");

    return;

    }

     

    ...

    addReplyLongLong(c,deleted);

    }

     

    int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {

    if (s->length <= maxlen) return 0; //只能缩减长度

     

    raxIterator ri;

    raxStart(&ri,s->rax); //初始化rax迭代器

    raxSeek(&ri,"^",NULL,0); //找到rax首个节点

     

    int64_t deleted = 0;

    while(s->length > maxlen && raxNext(&ri)) {

    unsigned char *lp = ri.data, *p = lpFirst(lp);

    int64_t entries = lpGetInteger(p);

    //从首个节点开始,循环删除Rax树中每个节点,直到删除数量足够,会把队列中老数据按节点逐个删除

    /*如果可以删除整个节点*/

    if (s->length - entries >= maxlen) {

    lpFree(lp);

    raxRemove(s->rax,ri.key,ri.key_len,NULL);

    raxSeek(&ri,">=",ri.key,ri.key_len);

    s->length -= entries;

    deleted += entries;

    continue;

    }

    //当执行的是模糊删除时,默认会多保留一些数据,也就是说,当传入的第3个参数为符号"~"时,会把最后一个需要删除的消息ID所在的节点数据保留,而精确删除则会把最后一个需要删除的消息ID之前的数据都删掉

     

    if (approx) break;

     

    /*我们必须将listpack中的单个条目标记为已删除。我们首先更新条目/删除的计数器*/

    int64_t to_delete = s->length - maxlen;

    serverAssert(to_delete < entries);

    lp = lpReplaceInteger(lp,&p,entries-to_delete);

    p = lpNext(lp,p); /* Seek deleted field. */

    int64_t marked_deleted = lpGetInteger(p);

    lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);

    p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */

     

    /* Skip all the master fields. */

    int64_t master_fields_count = lpGetInteger(p);

    p = lpNext(lp,p); /* Seek the first field. */

    for (int64_t j = 0; j < master_fields_count; j++)

    p = lpNext(lp,p); /* Skip all master fields. */

    p = lpNext(lp,p); /* Skip the zero master entry terminator. */

     

    /* 'p' is now pointing to the first entry inside the listpack.

    * We have to run entry after entry, marking entries as deleted

    * if they are already not deleted. */

    while(p) {

    int flags = lpGetInteger(p);

    int to_skip;

     

    /* 标记entry删除 */

    if (!(flags & STREAM_ITEM_FLAG_DELETED)) {

    flags |= STREAM_ITEM_FLAG_DELETED;

    lp = lpReplaceInteger(lp,&p,flags);

    deleted++;

    s->length--;

    if (s->length <= maxlen) break; /* Enough entries deleted. */

    }

    p = lpNext(lp,p); /* Skip ID ms delta. */

    p = lpNext(lp,p); /* Skip ID seq delta. */

    p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */

    if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {

    to_skip = master_fields_count;

    } else {

    to_skip = lpGetInteger(p);

    to_skip = 1+(to_skip*2);

    }

     

    while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */

    p = lpNext(lp,p); /* Skip the final lp-count field. */

    }

     

    /* 垃圾回收*/

    entries -= to_delete;

    marked_deleted += to_delete;

    if (entries + marked_deleted > 10 && marked_deleted > entries/2) {

    /* TODO: perform a garbage collection. */

    }

    /* Update the listpack with the new pointer. */

    raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);

    break; /* 到这里表示删除数量足够. */

    }

    raxStop(&ri);

    return deleted;

    }

    18.3 分组命令原理分析

    三个数据结构

    streamCG 用于存储消费组相关信息、

    streamConsumer 用于存储消费者相关信息、

    streamNACK 用于存储未确认消息;

    18.3.1 分组管理

    消费组管理的xgroup命令,调用xgroupCommand函数;

    void xgroupCommand(client *c) {

    stream *s = NULL;

    sds grpname = NULL;

    streamCG *cg = NULL;

    char *opt = c->argv[1]->ptr; /* Subcommand name. */

    int mkstream = 0;

    robj *o;

     

    /* 1)校验,xgroup命令第2个参数共有5个选项,当选项不为HELP时,则需根据key读取对应的value值,并判断其类型是否为OBJ_STREAM,如果不是则报错。

    当选项为CREATE且带了MKSTREAM参数时,则不判断value值是否存在与是否为Stream类型。

    当选项为SETID DELCONSUMER时,则需要判断指定的组是否存在,不存在组信息则报错 */

    if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {

    if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {

    addReplySubcommandSyntaxError(c);

    return;

    }

    mkstream = 1;

    grpname = c->argv[3]->ptr;

    }

     

    /* 当选项不为HELP时,根据key读取对应的value值,并判断是否为OBJ_STREAM,不是则报错。 */

    if (c->argc >= 4) {

    o = lookupKeyWrite(c->db,c->argv[2]);

    if (o) {

    if (checkType(c,o,OBJ_STREAM)) return;

    s = o->ptr;

    }

    grpname = c->argv[3]->ptr;

    }

     

    /* Check for missing key/group. */

    if (c->argc >= 4 && !mkstream) {

    /* key必须存在,否则error */

    if (s == NULL) {

    error...

    }

     

    /* group不存在报错 */

    if ((cg = streamLookupCG(s,grpname)) == NULL &&

    (!strcasecmp(opt,"SETID") ||

    !strcasecmp(opt,"DELCONSUMER")))

    {

    error....

    return;

    }

    }

     

    /* 2)根据不同参数做不同的处理。*/

    if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {

         //①create 参数,创建一个新消费组

    streamID id; //消息ID如为特殊符号"$",是则把该队列中最大的个id赋值给它

    if (!strcmp(c->argv[4]->ptr,"$")) {

    if (s) {

    id = s->last_id;

    }

    } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) {

            //不为特殊符号,则检验传入id的合法性,不合法则报错。

    return;

    }

     

    //指定了MKSTREAM参数,且db中该键值对不存在,创建一个类型为stream新对象存入db */

    if (s == NULL) {

    serverAssert(mkstream);

    o = createStreamObject();

    dbAdd(c->db,c->argv[2],o);

    s = o->ptr;

    signalModifiedKey(c,c->db,c->argv[2]);

    }

    //创建一个新的消费组,但此处逻辑有漏洞,使用s前需要校验其所属对象是否为stream

    如不是则可能把内存写坏,导致redis-server直接挂掉

    streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);

    ...

    } else if (!strcasecmp(opt,"SETID") && c->argc == 5) {

    //②setid 参数,修改某个消费组消费的last_id:

    // 根据消费组名从s.cgroups这个Rax树中查找出streamCG,

    修改字段last_id的值为参数指定的ID值即可。

    streamID id;

    if (!strcmp(c->argv[4]->ptr,"$")) {

    id = s->last_id;

    } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {

    return;

    }

    cg->last_id = id;

    ...

    } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {

    //③destroy 参数,删除指定消费组:

    //根据消费组姓名,从s.cgroups这个Rax树中删除并释放内存。

    if (cg) {

    raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);

    streamFreeCG(cg);

    ...

    }

    } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {

    //④delconsumer 参数,删除指定消费组中某个消费者

     

    long long pending = streamDelConsumer(cg,c->argv[4]->ptr);

    addReplyLongLong(c,pending);

    server.dirty++;

    notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",

    c->argv[2],c->db->id);

    } else if (c->argc == 2 && !strcasecmp(opt,"HELP")) {

    //⑤help 参数

    addReplyHelp(c, help);

    } else {

    addReplySubcommandSyntaxError(c);

    }

    }

     

     

    内部函数1:streamCreateCG

    最终创建消费组调用的是streamCreateCG函数,该函数的主要实现步骤如下

    streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {

    if (s->cgroups == NULL) s->cgroups = raxNew();

    if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)

    return NULL;

    //1 ,初始化streamCG结构体,设置streamCG.last_id为参数指定的ID值;

    streamCG *cg = zmalloc(sizeof(*cg));

    cg->pel = raxNew();

    cg->consumers = raxNew();

    cg->last_id = *id;

    // 2 ,往s.cgroups这个Rax树中写入新初始化的streamCG结构体,

    其中key为分组名称,关联的值为streamCG结构体。

    raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);

    return cg;

    }

     

    内部函数2:streamDelConsumer

    uint64_t streamDelConsumer(streamCG *cg, sds name) {

    //根据消费组姓名从s.cgroups这个Rax树中查找出streamCG;

    streamConsumer *consumer =

    streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH);

    if (consumer == NULL) return 0;

     

    uint64_t retval = raxSize(consumer->pel);

     

    /*根据消费者名从streamCG.Consumers这个Rax树中查找出消费者Consumers */

    raxIterator ri;

    raxStart(&ri,consumer->pel);

    raxSeek(&ri,"^",NULL,0);

    while(raxNext(&ri)) {

    streamNACK *nack = ri.data;

    // 迭代遍历Consumers.pel这个Rax树,从中删除并释放所有待确认的消息ID;

    raxRemove(cg->pel,ri.key,ri.key_len,NULL);

    streamFreeNACK(nack);

    }

    raxStop(&ri);

     

    //根据消费者名把该消费者从streamCG.Consumers这个Rax树中删除。

    raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL);

    streamFreeConsumer(consumer);

    return retval;

    }

    18.3.2 消费消息

    有xreadgroup及xread两个命令,底层调用都是xreadCommand函数,实现主要分为如下4步。

    define XREAD_BLOCKED_DEFAULT_COUNT 1000

    void xreadCommand(client *c) {

    1)判断执行的是哪条命令:如果传入参数中第一个字符串总长度为10,则代表的是xreadgrou命令。

    int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */

    robj *groupname = NULL;

    robj *consumername = NULL;

    /* 解析参数*/

    for (int i = 1; i < c->argc; i++) {

    ...

    }

    ...

    2)遍历读取指定的key关联的值信息,并进行类型判断与参数校验,不为OBJ_STREAM则报错。

    for (int i = streams_arg + streams_count; i < c->argc; i++) {

    //遍历参数中每个key

    int id_idx = i - streams_arg - streams_count;

    robj *key = c->argv[i-streams_count];

    robj *o = lookupKeyRead(c->db,key); //读取key关联的value

    if (o && checkType(c,o,OBJ_STREAM)) goto cleanup; //value类型判断

    streamCG *group = NULL;

     

    if (groupname) { // xreadgrou会指定消费组

    if (o == NULL ||

    (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)

    {

    //消费组不存在报错,所以指定消费多个key时,需要每个key单独建立相同消费组名

    error...

    goto cleanup;

    }

    groups[id_idx] = group;

    }

     

    if (strcmp(c->argv[i]->ptr,"$") == 0) { //特殊符号"$"只能由xread命令使用

    if (xreadgroup) {

    error...

    goto cleanup;

    }

    ...

    continue;

    } else if (strcmp(c->argv[i]->ptr,">") == 0) {

    if (!xreadgroup) { //特殊符号">"只能由xreadgroup命令使用

    error...

    goto cleanup;

    }

    ...

    continue;

    }

    //校验参数中指定ID格式是否合法

    if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)

    goto cleanup;

    }

     

    3)遍历指定的多个key,调用streamReplyWithRange函数。按照参数中指定的ID,从对应的Stream队列中读取count条数据。

    /* 尝试同步服务于客户. */

    size_t arraylen = 0;

    void *arraylen_ptr = NULL;

    for (int i = 0; i < streams_count; i++) {

    robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);

    if (o == NULL) continue;

    stream *s = o->ptr;

    streamID *gt = ids+i;

    int serve_synchronously = 0;

    int serve_history = 0;

     

    /* 检查是否存在可以同步为client服务的条件 */

    if (groups) {

    if (gt->ms != UINT64_MAX ||

    gt->seq != UINT64_MAX)

    {

    serve_synchronously = 1;

    serve_history = 1;

    } else if (s->length) {

    streamID maxid, *last = &groups[i]->last_id;

    streamLastValidID(s, &maxid);

    if (streamCompareID(&maxid, last) > 0) {

    serve_synchronously = 1;

    *gt = *last;

    }

    }

    } else if (s->length) {

    /* 对于没有组的消费者,从流中提供至少一item,去同步服务。*/

    streamID maxid;

    streamLastValidID(s, &maxid);

    if (streamCompareID(&maxid, gt) > 0) {

    serve_synchronously = 1;

    }

    }

     

    if (serve_synchronously) {

    arraylen++;

    if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);

    /* start (包含在内) ID开始,调用函数处理 */

    streamID start = *gt;

    streamIncrID(&start);

     

    if (c->resp == 2) addReplyArrayLen(c,2);

    addReplyBulk(c,c->argv[streams_arg+i]);

    streamConsumer *consumer = NULL;

    if (groups) consumer = streamLookupConsumer(groups[i],

    consumername->ptr,

    SLC_NONE);

    streamPropInfo spi = {c->argv[i+streams_arg],groupname};

    int flags = 0;

    if (noack) flags |= STREAM_RWR_NOACK;

    if (serve_history) flags |= STREAM_RWR_HISTORY;

    //进行范围匹配查找

    streamReplyWithRange(c,s,&start,NULL,count,0,

    groups ? groups[i] : NULL,

    consumer, flags, &spi);

    if (groups) server.dirty++;

    }

    }

     

    /* We replied synchronously! Set the top array len and return to caller. */

    if (arraylen) {

    if (c->resp == 2)

    setDeferredArrayLen(c,arraylen_ptr,arraylen);

    else

    setDeferredMapLen(c,arraylen_ptr,arraylen);

    goto cleanup;

    }

    /* 4)如果添加了BLOCK关键字,则调用blockForKeys函数,把当前链接标识成阻塞状态,

    并且记录解除阻塞时间节点,等着下一次时间事件触发看是否超时,或当新的数据写入时解除阻塞。

    新数据写入时会触发handleClientsBlockedOnKeys函数,会判断此次新增的key是否为阻塞等待的key,如果是,则继续比较ID是否有更新,如有更新,则读取最新的数据回复给该客户端,并解除阻塞。*/

    if (timeout != -1) {

    /* 如果在MULTI / EXEC中,并且列表为空,将其视为超时(超时为0*/

    if (c->flags & CLIENT_MULTI) {

    addReplyNullArray(c);

    goto cleanup;

    }

    blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,

    timeout, NULL, ids);

    /* 如果没有count 给一个count=1000 ,防止返回大量数据*/

    c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;

     

    if (groupname) {

    incrRefCount(groupname);

    incrRefCount(consumername);

    c->bpop.xread_group = groupname;

    c->bpop.xread_consumer = consumername;

    c->bpop.xread_group_noack = noack;

    } else {

    c->bpop.xread_group = NULL;

    c->bpop.xread_consumer = NULL;

    }

    goto cleanup;

    }

     

    /* 没有BLOCK,也没有可以提供的任何流。回复时带有超时 */

    addReplyNullArray(c);

     

    cleanup: /* Cleanup. */

    preventCommandPropagation(c);

    if (ids != static_ids) zfree(ids);

    zfree(groups);

    }

    18.3.3 响应消息

    确认消息xack,底层调用xackCommand函数,主要分为如下3步:

    void xackCommand(client *c) {

    streamCG *group = NULL;

    1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错;

    robj *o = lookupKeyRead(c->db,c->argv[1]);

    if (o) {

    if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */

    2)根据读取出来的值,从s->cgroups这个Rax树中根据参数中组名查找出指定的分组信息streamCG

    //streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,

    sdslen(groupname));

    group = streamLookupCG(o->ptr,c->argv[2]->ptr);

    }

     

    /* No key or group? Nothing to ack. */

    if (o == NULL || group == NULL) {

    addReply(c,shared.czero);

    return;

    }

     

    for (int j = 3; j < c->argc; j++) {

    streamID id;

    if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;

    }

     

    int acknowledged = 0;

    for (int j = 3; j < c->argc; j++) {

    streamID id;

    unsigned char buf[sizeof(streamID)];

    if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)

    serverPanic("StreamID invalid after check. Should not be possible.");

    streamEncodeID(buf,&id);

    3)从streamCG.pel这个Rax树中查找参数中指定的消息ID,如存在则删除,否则什么也不做。

    streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));

    if (nack != raxNotFound) {

    raxRemove(group->pel,buf,sizeof(buf),NULL);

    raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);

    streamFreeNACK(nack);

    acknowledged++;

    server.dirty++;

    }

    }

    addReplyLongLong(c,acknowledged);

    }

    18.3.4 获取未响应消息列表

    命令xpending,底层调用xpendingCommand函数,该函数实现主要分为如下3步;

    void xpendingCommand(client *c) {

    int justinfo = c->argc == 3;

    robj *key = c->argv[1];

    robj *groupname = c->argv[2];

    robj *consumername = (c->argc == 7) ? c->argv[6] : NULL;

    streamID startid, endid;

    long long count;

    /* Start and stop, and the consumer, can be omitted(省略). */

    if (c->argc != 3 && c->argc != 6 && c->argc != 7) {

    addReply(c,shared.syntaxerr);

    return;

    }

     

    /* 1)参数检验,校验参数个数,校验ID值格式是否正确。*/

    if (c->argc >= 6) {

    if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)

    return;

    if (count < 0) count = 0;

    if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR)

    return;

    if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR)

    return;

    }

     

    /* 2)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错,

    类型正确则根据指定的消费组名查找出group. */

    robj *o = lookupKeyRead(c->db,c->argv[1]);

    streamCG *group;

    if (o && checkType(c,o,OBJ_STREAM)) return;

     

    /* XPENDING <key> <group> variant. */

    if (justinfo) { //①参数个数为3,读取指定消费组的所有未响应消息列表

    addReplyArrayLen(c,4);

    /* Total number of messages in the PEL. */

    addReplyLongLong(c,raxSize(group->pel));

    /* First and last IDs. */

    if (raxSize(group->pel) == 0) {

    addReplyNull(c); /* Start. */

    addReplyNull(c); /* End. */

    addReplyNullArray(c); /* Clients. */

    } else {

    /* Start. */

    raxIterator ri;

    raxStart(&ri,group->pel);

    raxSeek(&ri,"^",NULL,0);

    raxNext(&ri);

    streamDecodeID(ri.key,&startid);

    addReplyStreamID(c,&startid);

     

    /* End. */

    raxSeek(&ri,"$",NULL,0);

    raxNext(&ri);

    streamDecodeID(ri.key,&endid);

    addReplyStreamID(c,&endid);

    raxStop(&ri);

     

    /* Consumers with pending messages. */

    raxStart(&ri,group->consumers);

    raxSeek(&ri,"^",NULL,0);

    void *arraylen_ptr = addReplyDeferredLen(c);

    size_t arraylen = 0;

    while(raxNext(&ri)) { // 从头到尾迭代group.pel这个Rax

    streamConsumer *consumer = ri.data;

    if (raxSize(consumer->pel) == 0) continue;

    addReplyArrayLen(c,2);

    addReplyBulkCBuffer(c,ri.key,ri.key_len);

    addReplyBulkLongLong(c,raxSize(consumer->pel));

    arraylen++;

    }

    setDeferredArrayLen(c,arraylen_ptr,arraylen);

    raxStop(&ri);

    }

    }

    /* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */

    else {

    streamConsumer *consumer = NULL;

    if (consumername) { //找出消费者

    consumer = streamLookupConsumer(group,

    consumername->ptr,

    SLC_NOCREAT|SLC_NOREFRESH);

     

    /* 如果提到了消费者名称但它不存在,返回一个空数组*/

    if (consumer == NULL) {

    addReplyArrayLen(c,0);

    return;

    }

    }

    //消费者存在取消费者的未确认消息列表,否则取grouppel

    rax *pel = consumer ? consumer->pel : group->pel;

    unsigned char startkey[sizeof(streamID)];

    unsigned char endkey[sizeof(streamID)];

    raxIterator ri;

    mstime_t now = mstime();

    //找出迭代的开始ID与结束ID

    streamEncodeID(startkey,&startid);

    streamEncodeID(endkey,&endid);

    raxStart(&ri,pel);

    raxSeek(&ri,">=",startkey,sizeof(startkey));

    void *arraylen_ptr = addReplyDeferredLen(c);

    size_t arraylen = 0;

    //从开始ID迭代,count0或者没有下一个元素

    while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {

    streamNACK *nack = ri.data;

     

    arraylen++;

    count--; //计数递减

    addReplyArrayLen(c,4);

     

    /* Entry ID. */

    streamID id;

    streamDecodeID(ri.key,&id);

    addReplyStreamID(c,&id);

     

    /* Consumer name. */

    addReplyBulkCBuffer(c,nack->consumer->name,

    sdslen(nack->consumer->name));

     

    /* Milliseconds elapsed since last delivery. */

    mstime_t elapsed = now - nack->delivery_time;

    if (elapsed < 0) elapsed = 0;

    addReplyLongLong(c,elapsed);

     

    /* Number of deliveries. */

    addReplyLongLong(c,nack->delivery_count);

    }

    raxStop(&ri);

    setDeferredArrayLen(c,arraylen_ptr,arraylen);

    }

    }

    18.3.5 修改指定未响应消息归属

    命令是xclaim,对应的函数是xclaimCommand函数;

    void xclaimCommand(client *c) {

    streamCG *group = NULL;

    1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错

    robj *o = lookupKeyRead(c->db,c->argv[1]);

    long long minidle; /* Minimum idle time argument. */

    long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */

    mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */

    int force = 0;

    int justid = 0;

     

    if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */

    2)根据消费组名称取出消费组信息:

    group = streamLookupCG(o->ptr,c->argv[2]->ptr);

     

    3)参数解析与参数校验

    if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,

    "Invalid min-idle-time argument for XCLAIM")

    != C_OK) return;

    if (minidle < 0) minidle = 0;

     

    int j;

    for (j = 5; j < c->argc; j++) {

    streamID id;

    if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;

    }

    int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */

    mstime_t now = mstime();

    streamID last_id = {0,0};

    int propagate_last_id = 0;

     

    for (; j < c->argc; j++) {

    int moreargs = (c->argc-1) - j; /* Number of additional arguments. */

    char *opt = c->argv[j]->ptr;

    if (!strcasecmp(opt,"FORCE")) {

    force = 1;

    } else if (!strcasecmp(opt,"JUSTID")) {

    justid = 1;

    } else if (!strcasecmp(opt,"IDLE") && moreargs) {

    j++;

    if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,

    "Invalid IDLE option argument for XCLAIM")

    != C_OK) return;

    deliverytime = now - deliverytime;

    } else if (!strcasecmp(opt,"TIME") && moreargs) {

    j++;

    if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,

    "Invalid TIME option argument for XCLAIM")

    != C_OK) return;

    } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {

    j++;

    if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,

    "Invalid RETRYCOUNT option argument for XCLAIM")

    != C_OK) return;

    } else if (!strcasecmp(opt,"LASTID") && moreargs) {

    j++;

    if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return;

    } else {

    addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);

    return;

    }

    }

     

    if (streamCompareID(&last_id,&group->last_id) > 0) {

    group->last_id = last_id;

    propagate_last_id = 1;

    }

     

    if (deliverytime != -1) {

    if (deliverytime < 0 || deliverytime > now) deliverytime = now;

    } else {

    deliverytime = now;

    }

     

    /* Do the actual claiming. */

    streamConsumer *consumer = NULL;

    void *arraylenptr = addReplyDeferredLen(c);

    size_t arraylen = 0;

    5)遍历指定的ID,依次修改其所属消费者

    for (int j = 5; j <= last_id_arg; j++) {

    streamID id;

    unsigned char buf[sizeof(streamID)];

    //校验ID是否合法

    if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)

    serverPanic("StreamID invalid after check. Should not be possible.");

    streamEncodeID(buf,&id); //赋值给buf

     

    /* group->pel中查找ID. */

    streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));

     

    /* 指定的ID不在待确认列表,若设置了force关键字,则继续判断是否在stream队列中存在,

         存在则插入到未确认消息列表中 */

    if (force && nack == raxNotFound) {

    streamIterator myiterator;

    streamIteratorStart(&myiterator,o->ptr,&id,&id,0);

    int64_t numfields;

    int found = 0;

    streamID item_id;

    if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;

    streamIteratorStop(&myiterator);

     

    /* Item must exist for us to create a NACK for it. */

    if (!found) continue;

    /* Create the NACK. */

    nack = streamCreateNACK(NULL);

    raxInsert(group->pel,buf,sizeof(buf),nack,NULL);

    }

     

    if (nack != raxNotFound) {

    /* 已存在待确认消息列表则把该消息从原有所属的消费者pel列表中删除,

    并插入到新指定的消费者pel列表中 */

    if (nack->consumer && minidle) {

    mstime_t this_idle = now - nack->delivery_time;

    if (this_idle < minidle) continue;

    }

    /* Remove the entry from the old consumer.

    * Note that nack->consumer is NULL if we created the

    * NACK above because of the FORCE option. */

    if (nack->consumer)

    raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);

    /* Update the consumer and idle time. */

    if (consumer == NULL)

    consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);

    nack->consumer = consumer;

    nack->delivery_time = deliverytime;

    /* Set the delivery attempts counter if given, otherwise

    * autoincrement unless JUSTID option provided */

    if (retrycount >= 0) {

    nack->delivery_count = retrycount;

    } else if (!justid) {

    nack->delivery_count++;

    }

    /* Add the entry in the new consumer local PEL. */

    raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);

    /* Send the reply for this entry. */

    if (justid) {

    addReplyStreamID(c,&id);

    } else {

    size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,

    NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);

    if (!emitted) addReplyNull(c);

    }

    arraylen++;

     

    /* Propagate this change. */

    streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);

    propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */

    server.dirty++;

    }

    }

    if (propagate_last_id) {

    streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);

    server.dirty++;

    }

    setDeferredArrayLen(c,arraylenptr,arraylen);

    preventCommandPropagation(c);

    }

    18.4 本章小结

    本章整体性讲解了Stream相关的命令的源码实现;

  • 相关阅读:
    NSNotificationCenter通知
    UITextView 输入字数限制
    UITextView添加占位符 placeholder
    Label显示html文本
    响应者链
    UIKit框架各类简要说明
    [转]setValue和setObject的区别
    谓词(NSPredicate)
    iOS麦克风权限的检测和获取
    SOCKET是什么
  • 原文地址:https://www.cnblogs.com/coloz/p/13812858.html
Copyright © 2011-2022 走看看