zoukankan      html  css  js  c++  java
  • RocketMQ-顺序消费

      看了https://www.jianshu.com/p/453c6e7ff81c这篇博客,得出顺序消费的结论。“要实现严格的顺序消息,简单且可行的办法就是:保证生产者 - MQServer - 消费者是一对一的关系”。

    我们下面通过几个实例来学习RocketMQ的顺序消费。

    一、单节点,也就是一个Producer一个Consumer。

    操作步骤:

      1、先启动ConsumerQueue1

      2、再启动ProducerQueue

    Producer端:

    package org.hope.lee.consumer.queue;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    
    public class ProducerQueue {
        public static void main(String[] args) {
            String group_name = "order_producer";
            DefaultMQProducer producer = new DefaultMQProducer(group_name);
            producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            try {
                producer.start();
                Date date = new Date();
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String dateStr = sdf.format(date);
                /**
                 * 第一个队列
                 * 通过  public SendResult send(Message msg, MessageQueueSelector selector, Object arg)来指定发送消息到哪个队列
                 */
                for(int i = 1; i <= 5; i++) {
                    String body = dateStr + "body_1_" + i;
                    Message message = new Message("TopicTest", "order1", "KEY" + i, body.getBytes());
                    SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            Integer id = (Integer) o;
                            return list.get(id);
                        }
                    }, 0); //0是队列的下标
                    System.out.println(sendResult + ", body:" + body);
                }
    
                producer.shutdown();
    
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            }
        }
    }

    Consumer端:

    package org.hope.lee.producer.queue;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    public class ConsumerQueue1 {
    
        public ConsumerQueue1() throws Exception {
            String group_name = "order_consumer";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //订阅的主题, 以及过滤的标签内容
            consumer.subscribe("TopicTest", "*");
            //注册监听
            consumer.registerMessageListener(new Listener());
            consumer.start();
            System.out.println("Consumer Started.....");
        }
    
        /**
         * 这里实现MessageListenerOrderLy接口就是为了达到顺序消费的目的,
         * 如果是使用MessageListenerConcurrently,则需要把线程池改为单线程模式。
         * 但是也不能保证说一定会顺序消费,因为如果master宕机了,导致写入队列的数量上
         * 出现变化。
         *
         * 从消费端,如果想保证这批消息是M1消费完成再消费M2的话,可以使用MessageListenerOrderly接口,但是这样的话会有以下问题:
         * 1. 遇到消息失败的消息,无法跳过,当前队列消费暂停
         * 2. 目前版本的RocketMQ的MessageListenerOrderly是不能从slave消费消息的。
         */
        class Listener implements MessageListenerOrderly {
            private Random random = new Random();
    
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for(MessageExt msg : list) {
                    System.out.println(msg + ", content:" + new String(msg.getBody()));
                }
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(5)); //随机休眠时间,模拟业务处理时间
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        }
        public static void main(String[] args) throws Exception {
            ConsumerQueue1 c = new ConsumerQueue1();
    
        }
    }

    Consuerm端输出结果:(横向拖到最后看蓝色字体)

    Consumer Started.....
    MessageExt [queueId=0, storeSize=159, queueOffset=0, sysFlag=0, bornTimestamp=1515420522468, bornHost=/192.168.31.38:9357, storeTimestamp=1515420551480, storeHost=/192.168.31.165:10911, msgId=C0A81FA500002A9F00000000000001EF, commitLogOffset=495, bodyCRC=829956747, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=order1, KEYS=KEY1, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=27]], content:2018-01-08 22:08:42body_1_1
    MessageExt [queueId=0, storeSize=159, queueOffset=1, sysFlag=0, bornTimestamp=1515420522668, bornHost=/192.168.31.38:9357, storeTimestamp=1515420551683, storeHost=/192.168.31.165:10911, msgId=C0A81FA500002A9F000000000000028E, commitLogOffset=654, bodyCRC=678523697, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=order1, KEYS=KEY2, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=27]], content:2018-01-08 22:08:42body_1_2
    MessageExt [queueId=0, storeSize=159, queueOffset=2, sysFlag=0, bornTimestamp=1515420522781, bornHost=/192.168.31.38:9357, storeTimestamp=1515420551716, storeHost=/192.168.31.165:10911, msgId=C0A81FA500002A9F000000000000032D, commitLogOffset=813, bodyCRC=1601586087, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=order1, KEYS=KEY3, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=27]], content:2018-01-08 22:08:42body_1_3
    MessageExt [queueId=0, storeSize=159, queueOffset=3, sysFlag=0, bornTimestamp=1515420522792, bornHost=/192.168.31.38:9357, storeTimestamp=1515420551753, storeHost=/192.168.31.165:10911, msgId=C0A81FA500002A9F00000000000003CC, commitLogOffset=972, bodyCRC=1091753476, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=order1, KEYS=KEY4, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=27]], content:2018-01-08 22:08:42body_1_4
    MessageExt [queueId=0, storeSize=159, queueOffset=4, sysFlag=0, bornTimestamp=1515420522833, bornHost=/192.168.31.38:9357, storeTimestamp=1515420551768, storeHost=/192.168.31.165:10911, msgId=C0A81FA500002A9F000000000000046B, commitLogOffset=1131, bodyCRC=907404946, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=order1, KEYS=KEY5, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=27]], content:2018-01-08 22:08:42body_1_5

    一、Consumer端集群消费的顺序消费,也就是一个Producer多个Consumer。

     步骤一、我们先改造一下ProducerQueue,在第一个队列下面再加两个队列

    /**
     * 第二个队列
     */
    for(int i = 1; i <= 5; i++) {
        //时间戳
        String body = dateStr + "order_2" + i;
        Message message = new Message("TopicTest", "body_2_", "KEY" + i, body.getBytes());
        SendResult sendResult = producer.send(message, new MessageQueueSelector() {
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                Integer id = (Integer) o;
                return list.get(id);
            }
        }, 1); //1是队列的下标
        System.out.println(sendResult + ", body:" + body);
    }
    /**
     * 第三个队列
     */
    for(int i = 1; i <= 5; i++) {
        //时间戳
        String body = dateStr + "order_3" + i;
        Message message = new Message("TopicTest", "body_3_", "KEY" + i, body.getBytes());
        SendResult sendResult = producer.send(message, new MessageQueueSelector() {
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                Integer id = (Integer) o;
                return list.get(id);
            }
        }, 2); //2是队列的下标
        System.out.println(sendResult + ", body:" + body);
    }

    步骤二、再创建一个消费端ConsumerQueue2,代码跟ConsumerQueue1一样,这里就不重复了。

    步骤三、启动ConsumerQueue1和ConsumerQueue3

    步骤四、启动ProducerQueue

    结果:

    从结果中看到,两个消费端都是按照队列顺序消费的,并且负载均衡,ConsumerQueue1消费了第三个队列,ConsumerQueue2消费了第一个队列和第二个队列。

    需要注意的一点是,对于顺序消费,我们是不能再Consumer端再使用多线程去消费的。这样就破坏了顺序消费的生态环境。

  • 相关阅读:
    【路由介绍】
    asp.net MVC 中枚举创建下拉列表?
    DELPHI中的快捷方式一览(完全版)
    C#连接mysql实例
    编写测试类,了解ArrayList的方法
    C# 验证IP是否正确简易方法 源代码
    C# 多线程操作样例
    C# 乘法口诀表的实现方法
    C# 调用系统API 内核 简单样例
    C# 基础 计算平均值的方法
  • 原文地址:https://www.cnblogs.com/happyflyingpig/p/8245297.html
Copyright © 2011-2022 走看看