zoukankan      html  css  js  c++  java
  • RocketMQ(7)---顺序消费

    RocketMQ顺序消费

    如果要保证顺序消费,那么他的核心点就是:生产者有序存储消费者有序消费

    一、概念

    1、什么是无序消息

    无序消息 无序消息也指普通的消息,Producer 只管发送消息,Consumer 只管接收消息,至于消息和消息之间的顺序并没有保证。

    举例 Producer 依次发送 orderId 为 1、2、3 的消息,Consumer 接到的消息顺序有可能是 1、2、3,也有可能是 2、1、3 等情况,这就是普通消息。

    2、什么是全局顺序

    对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费

    举例 比如 Producer 发送orderId 1,3,2 的消息, 那么 Consumer 也必须要按照 1,3,2 的顺序进行消费。

    3、局部顺序

    在实际开发有些场景中,我并不需要消息完全按照完全按的先进先出,而是某些消息保证先进先出就可以了。

    就好比一个订单涉及 订单生成订单支付订单完成。我不用管其它的订单,只保证同样订单ID能保证这个顺序就可以了。


    二、实现原理

    我们知道 生产的message最终会存放在Queue中,如果一个Topic关联了16个Queue,如果我们不指定消息往哪个队列里放,那么默认是平均分配消息到16个queue,

    好比有100条消息,那么这100条消息会平均分配在这16个Queue上,那么每个Queue大概放5~6个左右。这里有一点很重的是:

    同一个queue,存储在里面的message 是按照先进先出的原则

    这个时候思路就来了,好比有orderId=1的3条消息,分别是 订单生产订单付款订单完成。只要保证它们放到同一个Queue那就保证消费者先进先出了。

    这就保证局部顺序了,即同一订单按照先后顺序放到同一Queue,那么取消息的时候就可以保证先进先取出。

    那么全局消息呢?

    这个就简单啦,你把所有消息都放在一个Queue里,这样不就保证全局消息了。

    就这么简单

    当然不是,这里还有很关键的一点,好比在一个消费者集群的情况下,消费者1先去Queue拿消息,它拿到了 订单生成,它拿完后,消费者2去queue拿到的是 订单支付

    拿的顺序是没毛病了,但关键是先拿到不代表先消费完它。会存在虽然你消费者1先拿到订单生成,但由于网络等原因,消费者2比你真正的先消费消息。这是不是很尴尬了。

    订单付款还是可能会比订单生成更早消费的情况。那怎么办。

    分布式锁来了

    Rocker采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,因为只要锁单个Queue就可以保证局部顺序消费了。

    所以最终的消费者这边的逻辑就是

    消费者1去Queue拿 订单生成,它就锁住了整个Queue,只有它消费完成并返回成功后,这个锁才会释放。

    然后下一个消费者去拿到 订单支付 同样锁住当前Queue,这样的一个过程来真正保证对同一个Queue能够真正意义上的顺序消费,而不仅仅是顺序取出。

    全局顺序与分区顺序对比

    消息类型对比

    发送方式对比

    其它的注意事项

    1、顺序消息暂不支持广播模式。
    2、顺序消息不支持异步发送方式,否则将无法严格保证顺序。
    3、建议同一个 Group ID 只对应一种类型的 Topic,即不同时用于顺序消息和无序消息的收发。
    4、对于全局顺序消息,建议创建实例个数 >=2。
    

    三、代码示例

    这里保证两点

    1、生产端 同一orderID的订单放到同一个queue。
    
    2、消费端 同一个queue取出消息的时候锁住整个queue,直到消费后再解锁。
    

    1、ProductOrder实体

    @AllArgsConstructor
    @Data
    @ToString
    public class ProductOrder {
        /**
         * 订单编号
         */
        private String orderId;
    
        /**
         * 订单类型(订单创建、订单付款、订单完成)
         */
        private String type;
    }
    

    2、Product(生产者)

    生产者和之前发送普通消息最大的区别,就是针对每一个message都手动通过MessageQueueSelector选择好queue。

    @RestController
    public class Product {
        private static List<ProductOrder> orderList = null;
        private static String producerGroup = "test_producer";
        /**
         * 模拟数据
         */
        static {
            orderList = new ArrayList<>();
            orderList.add(new ProductOrder("XXX001", "订单创建"));
            orderList.add(new ProductOrder("XXX001", "订单付款"));
            orderList.add(new ProductOrder("XXX001", "订单完成"));
            orderList.add(new ProductOrder("XXX002", "订单创建"));
            orderList.add(new ProductOrder("XXX002", "订单付款"));
            orderList.add(new ProductOrder("XXX002", "订单完成"));
            orderList.add(new ProductOrder("XXX003", "订单创建"));
            orderList.add(new ProductOrder("XXX003", "订单付款"));
            orderList.add(new ProductOrder("XXX003", "订单完成"));
        }
    
        @GetMapping("message")
        public  void sendMessage() throws Exception {
            //示例生产者
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            //不开启vip通道 开通口端口会减2
            producer.setVipChannelEnabled(false);
            //绑定name server
            producer.setNamesrvAddr("IP:9876");
            producer.start();
            for (ProductOrder order : orderList) {
                //1、生成消息
                Message message = new Message(JmsConfig.TOPIC, "", order.getOrderId(), order.toString().getBytes());
                //2、发送消息是 针对每条消息选择对应的队列
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        //3、arg的值其实就是下面传入 orderId
                        String orderid = (String) arg;
                        //4、因为订单是String类型,所以通过hashCode转成int类型
                        int hashCode = orderid.hashCode();
                        //5、因为hashCode可能为负数 所以取绝对值
                        hashCode = Math.abs(hashCode);
                        //6、保证同一个订单号 一定分配在同一个queue上
                        long index = hashCode % mqs.size();
                        return mqs.get((int) index);
                    }
                }, order.getOrderId(),50000);
    
                System.out.printf("Product:发送状态=%s, 存储queue=%s ,orderid=%s, type=%s
    ", sendResult.getSendStatus(), 
                                          sendResult.getMessageQueue().getQueueId(), order.getOrderId(), order.getType());
            }
            producer.shutdown();
        }
    }
    

    看看生产者有没有把相同订单指定到同一个queue

    通过测试结果可以看出:相同订单已经存到同一queue中了

    3、Consumer(消费者)

    上面说过,消费者真正要达到消费顺序,需要分布式锁,所以这里需要将MessageListenerOrderly替换之前的MessageListenerConcurrently,因为它里面实现了分布式锁。

    @Slf4j
    @Component
    public class Consumer {
        
        /**
         * 消费者实体对象
         */
        private DefaultMQPushConsumer consumer;
        /**
         * 消费者组
         */
        public static final String CONSUMER_GROUP = "consumer_group";
        /**
         * 通过构造函数 实例化对象
         */
        public Consumer() throws MQClientException {
            consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
            consumer.setNamesrvAddr("IP:9876");
            //TODO 这里真的是个坑,我product设置VipChannelEnabled(false),但消费者并没有设置这个参数,之前发送普通消息的时候也没有问题。能正常消费。
            //TODO 但在顺序消息时,consumer一直不消费消息了,找了好久都没有找到原因,直到我这里也设置为VipChannelEnabled(false),竟然才可以消费消息。
            consumer.setVipChannelEnabled(false);
            //订阅主题和 标签( * 代表所有标签)下信息
            consumer.subscribe(JmsConfig.TOPIC, "*");
                //注册消费的监听 这里注意顺序消费为MessageListenerOrderly 之前并发为ConsumeConcurrentlyContext
            consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
                //获取消息
                MessageExt msg = msgs.get(0);
                //消费者获取消息 这里只输出 不做后面逻辑处理
                log.info("Consumer-线程名称={},消息={}", Thread.currentThread().getName(), new String(msg.getBody()));
                return ConsumeOrderlyStatus.SUCCESS;
            });
            consumer.start();
        }
    }
    

    看看消费结果是不是我们需要的结果

    通过测试结果我们看出

    1、消费消息的顺序并没有完全按照之前的先进先出,即没有满足全局顺序。
    2、同一订单来讲,订单的 订单生成、订单支付、订单完成 消费顺序是保证的。
    

    这是局部保证顺序消费就已经满足我们当前实际开发中的需求了。

    有关消费端选择MessageListenerOrderly后,consumer.start()启动相关的源码可以参考博客:RocketMQ顺序消息消费端源码



    只要自己变优秀了,其他的事情才会跟着好起来(上将4)
    
  • 相关阅读:
    学习Java的第八天
    学习Java的第七天
    学习Java的第六天
    学习Java的第五天
    学习Java的第四天
    学习Java的第三天
    学习Java的第二天
    学习Java的第一天
    第九天
    第八次
  • 原文地址:https://www.cnblogs.com/qdhxhz/p/11134903.html
Copyright © 2011-2022 走看看