zoukankan      html  css  js  c++  java
  • 3-rocketmq-支持的消息种类

    RocketMQ消息支持的模式

    普通消息 NormalProducer

    • 消息同步发送

      producer.send(Message msg)

    • 消息异步发送

      producer.send(Message msg, SendCallback sendCallback)

    • 单向发送OneWay

      producer.sendOneWay(Message msg);

    顺序消息 OrderProducer

    简介

    顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。
    顺序消息包含两种类型:

    • 分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费

    • 全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费

    全局顺序其实是分区顺序的一个特例,即使Topic只有一个分区(以下不在讨论全局顺序,因为全局顺序将面临性能的问题,而且绝大多数场景都不需要全局顺序)

    在MQ的模型中,顺序需要由3个阶段去保障:

    1. 消息被发送时保持顺序
    2. 消息被存储时保持和发送的顺序一致(依赖1)
    3. 消息被消费时保持和存储的顺序一致(依赖2)

    顺序发送和顺序存储由rocketmq保证,顺序消费需要由消费者业务层保证

    RocketMQ顺序消息原理

    img

    Producer保证顺序发送,Consumer消费时通过一个分区只能有一个线程消费的方式来保证消息顺序

    实现

    生产者producer

    producer只需要确保消息发送到特定的分区,也就是MessageQueue。通过MessageQueueSelector来实现分区的选择(自定义消息发送规则)

    public class Producer {
        public static void main(String[] args) {
            try {
                DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
                producer.setNamesrvAddr("10.211.55.4:9876");
                producer.setRetryTimesWhenSendFailed(3);
                producer.start();
                String[] tags = new String[]{"创建订单", "支付", "发货", "收货", "五星好评"};
                for (int i = 5; i < 25; i++) {
                    int orderId = i / 5;
                    Message msg = new Message("OrderTopic1", tags[i % tags.length], "uniqueId:" + i,
                            ("order_" + orderId + " " + tags[i % tags.length]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    /**
                     * 实现MessageQueueSelector 保证顺序发送
                     * 实现MessageQueueSelector的三种实现:
                     * SelectMessageQueueByHash
                     * SelectMessageQueueByMachineRoom
                     * SelectMessageQueueByRandom
                     */
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            //此刻arg == orderId,可以保证是每个订单进入同一个队列
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
                    System.out.printf("%s%n", sendResult);
                }
                producer.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    消费者consumer

    消费者端需要保证消息消费的顺序性,最简单的办法就是单线程消费。使用MessageListenerOrderly实现消费监听

    疑问:一个broker能否被多个consumer同时消费?

    public class Consumer {
        public static void main(String[] args) {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
            consumer.setNamesrvAddr("10.211.55.4:9876");
            try {
                //设置Consumer从哪开始消费
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.subscribe("OrderTopic1", "*");
                // 实现了MessageListenerOrderly表示一个队列只会被一个线程取到, 第二个线程无法访问这个队列,MessageListenerOrderly默认单线程
    //            consumer.setConsumeThreadMin(3);
    //            consumer.setConsumeThreadMax(6);
                consumer.registerMessageListener(new MessageListenerOrderly() {
                    @Override
                    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                        try {
                            System.out.println("orderInfo: " + new String(msgs.get(0).getBody(), "utf-8"));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return ConsumeOrderlyStatus.SUCCESS;
                    }
                });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Consumer1 Started.");
        }
    
    }
    
    顺序和异常的关系

    顺序消息需要Producer和Consumer都保证顺序。Producer需要保证消息被路由到正确的分区,消息需要保证每个分区的数据只有一个线程消息,那么就会有一些缺陷:

    • 发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试
    • 因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大
    • 消费的并行读依赖于分区数量
    • 消费失败时无法跳过

    不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。

    热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。

    消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。

    消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。

  • 相关阅读:
    Golang Gin 实战(一)| 快速安装入门
    6 款最棒的 Go 语言 Web 框架简介
    Golang教科书般的web框架
    vgo简明教程
    go mod常用命令 已经 常见问题
    线程池原理讲解 Java专题
    Python 3.9安装与使用
    消息队列的基本概念
    实践——GIT安装(2021/05/01)
    vue2.0数据双向绑定原理分析及代码实现
  • 原文地址:https://www.cnblogs.com/zh-ch/p/14248762.html
Copyright © 2011-2022 走看看