zoukankan      html  css  js  c++  java
  • Redis Stream

    Redis 5.0版本之后提供的新的数据类型,类似于MQ,是发布订阅模式的改进

    1. Streams数据结构

      每一个Streams有唯一的名称,对应了Redis的键。将所有加入的消息(xadd指令)串联起来,每个消息有一个ID

    • Consumer Group:消费组,同一个消息可以被组内成员消费,但只会有一个消费成功,保证了消费的唯一性
    • last_delivered_id:标识消费组在Stream上的消费位置,代表当前消费组正在消费该消息,每当消费者读取一条消息,last_delivered_id变量会前进
    • pending_ids(PEL):每个消费者内部的一个数组,记录已经被客户端获取,但是没有ack的消息id。为了确保消费成功。

    2. 消息ID和消息内容

      由xadd命令自动创建的消息ID格式为:毫秒时间戳-序列号,代表该条消息是指定毫秒内产生的第几条消息。使用时间戳是为了支持时间的范围查找,并且保证了消息ID的自增。

      消息内容是普通的键值对

    3. 增删改查命令

    1. xadd:追加消息
    2. xdel:删除消息,注意是只做一个标记
    3. xrange:获取消息列表,自动过滤已经删除的消息
    4. xlen:消息长度
    5. del:删除Stream

    4. 独立消费

      使用xread命令忽略消费组,将Steam当成普通的消息队列使用。如果要进行顺序消费,在每次xread时,带上前一个消息的ID作为参数。使用xread block 秒数 命令,在指定时间内阻塞等待消息的到来,如果没有消息,返回nil

    5. 创建消费者

      通过xgroup create创建消费组,需要传递初始消息ID初始化last_delivered_id

    6. 组内消费

      通过xreadgroup命令,指定消费组名称,消费者名称和起始消息ID。支持阻塞等待新消息。

      读取到新消息,将消息ID添加到pending_ids中,当客户端处理完毕,使用xack指令通知服务器,该ID会被删除。

    7. 问题

      1. Stream消息太多怎么办

       xadd指令可以指定最大长度 maxlen,超过指定长度,会将较老的消息干掉。

      2. PEL如何避免消息丢失

       当客户端重连后,xreadgroup命令会指定获取消息ID,一般设置为0-0,表示读取所有PEL消息以及last_delivered_id之后的消息。

    8. 应用

       即时通信、大数据分析、异地数据备份、分流

    参考:https://www.cnblogs.com/wmyskxz/p/12499532.html

    人生就像蒲公英,看似自由,其实身不由己。
  • 相关阅读:
    MarkDown语法总结
    HashMap
    [LeetCode] 102. Binary Tree Level Order Traversal(二叉树的中序遍历)
    [LeetCode] 287. Find the Duplicate Number(寻找重复数字)
    [LeetCode] 215. Kth Largest Element in an Array(数组里的第 k 大元素)
    [LeetCode] 39. Combination Sum(组合的和)
    [LeetCode] 49. Group Anagrams(分组相同字母异序词)
    [LeetCode] 48. Rotate Image(旋转图片)
    [LeetCode] 647. Palindromic Substrings(回文子串)
    [LeetCode] 238. Product of Array Except Self(数组除自身元素外的乘积)
  • 原文地址:https://www.cnblogs.com/walker993/p/14499957.html
Copyright © 2011-2022 走看看