zoukankan      html  css  js  c++  java
  • Redis Stream

    官方介绍:https://redis.io/topics/streams-intro

    Springboot整合文档:https://docs.spring.io/spring-data/data-redis/docs/current/reference/html/#redis.streams

    一、Stream

      Stream是Redis 5.0新增的一种数据结构。它是一个新的很强大的支持多播的可持久化消息队列(极大借鉴了Kafka的设计)。

      Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃 

      Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失

     Stream结构与特征

      它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。

      每个Stream都有唯一的名称,它就是Redis的key,在我们首次使用 xadd指令追加消息时自动创建。

      名词

      Consumer Group:消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)

      last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。

      pending_ids :Pending Entries List (PEL),Stream在每个消费者结构内部维护了一个状态变量pending_ids ,它记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)的消息的ID。

     

      每个Stream都可以挂多个消费组(Consumer Group),每个消费组会有个游标 last_delivered_id 在Stream 之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令 xgroup create 进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化 last_delivered_id 变量。

      每个消费组的状态都是独立的,相互不受影响,也就是说同一份Stream 内部的消息会被每个消费组都消费到

      同一个消费组可以挂接多个消费者(Consumer),这些消费组之间是竞争关系(一个消息只会被消费组内的一个消费者消费),任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动,每个消费者有一个组内唯一名称。

      消息ID

        消息ID的形式是timestamp-sequence,毫秒数-序号,表示当前消息产生时的毫秒数,sequence表示当前毫秒数产生的第几个消息

      消息内容

        消息内容就是键值对

     Stream消息太多怎么办?

      Stream如果消息太多,导致消息链表很长,占用内存很大,怎么办?

      Redis提供了一个定长Stream功能,通过XADD命令的MAXLEN选项或者XTRIM命令,限制Stream的长度,当达到限制的长度时,就会将老的消息干掉,从而使Stream保持恒定的长度。

        1)XADD命令的MAXLEN选项

        2)XTRIM命令(推荐)

          定长策略:MAXLEN,如限制Stream的长度为10:XTRIM mystream MAXLEN 10,或者 XTRIM mystream MAXLEN ~ 10,~参数意味着并不需要精确到长度为10,只保证最少为10即可,实际上允许比10稍多一些。

          最小ID策略(>=6.2版本):MINID,该技术可以逐出ID小于指定ID的条目。XTRIM mystream MINID 649085820:所有ID低于649085820-0的条目都将被删除

      

     Stream相关命令

        XADD - 向Stream追加消息到末尾,如果队列不存在,则创建一个队列。语法:

          XADD key ID field value [field value ...]

            key :队列名称,如果不存在就创建
            ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
            field value : 记录


        XTRIM - 对流进行修剪,限制长度,返回从流中删除的条目数
        XDEL - 从Stream中删除消息,这里的删除仅仅是设置标志位,不是真正删除,不影响消息总长度。语法:XDEL key ID [ID ...]
        XLEN - 获取流包含的元素数量,即消息长度。语法:XLEN key
        XRANGE - 获取消息列表,会自动过滤已经删除的消息

          XRANGE key start end [COUNT count]

            key :队列名
            start :开始值, - 表示最小值
            end :结束值, + 表示最大值
            count :数量


        XREVRANGE - 反向获取消息列表,ID 从大到小
        XREAD - 以阻塞或非阻塞方式获取消息列表

          xread count 3 streams yyj_stream 0-0 :从头读取yyj_steam的3条记录

        DEL-删除整个Stream消息列表中的所有消息

      
      消费者组相关命令:

        XGROUP CREATE - 创建消费者组。语法:

          XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

            key :队列名称
            groupname :组名。
            $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

          如:XGROUP CREATE mystream consumer-group-name 0-0 / $ MKSTREAM(从头/尾 开始消费),如果key不存在就创建


        XREADGROUP GROUP - 读取消费者组中的消息。语法:

          XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

            group :消费组名
            consumer :消费者名。
            count : 读取数量。
            milliseconds : 阻塞毫秒数。
            key : 队列名。
            ID : 消息 ID。

          如:XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >

          
        XACK - 将消息标记为"已处理"
        XGROUP SETID - 为消费者组设置新的最后递送消息ID
        XGROUP DELCONSUMER - 删除消费者
        XGROUP DESTROY - 删除消费者组
        XPENDING - 显示待处理消息的相关信息
        XCLAIM - 转移消息的归属权
        XINFO - 查看流和消费者组的相关信息;
        XINFO GROUPS - 打印消费者组的信息;
        XINFO STREAM - 打印流信息

    二、与Springboot整合

      org.springframework.data.redis.connectionorg.springframework.data.redis.stream软件包提供了对Redis的数据流的核心功能

      目前仅Lettuce客户端支持 Redis Stream,Jedis尚不支持。

      注意消息类型为 Map<String,String> 

      

      1、追加消息

        要发送一条消息 record,可以使用底层的 RedisConnection ,也可使用高级的 StreamOperations,两个都提供了add方法(xadd指令),该方法接受记录record和目标流

      作为参数。RedisConnection需要原始数据(字节数组),而StreamOperations可以让任意对象作为记录

    // append message through connection
    RedisConnection con =byte[] stream = …
    ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream);
    con.xAdd(record);
    
    // append message through RedisTemplate
    RedisTemplate template = …
    StringRecord record = StreamRecords.string(…).withStreamKey("my-stream");
    template.opsForStream().add(record);

      RedisUtil:

        /**
         * 向流中追加记录,若流不存在,则创建
         *
         * @param record 记录类型为Map<String,String>
         * @param streamKey
         * @return 追加消息的RecordId
         */
        public static RecordId xadd(Map<String, String> record, String streamKey) {
            try {
                StringRecord stringRecord = StreamRecords.string(record).withStreamKey(streamKey);
                // 刚追加记录的记录ID
                RecordId recordId = redisTemplate.opsForStream().add(stringRecord);
                LOGGER.info(recordId.getValue());
                return recordId;
            } catch (Exception e) {
                LOGGER.error("xadd error:" + e.getMessage(), e);
                return null;
            }
        }
    
        /**
         * 流消息消费确认
         *
         * @param groupName
         * @param record
         * @return 成功确认的消息数
         */
        public static Long xack(String groupName, Record record) {
            try {
                return redisTemplate.opsForStream().acknowledge(groupName, record);
            } catch (Exception e) {
                LOGGER.error("xack error:" + e.getMessage(), e);
                return 0L;
            }
        }

      2、消费消息

        对消费者而言,可以消费一个或多个流。Redis Streams 提供read 命令允许从已知流内容内的任意位置(随机访问)消费流,并超出流末尾消费新的流记录

        底层的RedisConnection提供了xReadxReadGroup方法,它们分别映射Redis命令来读取消息和在消费组内读取。请注意,可以将多个流用作参数

        注意:在连接上调用xRead会导致当前线程在开始等待消息时阻塞。仅当读取命令超时或收到消息时才释放线程

        要消费流中的消息,有两种方式:

          1)(同步阻塞)在应用程序代码中轮询消息

          2)(异步)使用 Message Listener Containers(消息订阅者容器)中的两种异步接收中的一种(命令式或响应式)。每当消息到达时,容器都会通知调用应用程序代码。

         

        同步接收

          StreamOperations.read(…)方法提供了此功能。在同步接收期间,调用线程可能会阻塞,直到消息可用为止。 StreamReadOptions.block属性指定接收者在放弃等待消息之前应该等待多长时间 

    // Read message through RedisTemplate
    RedisTemplate template = …
    
    List<MapRecord<K, HK, HV>> messages = template.streamOps().read(StreamReadOptions.empty().count(2),
                    StreamOffset.latest("my-stream"));
    
    List<MapRecord<K, HK, HV>> messages = template.streamOps().read(Consumer.from("my-group", "my-consumer"),
                    StreamReadOptions.empty().count(2),
                    StreamOffset.create("my-stream", ReadOffset.lastConsumed()))

        通过 Message Listener Containers (消息订阅者容器)异步接收

          由于其阻塞性质,低级轮询没有吸引力,因为它需要每个消费者使用连接和线程管理。为了减轻这个问题,Spring Data提供了message listeners(消息侦听器),它可以完成所有繁重的工作

        Spring Data附带了两种针对所用编程模型的实现:

          StreamMessageListenerContainer:充当命令式编程模型的消息侦听器容器。它用于从Redis流中 consumer record(消费记录)并驱动注入到对应的StreamListener实例中。

          StreamReceiver:提供消息侦听器的反应式变体。它用于将Redis流中的消息作为潜在的无限流使用,并通过Flux发出流消息

        StreamMessageListenerContainer 和StreamReceiver 都是负责所有接收消息的线程,并将消息分发到 listener 中进行处理。

      

        StreamMessageListenerContainer 

        1)定义流消息订阅者( Stream-Driven POJO (SDP) )充当流消息的接收者,它必须实现 org.springframework.data.redis.stream.StreamListene 接口。如:

    public class ProductUpdateStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
        @Override
        public void onMessage(MapRecord<String, String, String> message) {
            RecordId recordId = message.getId();
            String stream = message.getStream();
            Map<String, String> record = message.getValue();
    
            // 消息处理完毕后,确认该消息,以便从PEL中删除该消息ID
            RedisUtil.streamAcknowledge("yyj_group", message);
        }
    }

        2)创建 message listener container 和注册 流消息订阅者。

        RedisConfig 中添加下面的配置:

    @Bean
        public StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    // 一次最多拉取5条消息
                    .batchSize(5)
                    // 拉取消息的超时时间
                    .pollTimeout(Duration.ofMillis(100))
                    .build();
    
            // 流消息订阅者容器
            StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory,
                    containerOptions);
            // 使用消费组,ReadOffset.lastConsumed()在消费组最后消耗的消息ID之后读取。消费组内的消费者名称需在配置文件中配置
            // 需要注意stream和消费组需提前创建好,XGROUP CREATE yyj_stream yyj_group 0-0 MKSTREAM
            // 要在接收时自动确认消息,请使用receiveAutoAck代替receive
            // 经验证一个消费组内的多个消费者名称可以相同,不会重复消费,解决了集群部署不好区别消费者名称的问题
            streamMessageListenerContainer.receive(
                    Consumer.from("yyj_group", "consumer_01"),
                    StreamOffset.create("yyj_stream", ReadOffset.lastConsumed()),
                    new ProductUpdateStreamListener());
            streamMessageListenerContainer.start();
            return streamMessageListenerContainer;
        }

        验证了,消息只能被消费组内的一个消费者监听到。一个消费组的不同机器可以共用一个消费者名称。

        需要注意消费组需要提前创建好,不然不能消费消息,创建消费组的时候要指定从什么位置开始消费消息。

      补充:

      ReadOffset 策略

        流读取操作接受读取偏移量规范,以消费来自给定偏移量的消息。ReadOffset表示读取偏移量规范。Redis支持三种偏移量,具体取决于您是standalone 消费流还是在消费组中消费流:

          ReadOffset.latest() –阅读最新消息。

          ReadOffset.from(…) –在特定消息ID之后阅读。

          ReadOffset.lastConsumed() –在最后消耗的消息ID之后读取(仅针对消费者组)

        在基于消息容器的使用情况下,我们在消费消息时需要前进(或增加)读取偏移量,前进取决于请求的ReadOffset和消费模式(有/无消费组)。下表说明了容器如何前进ReadOffset:

    Read offsetStandaloneConsumer Group

    Latest

    读取最新消息

    读取最新消息

    Specific Message Id(指定messageId)

    使用上次看到的消息作为下一个MessageId

    使用上次看到的消息作为下一个MessageId

    Last Consumed

    使用上次看到的消息作为下一个MessageId

    每个消费者组最后消费的消息

    总结:Redis Stream可以看作是一个轻量级的Kafka,适用于消息量少且对消息可靠性不高的业务。

    END.

  • 相关阅读:
    Decimal 格式化输出( 去掉多余的0和点)
    HTML Character Sets
    生成下面的模块时,启用了优化或没有调试信息
    PJBLog的CSS模板图
    .NET 实例化顺序
    Live Mail 报错 0x80048820 可能的处理方式
    Windows下将Ldif文件导入OpenLdap时的中文转换问题
    DataGrid中动态添加列
    Sip协议栈消息层的设计与实现
    Prism学习笔记模块之间通信的几种方式
  • 原文地址:https://www.cnblogs.com/yangyongjie/p/14347781.html
Copyright © 2011-2022 走看看