zoukankan      html  css  js  c++  java
  • RocketMq顺序消费

    部分内容出处   https://www.jianshu.com/p/453c6e7ff81c

    rocketmq内部有4个默认的队里,在发送消息时,同一组的消息需要按照顺序,发送到相应的mq中,同一组消息按照顺序进行消费,不同组的消息可以并行的进行消费。

    下面看一下producer的代码:

    package com.alibaba.rocketmq.example.message.order;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    
    /**
     * @author : Jixiaohu
     * @Date : 2018-04-19.
     * @Time : 9:20.
     * @Description :
     */
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException {
            String groupName = "order_producer";
            DefaultMQProducer producer = new DefaultMQProducer(groupName);
            producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
            producer.start();
    
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            String dateStr = sdf.format(new Date());
            try {
                for (int i = 1; i <= 3; i++) {
                    String body = dateStr + "Hello RoctetMq : " + i;
                    Message msg = new Message("Topic1", "Tag1", "Key" + i,
                            body.getBytes());
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            Integer id = (Integer) o;
                            return list.get(id);
                        }
                    }, 0); //0是队列的下标
                    System.out.println(sendResult);
                }
                for (int i = 1; i <= 3; i++) {
                    String body = dateStr + "Hello RoctetMq : " + i;
                    Message msg = new Message("Topic1", "Tag1", "Key" + i,
                            body.getBytes());
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            Integer id = (Integer) o;
                            return list.get(id);
                        }
                    }, 1); //1是队列的下标
                    System.out.println(sendResult);
                }
                for (int i = 1; i <= 3; i++) {
                    String body = dateStr + "Hello RoctetMq : " + i;
                    Message msg = new Message("Topic1", "Tag1", "Key" + i,
                            body.getBytes());
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            Integer id = (Integer) o;
                            return list.get(id);
                        }
                    }, 2); //2是队列的下标
                    System.out.println(sendResult);
                }
                for (int i = 1; i <= 3; i++) {
                    String body = dateStr + "Hello RoctetMq : " + i;
                    Message msg = new Message("Topic1", "Tag1", "Key" + i,
                            body.getBytes());
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            Integer id = (Integer) o;
                            return list.get(id);
                        }
                    }, 3); //3是队列的下标
                    System.out.println(sendResult);
                }
                for (int i = 1; i <= 3; i++) {
                    String body = dateStr + "Hello RoctetMq : " + i;
                    Message msg = new Message("Topic1", "Tag1", "Key" + i,
                            body.getBytes());
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            Integer id = (Integer) o;
                            return list.get(id);
                        }
                    }, 4); //4是队列的下标
                    System.out.println(sendResult);
                }
                for (int i = 1; i <= 3; i++) {
                    String body = dateStr + "Hello RoctetMq : " + i;
                    Message msg = new Message("Topic1", "Tag1", "Key" + i,
                            body.getBytes());
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            Integer id = (Integer) o;
                            return list.get(id);
                        }
                    }, 5); //5是队列的下标
                    System.out.println(sendResult);
                }
            } catch (RemotingException e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
    }

    这边发送多组消息,每组消息的顺序分别为1,2,3,

    下面查看consumer1,和consumer2,因为要顺序消费,需要注意的是,这两个消费者的监听器是MessageListenerOrderly,两个的代码一样,我这边就只展示一个consumer的代码

    package com.alibaba.rocketmq.example.message.order;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author : Jixiaohu
     * @Date : 2018-04-23.
     * @Time : 9:35.
     * @Description : 顺序消息消费
     */
    public class Consumer1 {
    
        public Consumer1() throws Exception {
            String groupName = "order_producer";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
            consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
            /**
             * 设置Consumer第一次启动是从队列头开始消费还是队列尾开始消费
             * 非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //订阅的主题,以及过滤的标签内容
            consumer.subscribe("Topic1", "*");
            //注册监听
            consumer.registerMessageListener(new Listener());
            consumer.start();
            System.out.println("Consumer1 Started.");
        }
    
        class Listener implements MessageListenerOrderly {
    
            private Random random = new Random();
    
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
                // 设置自动提交
                context.setAutoCommit(true);
                for (MessageExt msg : list) {
                    System.out.println(msg + ",context" + new String(msg.getBody()));
                }
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(1));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
    
                return ConsumeOrderlyStatus.SUCCESS;
            }
        }
    
        public static void main(String[] args) throws Exception {
            new Consumer1();
        }
    
    
    }

    还是按照先启动consumer的顺序,在启动producer的顺序。

    这边看一下控制台的信息

    总共6组消息,broker-a上接收到4组消息,broker-b上接收到2组消息,同一组的3条消息会发送到同一个broker的同一个队列中,这样才能保证顺序消费,

    下面看一下consumer1和consumer2的打印结果

    由于顺序消费只能单线程,一个线程只能去一个队列获取数据。

    可以看到,这边的queueid都是3个 3个打印,不会出现交替,下面看一下一组消息的消费顺序,

     

    可以看到,消息是按照发送的顺序,进行消费,consumer2的打印结果也是类似的,不过consumer2消费了6条消息。

     这样就实现了rocket的顺序消费,虽然实现了顺序消费,由于网络通信,会存在着重复数据的问题,

    重复数据的问题,rocket不提供解决方案,让业务方自行解决,主要有两个方法:

    1. 消费端处理消息的业务逻辑保持幂等性
    2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

    第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

    第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

    RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

    下面把consumer修改成多线程的模式,再次查看一下运行的结果:

    package com.alibaba.rocketmq.example.message.thread;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author : Jixiaohu
     * @Date : 2018-04-23.
     * @Time : 9:35.
     * @Description : 顺序消息消费
     */
    public class Consumer {
    
        public Consumer() throws Exception {
            String groupName = "order_producer";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
            consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
            /**
             * 设置Consumer第一次启动是从队列头开始消费还是队列尾开始消费
             * 非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            //最小线程数
            consumer.setConsumeThreadMin(10);
    
            //最大线程数
            consumer.setConsumeThreadMax(20);
    
            //订阅的主题,以及过滤的标签内容
            consumer.subscribe("Topic1", "*");
            //注册监听
            consumer.registerMessageListener(new Listener());
            consumer.start();
            System.out.println("Consumer Started.");
        }
    
        class Listener implements MessageListenerConcurrently {
    
            private Random random = new Random();
    
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
    
                for (MessageExt msg : list) {
                    System.out.println(msg + ",context" + new String(msg.getBody()));
                }
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(1));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
    
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }
    
        public static void main(String[] args) throws Exception {
            new Consumer();
        }
    
    
    }

     同样先启动consumer,在启动producer,查看一下打印结果:

    从打印结果,可以看出consumer消费并不能保证严格的顺序,多线程和顺序,只能保证其中的一个。producer能保证消息发送的顺序,不能保证消息消费的顺序,要保证消息消费的顺序,consumer端必须实现 MessageListenerOrderly 接口。

  • 相关阅读:
    Windows 2003/2008更改远程桌面端口脚本
    如何修改远程桌面连接3389端口
    关于百度地图(离线)使用过程报“Cannot read property 'jb' of undefined ”错误的解决办法
    IIS 错误:处理程序“PageHandlerFactory-Integrated”在其模块列表中有一个错误模块“ManagedPipelineHandler”
    IIS 错误:由于扩展配置问题而无法提供您请求的页面。如果该页面是脚本,请添加处理程序。如果应下载文件,请添加 MIME 映射。
    IIS7错误:不能在此路径中使用此配置节。如果在父级别上锁定了该节,便会出现这种情况。锁定是默认设置的(overrideModeDefault="Deny")......
    Jqgrid pager 关于“local” dataType 动态加载数据分页的研究(没好用的研究结果)
    JQGrid导出Excel文件
    Oracle以15分钟为界,统计一天内各时间段的数据笔数
    ORA-01438: 值大于为此列指定的允许精度
  • 原文地址:https://www.cnblogs.com/shmilyToHu/p/8933500.html
Copyright © 2011-2022 走看看