zoukankan      html  css  js  c++  java
  • RocketMQ 顺序消费只消费一次 坑

    rocketMq实现顺序消费的原理

    produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

    注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

    单个节点(Producer端1个、Consumer端1个)

    1、Producer.java 

    package order;  
      
    import java.util.List;  
      
    import com.alibaba.rocketmq.client.exception.MQBrokerException;  
    import com.alibaba.rocketmq.client.exception.MQClientException;  
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
    import com.alibaba.rocketmq.client.producer.MessageQueueSelector;  
    import com.alibaba.rocketmq.client.producer.SendResult;  
    import com.alibaba.rocketmq.common.message.Message;  
    import com.alibaba.rocketmq.common.message.MessageQueue;  
    import com.alibaba.rocketmq.remoting.exception.RemotingException;  
      
    /** 
     * Producer,发送顺序消息 
     */  
    public class Producer {  
        public static void main(String[] args) {  
            try {  
                DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
                producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
      
                producer.start();  
      
                // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",  
                // "TagE" };  
      
                for (int i = 1; i <= 5; i++) {  
      
                    Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());  
      
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                            Integer id = (Integer) arg;  
                            int index = id % mqs.size();  
                            return mqs.get(index);  
                        }  
                    }, 0);  
      
                    System.out.println(sendResult);  
                }  
      
                producer.shutdown();  
            } catch (MQClientException e) {  
                e.printStackTrace();  
            } catch (RemotingException e) {  
                e.printStackTrace();  
            } catch (MQBrokerException e) {  
                e.printStackTrace();  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }

    2、Consumer.java   (有问题)

    package order;   
    import java.util.List;  
    import java.util.concurrent.TimeUnit;  
    import java.util.concurrent.atomic.AtomicLong;   
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;  
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;  
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;  
    import com.alibaba.rocketmq.client.exception.MQClientException;  
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
    import com.alibaba.rocketmq.common.message.MessageExt;  
      
    /** 
     * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) 
     */  
    public class Consumer1 {  
      
        public static void main(String[] args) throws MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
      
            /** 
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
             * 如果非第一次启动,那么按照上次消费的位置继续消费 
             */  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
      
            consumer.subscribe("TopicOrderTest", "*");  
      
            consumer.registerMessageListener(new MessageListenerOrderly() {  
                AtomicLong consumeTimes = new AtomicLong(0);  
      
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                    // 设置自动提交  
                    context.setAutoCommit(true);  
                    for (MessageExt msg : msgs) {  
                        System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                    }  
      
                    try {  
                        TimeUnit.SECONDS.sleep(5L);  
                    } catch (InterruptedException e) {  
      
                        e.printStackTrace();  
                    }  
                    ;  
      
                    return ConsumeOrderlyStatus.SUCCESS;  
                }  
            });  
      
            consumer.start();  
      
            System.out.println("Consumer1 Started.");  
        }  
      
    }

    这个地方有一个大坑,注册监听类的时候,不能使用匿名内部类。不然的话,只会消费一次,然后消费者就 挂了……挂了……挂了…… 

    监听类要单独写!!!

    正确消费者写法:

    自定义监听类:

    MyMessageListener

    public class MyMessageListener implements  MessageListenerOrderly {
    
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            // 设置自动提交
            context.setAutoCommit(true);
            for (MessageExt msg : msgs) {
                System.out.println(msg + ",内容:" + new String(msg.getBody()));
            }
    
            try {
                TimeUnit.SECONDS.sleep(5L);
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
    
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

    Consumer.java   

    public class Consumer {
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
            consumer.setNamesrvAddr("101.200.33.225:9876");
    
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            consumer.subscribe("TopicOrderTest", "*");
    
            MyMessageListener myMessageListener = new MyMessageListener();
    consumer.registerMessageListener(myMessageListener);
    consumer.start(); System.out.println(
    "Consumer1 Started."); } }

    参考:https://www.cnblogs.com/antain/p/rocketmq.html

               http://www.cnblogs.com/520playboy/p/6750023.html

               http://dbaplus.cn/news-21-1123-1.html

  • 相关阅读:
    深入分析java的clone
    java实现文件夹遍历
    插入排序的C语言实现
    java动态代理工厂类
    HTML 理解标签 帧
    贝塞尔曲线初探
    警告:未初始化的变量引起运行问题
    解释string类型的输入操作符和getline函数分别如何处理空白符
    程序书写规范笔记
    vector
  • 原文地址:https://www.cnblogs.com/Jtianlin/p/8436024.html
Copyright © 2011-2022 走看看