zoukankan      html  css  js  c++  java
  • RocketMQ 摘要


    一个Topic可以被多个Group订阅的,每个group下只会被一个consumer消费
    consumer+GroupId消费到那个位点,是记录在客户端,还是记录在Rocket服务器


    一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。
    否则您的消息可能会丢失。点击这里了解更多内容。

    消息队列RocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订阅方订阅关注的Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列RocketMQ版约定以下概念:

    什么是集群?使用相同Group ID的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。更多信息,请参见订阅关系一致。

    集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可
    广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次

    集群消费模式
    适用场景
    适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示。

    注意事项
    集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
    集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。

    广播消费模式
    适用场景
    适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示。

    注意事项
    广播消费模式下不支持顺序消息。
    广播消费模式下不支持重置消费位点。
    每条消息都需要被相同订阅逻辑的多台机器处理。
    消费进度在客户端维护,出现重复消费的概率稍大于集群模式。
    广播模式下,消息队列RocketMQ版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
    广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
    广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
    广播模式下服务端不维护消费进度,所以消息队列RocketMQ版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

    https://help.aliyun.com/document_detail/43163.html?spm=5176.rocketmq.0.0.1492176fAeCrhI





    不同的消费模式适用于不同的场景。当使用集群消费模式时,消息队列 RocketMQ 版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
    当使用广播消费模式时,消息队列 RocketMQ 版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

    consumer消费到那个位点,是记录在客户端,还是记录在Rocket服务器?记录在服务器端。 
    一个GroupId的消费位点会在服务端保存多少时间呢?

    Topic上发三个消息:

    2021-9-20 12:00:00 第一条

    2021-9-20 13:00:00 第二条

    2021-9-20 14:00:00 第三条
    有一个consumer是 2021-9-20 13:10:00注册上来的,是否能收到 第一条和第二条

    有一个consumer是 2021-9-20 13:10:00注册上来的,是否能收到 第一条和第二条

    如果这个consumer是第一次注册上来,收不到第一条和第二条。

    如果这个consumer是在2021-9-20 10:00:00 注册上来,但在2021-9-20 11:00:00点下线。

    2021-9-20 13:10:00换了一台机器运行,并注册上来,能不能收到 第一条和第二条呢?

     

    重置消费位点
    以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅的Topic的消费进度,设置完成后Consumer将接收设定时间点之后由Producer发送到消息队列RocketMQ版服务端的消息。
    更多信息,请参见重置消费位点。

    消息过滤
    Consumer可以根据消息标签(Tag)对消息进行过滤,确保Consumer最终只接收被过滤后的消息类型。消息过滤在消息队列RocketMQ版的服务端完成。更多信息,请参见消息过滤。

    https://help.aliyun.com/document_detail/29533.htm?spm=a2c4g.11186623.0.0.328a69bd3Lceiv#concept2655



    Tag过滤
    Tag,即消息标签,用于对某个Topic下的消息进行分类。消息队列RocketMQ版的生产者在发送消息时,指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。

    场景示例
    以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以以下消息为例:
    订单消息
    支付消息
    物流消息
    这些消息会发送到Trade_TopicTopic中,被各个不同的系统所订阅,以以下系统为例:
    支付系统:只需订阅支付消息。
    物流系统:只需订阅物流消息。
    交易成功率分析系统:需订阅订单和支付消息。
    实时计算系统:需要订阅所有和交易相关的消息。
    过滤示意图如下所示。

    配置方式
    消息队列RocketMQ版支持通过SDK配置Tag过滤功能,分别在消息发送和订阅代码中设置消息Tag和订阅消息Tag。SDK详细信息,请参见SDK参考概述。消息发送端和消费端的代码配置方法如下:
    发送消息
    发送消息时,每条消息必须指明Tag。

        Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());                

    订阅所有Tag
    消费者如需订阅某Topic下所有类型的消息,Tag用星号(*)表示。

        consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                

    订阅单个Tag
    消费者如需订阅某Topic下某一种类型的消息,请明确标明Tag。

        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                

    订阅多个Tag
    消费者如需订阅某Topic下多种类型的消息,请在多个Tag之间用两个竖线(||)分隔。

        consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                

    错误示例
    同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅的Tag为准。

        //如下错误代码中,Consumer只能订阅到MQ_TOPIC下TagB的消息,而不能订阅TagA的消息。
        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                

    SQL属性过滤
    SQL属性过滤是在消息发送时设置消息的自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性过滤消息,消息队列RocketMQ版根据表法式的逻辑进行计算,将符合条件的消息投递到消费端。

    说明 Tag属于一种特殊的消息属性,所以SQL过滤方式也兼容Tag过滤方法,支持通过Tag属性过滤消息。在SQL语法中,Tag的属性值为TAGS。

    使用限制
    使用SQL属性过滤消息时,有以下限制:
    只有企业铂金版实例支持SQL属性过滤,标准版实例不支持该功能。
    只有TCP协议的客户端支持SQL属性过滤,HTTP协议的客户端不支持该功能。
    若服务端不支持SQL过滤时,客户端使用SQL过滤消息,则客户端启动会报错或收不到消息。

    配置方式
    消息队列RocketMQ版支持通过SDK配置SQL属性过滤。发送端需要在发送消息的代码中设置消息的自定义属性;消费端需要在订阅消息代码中设置SQL语法的过滤表达式。SDK详细信息,请参见SDK参考概述。消息发送端和消费端的代码配置方法如下:
    消息发送端:
    设置消息的自定义属性。

    Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
    // 设置自定义属性A,属性值为1。
    msg.putUserProperties("A", "1");

    消息消费端
    使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。

    注意 使用属性时,需要先判断属性是否存在。若属性不存在则过滤表达式的计算结果为NULL,消息不会被投递到消费端。

    // 订阅自定义属性A存在且属性值为1的消息。
    consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    示例代码
    发送消息
    同时设置消息Tag和自定义属性。

    Producer producer = ONSFactory.createProducer(properties);
    // 设置Tag的值为tagA。
    Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes());
    // 设置自定义属性region为hangzhou。
    msg.putUserProperties("region", "hangzhou");
    // 设置自定义属性price为50。
    msg.putUserProperties("price", "50");
    SendResult sendResult = producer.send(msg);

    根据单个自定义属性订阅消息。

    Consumer consumer = ONSFactory.createConsumer(properties);
    // 只订阅属性region为hangzhou的消息,若消息中未定义属性region或属性值不是hangzhou,则消息不会被投递到消费端。
    consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    预期结果:示例中发送的消息属性符合订阅的过滤条件,消息被投递到消费端。
    同时根据Tag和自定义属性订阅消息。

    Consumer consumer = ONSFactory.createConsumer(properties);
    // 只订阅Tag的值为tagA且属性price大于30的消息。
    consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30 "), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });
    • 同一个Group ID下的消费者实例与Topic的订阅关系需保持一致,更多信息,请参见订阅关系一致
    • 合理使用Topic和Tag来过滤消息可以让业务更清晰,更多信息,请参见Topic与Tag最佳实践

    https://help.aliyun.com/document_detail/29543.htm?spm=a2c4g.11186623.0.0.9c83261dfmxdGU#concept-2047069





  • 相关阅读:
    年轻人的第一个 Spring Boot 应用,太爽了!
    面试问我 Java 逃逸分析,瞬间被秒杀了。。
    Spring Boot 配置文件 bootstrap vs application 到底有什么区别?
    坑爹的 Java 可变参数,把我整得够惨。。
    6月来了,Java还是第一!
    Eclipse 最常用的 10 组快捷键,个个牛逼!
    Spring Cloud Eureka 自我保护机制实战分析
    今天是 Java 诞生日,Java 24 岁了!
    厉害了,Dubbo 正式毕业!
    Spring Boot 2.1.5 正式发布,1.5.x 即将结束使命!
  • 原文地址:https://www.cnblogs.com/softidea/p/15314592.html
Copyright © 2011-2022 走看看