zoukankan      html  css  js  c++  java
  • Streams:深入剖析Redis5.0全新数据结构

    Streams:深入剖析Redis5.0全新数据结构
     
    原创: 阿飞的博客
     
    Redis 5.0 全新的数据类型:streams,官方把它定义为:以更抽象的方式建模日志的数据结构。Redis的streams主要是一个append only的数据结构,至少在概念上它是一种在内存中表示的抽象数据类型,只不过它们实现了更强大的操作,以克服日志文件本身的限制。
     
    如果你了解MQ,那么可以把streams当做MQ。如果你还了解kafka,那么甚至可以把streams当做kafka。
     
    另外,这个功能有点类似于redis以前的Pub/Sub,但是也有基本的不同:
    • streams支持多个客户端(消费者)等待数据(Linux环境开多个窗口执行XREAD即可模拟),并且每个客户端得到的是完全相同的数据。
    • Pub/Sub是发送忘记的方式,并且不存储任何数据;而streams模式下,所有消息被无限期追加在streams中,除非用于显示执行删除(XDEL)。
    • streams的Consumer Groups也是Pub/Sub无法实现的控制方式。
    streams数据结构
     
    streams数据结构本身非常简单,但是streams依然是Redis到目前为止最复杂的类型,其原因是实现的一些额外的功能:一系列的阻塞操作允许消费者等待生产者加入到streams的新数据。另外还有一个称为Consumer Groups的概念,这个概念最先由kafka提出,Redis有一个类似实现,和kafka的Consumer Groups的目的是一样的:允许一组客户端协调消费相同的信息流!
     
    redis源码中定义streams结构的源码如下,由源码可知,stream的核心数据结构是radix tree:
     
    typedef struct stream {
     
        rax *rax;               /* The radix tree holding the stream. */
     
        uint64_t length;        /* Number of elements inside this stream. */
     
        streamID last_id;       /* Zero if there are yet no items. */
     
        rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
     
    } stream;
     
    源码参考:https://github.com/antirez/redis/blob/5.0.0/src/stream.h;
     
    至于redis对radix tree的实现,参考源码:https://github.com/antirez/redis/blob/5.0.0/src/rax.c 和 https://github.com/antirez/redis/blob/5.0.0/src/rax.h 。网上也有很多radix tree的文章,本篇文章就不做过多的介绍了。下面给出一张从官方源码中的部分截图:
    radix tree
     
    streams基础
     
    为了理解streams的目的,以及如何使用它,我们先忽略掉所有高级特性,只把注意力放在数据结构本身,以及那些操作和访问streams的命令。这基本上也是大多数其他Redis数据类型共有的部分,例如Lists,Sets,Sorted Sets等。然而需要注意的是,Lists也有一个更复杂的阻塞式的API,例如BLPOP,BRPOP等。streams这方便的API也没什么不同,只是更复杂,更强大(更牛逼,哈)!
     
    streams命令
     
    废话不多说,先上手玩玩这个全新的数据类型。streams这个数据类型对应有如下13个操作命令,所有命令都以"X"开头:
     
    XADD
     
    用法:XADD key ID field string [field string …]
     
    正如其名,这个命令就是用来添加的,给streams追加(append,前面提到过:streams主要是一个append only的数据结构)一个新的entry(和Java里的Map类似,Redis里的streams中的数据也称为entry)。
     
    key:的含义就是同一类型streams的名称;
     
    ID: streams中entry的唯一标识符,如果执行XADD命令时,传入星号(*),那么,ID会自动生成,且自动生成的ID会在执行XADD后返回,默认生成的ID格式为millisecondsTime+sequenceNumber,即当前毫秒级别的时间戳加上一个自增序号值,例如"1540013735401-0"。并且执行XADD时,不接受少于或等于上一次执行XADD的ID,否则会报错:ERR The ID specified in XADD is equal or smaller than the target stream top item;
     
    field&string:接下来就是若干组field string。可以把它理解为表示属性的json中的key-value。例如,某一streams的key命名为userInfo,且某个用户信息为{"username":"afei", "password":"123456"},那么执行XADD命令如下:
     
    127.0.0.1:6379> xadd userInfo * username afei password 123456
     
    "1540014082060-0"
     
    由于命令中ID字段的值是星号,所以自定生成ID,1540014082060-0就是自动生成的ID。 XADD命令也支持显示指定ID,例如:XADD streamname 0-2 foo bar。
    • 时钟回拨
    需要注意的是,ID的时间戳部分是部署Redis服务器的本地时间,如果发生时钟回拨会怎么样?如果发生始终回拨,生成的ID的时间戳部分就是回拨后的时间,然后加上这个时间的递增序列号。例如当前时间戳1540014082060,然后这时候发生了时钟回拨,且回拨5ms,那么时间戳就是1540014082055。假设以前已经生成了1540014082055-0,1540014082055-1,那么这次由于时钟回拨,生成的ID就是1540014082055-2。所以允许自动生成的ID在发生时钟回拨时少于上次的ID,但是不允许显示指定一个少于上次的ID。
     
    XDEL
     
    用法:XDEL key ID [ID …]
     
    和XADD相反,这是命令用来从streams中删除若干个entry,并且会返回实际删除数,这个删除数可能和参数ID个数不等,因为某些ID表示的消息可能不存在。执行命令如下,第二个参数ID是不存在的,所以XDEL的返回结果是1:
     
    127.0.0.1:6379> XDEL userInfo "1540014379642-0" "1540014379642-1"
     
    (integer) 1
     
    XLEN
     
    用法:XLEN key
     
    很好理解,这个命令就是用来返回streams中有多少个entry。执行如下:
     
    127.0.0.1:6379> XLEN userInfo
     
    (integer) 2
     
    streams三种查询模式
     
    redis提供了三种查询streams数据的模式:
    1. 范围查询:因为streams的每个entry,其默认生成的ID是基于时间且递增的;
    2. 监听模式:类比linux中的tailf命令,实时接收新增加到streams中的entry(也有点像一个消息系统,事实上笔者认为它就是借鉴了kafka);
    3. 消费者组:即Consumer Groups,特殊的监听模式。从一个消费者的角度来看streams,一个streams能被分区到多个处理消息的消费者,对于任意一条消息,同一个消费者组中只有一个消费者可以处理(和kafka的消费者组完全一样)。这样还能够横向扩容消费者,从而提升处理消息的能力,而不需要只让把让一个消费者处理所有消息。
    接下里分别介绍这三种模式。
     
    XRANGE
     
    用法:XRANGE key start end [COUNT count]
     
    这个命令属于第1种模式,即基于范围查询。这个命令用来返回streams某个顺序范围下的元素,start参数是更小的ID,end参数是更大的ID。有两个特殊的ID用符号"-"和"+"表示,符号"-"表示最小的ID,符号"+"表示最大的ID:
     
    127.0.0.1:6379> XRANGE userInfo "1540014096298-0" "1540014477236-0"
     
    1) 1) "1540014096298-0"
     
       2) 1) "username"
     
          2) "root"
     
          3) "password"
     
          4) "666666"
     
    2) 1) "1540014477236-0"
     
       2) 1) "username"
     
          2) "test"
     
          3) "password"
     
          4) "111111"
     
    127.0.0.1:6379> 
     
    127.0.0.1:6379> XRANGE userInfo - +
     
    1) 1) "1540014082060-0"
     
       2) 1) "username"
     
          2) "afei"
     
          3) "password"
     
          4) "123456"
     
    2) 1) "1540014096298-0"
     
       2) 1) "username"
     
          2) "root"
     
          3) "password"
     
          4) "666666"
     
    3) 1) "1540014477236-0"
     
       2) 1) "username"
     
          2) "test"
     
          3) "password"
     
          4) "111111"
     
    4) 1) "1540014493402-0"
     
       2) 1) "username"
     
          2) "u1"
     
          3) "password"
     
          4) "111111"
     
    XRANGE还能实现遍历某个范围区间的功能,例如我想遍历2018-10-20号新增的用户信息。首先得到2018-10-20 00:00:00对应的时间戳为1539964800000,再得到2018-10-20 23:59:59对应的时间戳为1540051199000,然后执行如下命令:
     
    127.0.0.1:6379> XRANGE userInfo 1539964800000-0  1540051199000-0 COUNT 5
     
    1) 1) "1540014082060-0"
     
       2) 1) "username"
     
          2) "afei"
     
          3) "password"
     
          4) "123456"
     
    ... ...
     
    5) 1) "1540014496505-0"
     
       2) 1) "username"
     
          2) "u2"
     
          3) "password"
     
          4) "111111"
     
    127.0.0.1:6379> 
     
    # 需要注意的是,接下来再遍历的start参数是上一次遍历结果最大的ID加1,即"1540014496505-0"加1就是"1540014496505-1"。
     
    127.0.0.1:6379> XRANGE userInfo 1540014496505-1  1540051199000-0 COUNT 5
     
    1) 1) "1540014499863-0"
     
       2) 1) "username"
     
          2) "u3"
     
          3) "password"
     
          4) "111111"
     
    XREVRANGE
     
    用法:XREVRANGE key end start [COUNT count]
     
    这个命令也属于第1种模式,且和XRANGE相反,返回一个逆序范围。end参数是更大的ID,start参数是更小的ID。执行示例如下:
     
    XREVRANGE userInfo "1540014477236-0" "1540014096298-0"
     
    XREAD
     
    用法:XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
     
    很明显,这个命令就是用来实现第2个模式,即监听模式。其作用是返回streams中从来没有读取的,且比参数ID更大的元素。
     
    这个命令的使用方式如下:
     
    127.0.0.1:6379> XREAD COUNT 10 BLOCK 60000 STREAMS userInfo "1540041139268-0"
     
    1) 1) "userInfo"
     
       2) 1) 1) "1540041264182-0"
     
             2) 1) "u2"
     
                2) "p2"
     
    (9.26s)
     
    # "1540041264182-0"这条消息时通过XADD添加的然后被XREAD监听到的消息。
     
    127.0.0.1:6379> XREAD COUNT 2 STREAMS userInfo 0
     
    1) 1) "userInfo"
     
       2) 1) 1) "1540014082060-0"
     
             2) 1) "username"
     
                2) "afei"
     
                3) "password"
     
                4) "123456"
     
          2) 1) "1540014096298-0"
     
             2) 1) "username"
     
                2) "root"
     
                3) "password"
     
                4) "666666"
     
    # 这条命令实现类似XRANGE的功能。
     
    127.0.0.1:6379> XREAD BLOCK 0 STREAMS userInfo $
     
    1) 1) "userInfo"
     
       2) 1) 1) "1540042613437-0"
     
             2) 1) "u7"
     
                2) "p7"
     
    # 说明BLOCK为0表示一致等待知道有新的数据,否则永远不会超时。并且ID的值我们用特殊字符`$`表示,这个特殊字符表示我们只获取最新添加的消息。
     
    此外,XREAD还支持同时监听多个streams,用法如下所示:
     
    127.0.0.1:6379> XREAD BLOCK 0 STREAMS userInfo_01 userInfo_02 userInfo_03 userInfo_04  $ $ $ $
     
    1) 1) "userInfo_03"
     
       2) 1) 1) "1540043348287-0"
     
             2) 1) "u1"
     
                2) "p1"
     
    (3.49s)
     
    # 监听userInfo_01~userInfo_04这4个streams的新的消息。
     
    XREAD除了COUNT和BLOCK,没有其他选项了。所有XREAD是一个非常基本的命令。更多高级特性可以往下看接下来要介绍的XREADGROUP。
     
    XREADGROUP
     
    用法:XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
     
    很明显,这就是第三种模式:消费者组模式。
     
    如果你了解kafka的消费者组,那么你就也了解了streams的消费者组。如果不了解也没关系,笔者简单解释一下,假设有三个消费者C1,C2,C3。在streams中总计有7条消息:1, 2, 3, 4, 5, 6, 7,那么消费关系如下所示:
     
    1 -> C1
     
    2 -> C2
     
    3 -> C3
     
    4 -> C1
     
    5 -> C2
     
    6 -> C3
     
    7 -> C1
     
    消费者组具备如下几个特点:
    1. 同一个消息不会被投递到一个消费者组下的多个消费者,只可能是一个消费者。
    2. 同一个消费者组下,每个消费者都是唯一的,通过大小写敏感的名字区分。
    3. 消费者组中的消费者请求的消息,一定是新的,从来没有投递过的消息。
    4. 消费一个消息后,需要用命令(XACK)确认,意思是说:这条消息已经给成功处理。正因为如此,当访问streams的历史消息时,每个消费者只能看到投递给它自己的消息。
    消费者组抽象的想象成如下这个样子:
     
    +----------------------------------------+
     
    | consumer_group_name: afeigroup         |
     
    | consumer_group_stream: somekey         |
     
    | last_delivered_id: 1292309234234-92    |
     
    |                                        |
     
    | consumers:                             |
     
    |    "consumer-1" with pending messages  |
     
    |       1292309234234-4                  |
     
    |       1292309234232-8                  |
     
    |    "consumer-42" with pending messages |
     
    |       ... (and so forth)               |
     
    +----------------------------------------+
     
    XACK
     
    用法:XACK key group ID [ID …]
     
    这是消费者组相关的另一个重要的命令。标记一个处理中的消息为已被正确处理,如此一来,这条消息就会被从消费者组的pending消息集合中删除,类似MQ中的ack。
     
    XGROUP
     
    用法:XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
     
    这也是消费者组的一个重要命令,这个命令用来管理消费者组,例如创建,删除等。
     
    XREADGROUP,XACK,XGROUP三种命令构成了消费者组相关的操作命令,下面是消费者组一些操作示例:
     
    # 创建一个消费者组
     
    127.0.0.1:6379> XGROUP CREATE userInfo GRP-AFEI $
     
    OK
     
    # 需要注意的是,目前XGROUP CREATE的streams必须是一个存在的streams,否则会报错:
     
    127.0.0.1:6379> XGROUP CREATE userinfo GRP-AFEI $
     
    (error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
     
    # 名为zhangsan的消费者,需要注意的是streams名称userInfo后面的特殊符号`>`表示这个消费者只接收从来没有被投递给其他消费者的消息,即新的消息。当然我们也可以指定具体的ID,例如指定0表示访问所有投递给该消费者的历史消息,指定1540081890919-1表示投递给该消费者且大于这个ID的历史消息:
     
    127.0.0.1:6379> XREADGROUP GROUP mygroup zhangsan COUNT 1 BLOCK 0 STREAMS userInfo >
     
    # 名为lisi的消费者:
     
    127.0.0.1:6379> XREADGROUP GROUP mygroup lisi COUNT 1 BLOCK 0 STREAMS userInfo >
     
    # 接下来分别添加两条信息,一条就会被zhangsan消费,另一条被lisi消费:
     
    127.0.0.1:6379> XADD userInfo * username u102102 password p102102
     
    "1540081873370-0"
     
    127.0.0.1:6379> XADD userInfo * username u102103 password p102103
     
    "1540081890919-0"
     
    #现在消费者lisi有一条消息:
     
    127.0.0.1:6379> XREADGROUP GROUP mygroup lisi COUNT 5 BLOCK 0 STREAMS userInfo 0
     
    1) 1) "userInfo"
     
       2) 1) 1) "1540081890919-0"
     
             2) 1) "username"
     
                2) "u102103"
     
                3) "password"
     
                4) "p102103"
     
    #然后通过命令ack这条消息:
     
    127.0.0.1:6379> XACK userInfo mygroup 1540081890919-0
     
    (integer) 1
     
    # 再看消费者lisi的pending队列,已经为空:
     
    127.0.0.1:6379> XREADGROUP GROUP mygroup lisi COUNT 5 BLOCK 0 STREAMS userInfo 0
     
    1) 1) "userInfo"
     
       2) (empty list or set)
     
    XPENDING
     
    用法:XPENDING key group [start end count] [consumer]
     
    返回streams中消费者组的pending消息,即消费者接收到但是还没有ack的消息,用法参考:
     
    # 查看消费者组下总计最多10条pending消息
     
    127.0.0.1:6379> XPENDING userInfo mygroup - + 10
     
    1) 1) "1540083260408-0"
     
       2) "zhangsan"
     
       3) (integer) 183551
     
       4) (integer) 1
     
    2) 1) "1540083266293-0"
     
       2) "lisi"
     
       3) (integer) 177666
     
       4) (integer) 1
     
    # 查看消费者组下zhangsan这个消费者总计最多10条pending消息
     
    127.0.0.1:6379> XPENDING userInfo mygroup - + 10 zhangsan
     
    1) 1) "1540083260408-0"
     
       2) "zhangsan"
     
       3) (integer) 187006
     
       4) (integer) 1
     
    XCLAIM
     
    用法:XCLAIM key group consumer min-idle-time ID [ID …] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
     
    作用是改变消费者组中消息的所有权,用法参考:
     
    127.0.0.1:6379> XREADGROUP GROUP mygroup zhangsan COUNT 5 BLOCK 0 STREAMS userInfo 0
     
    1) 1) "userInfo"
     
       2) 1) 1) "1540083260408-0"
     
             2) 1) "username"
     
                2) "u102106"
     
                3) "password"
     
                4) "p102106"
     
    # zhangsan本来有1条消息,现在将另一条本来属于lisi的消息的所有权转给它:
     
    127.0.0.1:6379> XCLAIM userInfo mygroup zhangsan 360 1540083266293-0
     
    1) 1) "1540083266293-0"
     
       2) 1) "username"
     
          2) "u102107"
     
          3) "password"
     
          4) "p102107"
     
    # 现在zhangsan有两条消息了
     
    127.0.0.1:6379> XREADGROUP GROUP mygroup zhangsan COUNT 5 BLOCK 0 STREAMS userInfo 0
     
    1) 1) "userInfo"
     
       2) 1) 1) "1540083260408-0"
     
             2) 1) "username"
     
                2) "u102106"
     
                3) "password"
     
                4) "p102106"
     
          2) 1) "1540083266293-0"
     
             2) 1) "username"
     
                2) "u102107"
     
                3) "password"
     
                4) "p102107"
     
    XINFO
     
    用法:XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
     
    其作用是得到streams和消费者组的一些信息,使用参考:
     
    127.0.0.1:6379> XINFO CONSUMERS userInfo mygroup 
     
    1) 1) "name"
     
       2) "lisi"
     
       3) "pending"
     
       4) (integer) 0
     
       5) "idle"
     
       6) (integer) 201086
     
    2) 1) "name"
     
       2) "zhangsan"
     
       3) "pending"
     
       4) (integer) 2
     
       5) "idle"
     
       6) (integer) 701954
     
    127.0.0.1:6379> XINFO STREAM userInfo
     
     1) "length"
     
     2) (integer) 22
     
     3) "radix-tree-keys"
     
     4) (integer) 1
     
     5) "radix-tree-nodes"
     
     6) (integer) 2
     
     7) "groups"
     
     8) (integer) 2
     
     9) "last-generated-id"
     
    10) "1540082298051-0"
     
    11) "first-entry"
     
    12) 1) "1540014082060-0"
     
        2) 1) "username"
     
           2) "afei"
     
           3) "password"
     
           4) "123456"
     
    13) "last-entry"
     
    14) 1) "1540082298051-0"
     
        2) 1) "username"
     
           2) "u102105"
     
           3) "password"
     
           4) "p102105"
     
    XTRIM
     
    用法:XTRIM key MAXLEN [~] count
     
    修剪streams到一个确定的size。Trims the stream to (approximately if '~' is passed) a certain size,用法参考:
     
    # streams只保留10条消息,其返回结果表示被剪去多少条消息:
     
    127.0.0.1:6379> XTRIM userInfo MAXLEN 10
     
    (integer) 14
     
    说明:streams目前的修剪策略比较简单,比如连根据ID范围修剪都没有实现。根据具体某一个ID删除,可以通过XDEL实现。
     
    持久化,复制以及消息安全性
     
    和其他数据类型一样,streams也会异步复制到slave,并也会持久化到AOF和RDB文件中。然而,消费者组的全部状态是被传播(propagated )到AOF,RDB和slave中。
     
    需要注意的是,Redis的streams和消费者组使用Redis默认复制进行持久化和复制,因此:如果消息的持久性在您的应用程序中很重要,则必须将AOF与强fsync策略一起使用。
     
    默认情况下,异步复制不保证能复制每一个数据添加或使用者组状态更改:在故障转移之后,可能会丢失某些内容,具体取决于slave从master接收数据的能力。
    • 长度为0的streams
    这是streams和其他redis数据类型的不同,其他数据类型,例如Lists,Sets等,如果所有元素都被删除,那么key也不存在。而streams允许所有entry都被删除。
     
    存在这种不对称性的原因是因为streams可能具有关联的消费者组,并且我们不希望由于streams中不再有任何entry而丢失消费者组定义的状态。 目前,即使没有关联的消费者群体,也不会删除该streams。
     
     
     
  • 相关阅读:
    数据库的三大范式以及五大约束
    解析PHP面向对象的三大特征
    php中的数组遍历的几种方式
    PHP中的函数声明与使用
    使用mui框架打开页面的几种不同方式
    JS中精选this关键字的指向规律你记住了吗
    同一功能三种不同实现方式你选哪个
    转!!NPM报错 Error: EPERM: operation not permitted, unlink......解决办法和清除缓存。
    转!!关于http请求 浏览器 中文编码
    CentOS6.5下Apache防止目录遍历
  • 原文地址:https://www.cnblogs.com/williamjie/p/9837712.html
Copyright © 2011-2022 走看看