zoukankan      html  css  js  c++  java
  • Redis之Stream

    【Stream简介】
    Redis5.0增加了一种新的数据结构:Stream,它是一个支持多播的可持久化消息队列。
    Stream的结构是一个链表,将所有的消息都串起来,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。
    和其它的结构一样,结构上的不同,都是value不同,key都是字符串形式的。key就是Stream这个结构的名称。

     在使用xadd指令追加消息时,Stream也符合creates if not exists的规则,Redis会自动创建一个Stream结构。

    【消费组-消费者】
    每个Stream都可以挂多个消费组,每个消费组会有游标(last_delivered_id)在Stream上往前移动,表示当前消费组已经消费到哪条消息了。
    每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,需要用xgroup create进行创建,需要指定Stream某个消息ID开始消费,这个ID用于初始化消费组的游标。
    每个消费组的状态相互独立,同一份Stream内部的消息会被每个消费组都消费到。
    同一个消费组可以挂多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使组的游标往前移动。每个消费者都有一个组内唯一名称。
    消费者内部会有一个状态变量pending_ids,记录了当前已经被客户端读取,但没有ack的消息。一旦某个消息被ack,它就减掉这个消息。这个变量被称为PEL,即Pending Entries List,这是一个核心的数据结构,用来确保客户端至少消费了消息一次。
    【消息】
    消息ID的形式是毫秒时间戳-消息顺序,如1527846880572-5,表示当前消息在1527846880572毫秒时产生,并且是毫秒内产生的第5条消息。
    消息ID可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是“整数-整数”,且后面加入的消息ID必须要大于前面的消息ID。
    消息内容就是hash结构的键值对。
    【基本操作】
    xadd,xdel,xrange,xlen表示对Stream内部消息的增,删(标记),查(除标记外的),长度查询操作。
    del表示删除Stream这个结构。

    【独立消费】
    在不定义消费组的情况下,我们也可以用xread单独消费Stream内的消息。

    也可以阻塞读消息,从尾部读取消息,等待最新的消息到来。当有其它客户端添加消息时,这里就会解除阻塞,显示最新的消息:

    如果客户端想要使用xread进行顺序消费,那么就要记住服务器返回的游标,即消息ID,下次继续调用xread时,将上次的ID作为参数传递进去,就可以继续消费后续的消息。
    block 0表示用原阻塞,block 1000表示阻塞1S,如果时间内没有消息,返回nil。
    【创建消费组】
    xgroup create可以创建消费组,创建时需要指定游标位置,也可以从头或者尾创建,创建完可以用xinfo查看组信息:

    【组内消费】
    xreadgroup可以进行消费组的组内消费,需要提供消费组名称,消费者名称和起始消息ID。也可以阻塞读。读到新消息后,对应ID会进入消费者的PEL中,客户端处理完毕后用xack指令通知服务器,本条消息已经处理完毕,消息就会从PEL中移除。
    xreadgroup GROUP consumegroup-name consume-name count 1 streams stream-name
    xack stream-name consumegroup-name messageid
    【Stream消息废弃策略】
    xadd指令有一个定长参数maxlen,可以将老的消息干掉:
    【一定要记得xack】
    如果消费者收到了消息,但是读取完并没有xack,那么PEL就会不断增多,消费组很多的时候,这个PEL占用的内存就会放大。
    这个PEL的作用就是为了让消费者处理完成后,再通过xack告诉服务器我已经处理完了,你可以不用管了,如果客户端处理过程中发生异常,那么这个PEL就会记住这个ID,客户端连上后会再次收到这个消息。此时客户端可以设置xreadgroup读取,起始消息必须是有效的消息ID,如果设置0-0,表示读取所有PEL消息,以及游标之后的新消息(相当于增量读取)。
    【Stream的消息丢失】
    主从复制时,如果failover发生,Redis可能会丢失从节点没来得及处理的消息,所以Stream此时也有可能会丢失消息。
    【分区】
    Redis没有原生支持分区能力,如果想分区,那就多建几个Stream,然后在客户端采用一定策略来生产消息到不同的Stream。
    这里可以对比Kafka的HashStrategy,它通过客户端的hash算法,来将消息放入了不同的分区。

    【参考】

    《Redis深度历险 核心原理与应用实践》
    https://juejin.cn/post/7028439051308892167

  • 相关阅读:
    oracle 查询 当前最大时间的value的值
    Visual Studio《加载此属性页时出错》的解决办法
    (转) 关于在IE6下 无法跳转问题
    LINQ TO XML 个人的一些心得1
    CSS实现单行、多行文本溢出显示省略号(…)
    html 图像映射(一个图像多个连接)
    JS页面跳转大全
    首行负缩进达到内容对齐的目的
    HTML图片死活不显示
    JS高级. 06 缓存、分析解决递归斐波那契数列、jQuery缓存、沙箱、函数的四种调用方式、call和apply修改函数调用方法
  • 原文地址:https://www.cnblogs.com/bruceChan0018/p/15764320.html
Copyright © 2011-2022 走看看