zoukankan      html  css  js  c++  java
  • RocketMQ顺序消息

    rocketmq的顺序消息需要满足2点:

    1.Producer端保证发送消息有序,且发送到同一个队列。
    2.consumer端保证消费同一个队列。

    生产端:

    RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。

    但是同一条queue里面,RocketMQ的确是能保证FIFO的

    确保消息放到同一个queue中,需要使用 MessageQueueSelector

    列如:

    String body = dateStr + " Hello RocketMQ " + orderList.get(i);
    Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());
    //确保同一个订单号的数据放到同一个queue中
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Long id = (Long) arg;
                            long index = id % mqs.size();
                            return mqs.get((int)index);
                        }
                    }, orderList.get(i).getOrderId());//订单id

    消费端:

    需要使用 MessageListenerOrderly 来消费数据。

    MessageListenerOrderly与MessageListenerConcurrently区别

    MessageListenerOrderly:有序消费,同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费  

    MessageListenerConcurrently:并发消费

    public class ConsumerInOrder {
     
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
            consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
     
            consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
     
            consumer.registerMessageListener(new MessageListenerOrderly() {
     
                Random random = new Random();
     
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
                    for (MessageExt msg: msgs) {
                        System.out.println(msg + ", content:" + new String(msg.getBody()));
                    }
                    try {
                        //模拟业务逻辑处理中...
                        TimeUnit.SECONDS.sleep(random.nextInt(10));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
     
            consumer.start();
     
            System.out.println("Consumer Started.");
        }
    }

    参考文档

    https://blog.csdn.net/earthhour/article/details/78323026

  • 相关阅读:
    CF 676C. Vasya and String 尺取经典题目
    进制转换
    《Dotnet9》系列-开源C# Winform控件库1《HZHControls》强力推荐
    《Dotnet9》系列-开源C# Winform控件库强力推荐
    《Dotnet9》系列-开源C# WPF控件库强力推荐
    《Dotnet9》系列-开源C# WPF控件库3《HandyControl》强力推荐
    《Dotnet9》系列-开源C# WPF控件库2《Panuon.UI.Silver》强力推荐
    《Dotnet9》系列之建站-Dotnet9建站20天感悟
    《Dotnet9》系列-开源C# WPF控件库1《MaterialDesignInXAML》强力推荐
    《Dotnet9》系列-FluentValidation在C# WPF中的应用
  • 原文地址:https://www.cnblogs.com/fqybzhangji/p/11044119.html
Copyright © 2011-2022 走看看