zoukankan      html  css  js  c++  java
  • RocketMQ-消费重试机制

    介绍:

      RocketMQ的消息重试机制分为两种,一种是Producer端重试,一种是Consumer端重试。

      1、Producer端重试 :
        1.1 消息发送失败时会自动重试,retryTimesWhenSendFailed默认值为2,即同步发送时加上首次发送最多共发送3次。
      2、Consumer端重试:
        2.1 exception的情况,一般重复16次 10s、30s、1mins、2mins、3mins等。注意reconsumeTimes这个参数;
        2.2 超时情况,这种情况MQ会无限制的发送给消费端。这种情况就是Consumer端没有返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,也没有返回ConsumeConcurrentlyStatus.RECONSUME_LATER。Consumer超时的情况我们还分为一个Producer和一个Consumer的场景,以及一个Producer和多个Consumer(Consumer集群)的场景。下面的示例中我们会详细地做实验。

      问题,Consumer的默认超时时间是多少?

     示例:

      Producer端重试

        producer端重试的机制比较简单,我们通过看一下源码可以发现,通过设置retryTimesWhenSendFailed定义重试次数,通过设置sendMsgTimeout来定义超时时间

    producer.setRetryTimesWhenSendFailed(3); //默认是2
    producer.setSendMsgTimeout(6000); //默认是3000
    package org.hope.lee.producer;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendCallback;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    /**
     * Producer-side retry demo: configures the retry count and send timeout on a
     * {@link DefaultMQProducer}, sends a single message synchronously and prints the
     * returned message id and send status.
     */
    public class Producer {
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
            // Single name-server alternative: producer.setNamesrvAddr("192.168.31.176:9876");
            producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            try {
                // Name of this client instance.
                producer.setInstanceName("quick_start_producer");
                // Number of retries when a send fails (the author notes the default is 2).
                producer.setRetryTimesWhenSendFailed(3);
                // Send timeout (the author notes the default is 3000).
                producer.setSendMsgTimeout(6000);
                // Start the producer before sending.
                producer.start();
                // Build one message; encode explicitly as UTF-8 because the consumers
                // in this article decode message bodies as UTF-8.
                Message msg = new Message("PushTopic_tt1", "TagB", "OrderID0034",
                        "uniform_just_for_test".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                SendResult send = producer.send(msg);
                System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Always release the client, even if an unchecked exception escapes.
                producer.shutdown();
            }
        }
    }

    Consumer端exception的情况。

    Producer端发送消息:

    package org.hope.lee.producer;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendCallback;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    /**
     * Sends ten messages ("message0" .. "message9") to PushTopic_tt1 so that the
     * ConsumerException demo can fail on one of them and trigger consumer-side retry.
     */
    public class ProducerException {
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
            producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            try {
                producer.setInstanceName("quick_start_producer");
                producer.setRetryTimesWhenSendFailed(3);
                // Author's note: client 3.2.6 has no setVipChannelEnabled; 3.5.3 has it and
                // it must be set to false there, otherwise an error is reported.
                // producer.setVipChannelEnabled(false);
                producer.start();
                for (int i = 0; i < 10; i++) {
                    // Encode explicitly as UTF-8; the consumer decodes the body as UTF-8.
                    byte[] body = ("message" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    Message msg = new Message("PushTopic_tt1", "TagA", "OrderID0034", body);
                    SendResult send = producer.send(msg);
                    System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
                }
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Always release the client, even if an unchecked exception escapes.
                producer.shutdown();
            }
        }
    }

    Consumer端接收消息

    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.sun.org.apache.xpath.internal.SourceTree;
    
    /**
     * 消费端重试的情况 :异常情况
     */
    public class ConsumerException {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer");
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            // 批量消费,每次拉取10条
            consumer.setConsumeMessageBatchMaxSize(10);//// consumer.setInstanceName("quick_start_consumer");
            // 3.2.6这个版本没有这个方法,3.5.3版本要设置这个方法为false,否则取不到topic
            // consumer.setVipChannelEnabled(false);
    
            // 程序第一次启动从消息队列头取数据
            // 如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 订阅PushTopic下Tag为push的消息
            consumer.subscribe("PushTopic_tt1", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    
                    MessageExt msg = msgs.get(0);
                    try {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:topic:" + topic + ",tags:" + tags + ",msg:" + msg + "msgBody:" + msgBody);
                        if ("message4".equals(msgBody)) {
                            System.out.println("====失败消息开始=====");
                            System.out.println("msg:" + msg);
                            System.out.println("msgBody:" + msgBody);
                            System.out.println("====失败消息结束=====");
                            // 发生异常
                            int i = 1 / 0;
                            System.out.println(i);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        // ②如果重试了三次就返回成功
                        if (msg.getReconsumeTimes() == 3) {
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
            // consumer.suspend();
    
        }
    }

     我们先启动ConsumerException再启动ProducerException。我们看到在①处发生了异常,这个时候消息就会进行重试,我们看看Consumer端的输出来分析一下。

    Consumer Started.
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=149, queueOffset=4, sysFlag=0, bornTimestamp=1515158266232, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293401, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001603, commitLogOffset=5635, bodyCRC=886509244, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=8]]msgBody:message0
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=1, storeSize=149, queueOffset=3, sysFlag=0, bornTimestamp=1515158266249, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293411, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001698, commitLogOffset=5784, bodyCRC=1137720874, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=4, MIN_OFFSET=0}, body=8]]msgBody:message1
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=3, storeSize=149, queueOffset=2, sysFlag=0, bornTimestamp=1515158266267, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293428, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F00000000000017C2, commitLogOffset=6082, bodyCRC=769548038, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=8]]msgBody:message3
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=2, storeSize=149, queueOffset=2, sysFlag=0, bornTimestamp=1515158266259, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293420, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F000000000000172D, commitLogOffset=5933, bodyCRC=1524199312, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=8]]msgBody:message2
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=149, queueOffset=5, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293445, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001857, commitLogOffset=6231, bodyCRC=867879589, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=6, MIN_OFFSET=0}, body=8]]msgBody:message4
    ====失败消息开始=====
    msg:MessageExt [queueId=0, storeSize=149, queueOffset=5, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293445, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001857, commitLogOffset=6231, bodyCRC=867879589, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=6, MIN_OFFSET=0}, body=8]]
    msgBody:message4
    ====失败消息结束=====
    java.lang.ArithmeticException: / by zero
        at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
        at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=1, storeSize=149, queueOffset=4, sysFlag=0, bornTimestamp=1515158266292, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293452, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F00000000000018EC, commitLogOffset=6380, bodyCRC=1153301043, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=8]]msgBody:message5
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=2, storeSize=149, queueOffset=3, sysFlag=0, bornTimestamp=1515158266300, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293482, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001A9D, commitLogOffset=6813, bodyCRC=1572121481, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=4, MIN_OFFSET=0}, body=8]]msgBody:message6
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=3, storeSize=149, queueOffset=3, sysFlag=0, bornTimestamp=1515158266328, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293491, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001B32, commitLogOffset=6962, bodyCRC=716413727, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=4, MIN_OFFSET=0}, body=8]]msgBody:message7
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=149, queueOffset=6, sysFlag=0, bornTimestamp=1515158266337, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293497, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001BC7, commitLogOffset=7111, bodyCRC=973899406, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=7, MIN_OFFSET=0}, body=8]]msgBody:message8
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=1, storeSize=149, queueOffset=5, sysFlag=0, bornTimestamp=1515158266348, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293508, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001C5C, commitLogOffset=7260, bodyCRC=1292613144, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=6, MIN_OFFSET=0}, body=8]]msgBody:message9
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=285, queueOffset=7, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158303482, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001CF1, commitLogOffset=7409, bodyCRC=867879589, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=3, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=8, MIN_OFFSET=0, REAL_QID=0}, body=8]]msgBody:message4
    ====失败消息开始=====
    msg:MessageExt [queueId=0, storeSize=285, queueOffset=7, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158303482, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001CF1, commitLogOffset=7409, bodyCRC=867879589, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=3, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=8, MIN_OFFSET=0, REAL_QID=0}, body=8]]
    msgBody:message4
    ====失败消息结束=====
    java.lang.ArithmeticException: / by zero
        at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
        at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=285, queueOffset=8, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158333701, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001F2A, commitLogOffset=7978, bodyCRC=867879589, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=4, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=9, MIN_OFFSET=0, REAL_QID=0}, body=8]]msgBody:message4
    ====失败消息开始=====
    msg:MessageExt [queueId=0, storeSize=285, queueOffset=8, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158333701, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001F2A, commitLogOffset=7978, bodyCRC=867879589, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=4, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=9, MIN_OFFSET=0, REAL_QID=0}, body=8]]
    msgBody:message4
    ====失败消息结束=====
    java.lang.ArithmeticException: / by zero
        at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
        at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=285, queueOffset=9, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158394718, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000002163, commitLogOffset=8547, bodyCRC=867879589, reconsumeTimes=3, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=5, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=10, MIN_OFFSET=0, REAL_QID=0}, body=8]]msgBody:message4
    ====失败消息开始=====
    msg:MessageExt [queueId=0, storeSize=285, queueOffset=9, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158394718, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000002163, commitLogOffset=8547, bodyCRC=867879589, reconsumeTimes=3, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=5, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=10, MIN_OFFSET=0, REAL_QID=0}, body=8]]
    msgBody:message4
    ====失败消息结束=====
    java.lang.ArithmeticException: / by zero
        at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
        at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
    收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=285, queueOffset=10, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158514742, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F000000000000239C, commitLogOffset=9116, bodyCRC=867879589, reconsumeTimes=4, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=6, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=11, MIN_OFFSET=0, REAL_QID=0}, body=8]]msgBody:message4
    ====失败消息开始=====
    msg:MessageExt [queueId=0, storeSize=285, queueOffset=10, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158514742, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F000000000000239C, commitLogOffset=9116, bodyCRC=867879589, reconsumeTimes=4, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=6, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=11, MIN_OFFSET=0, REAL_QID=0}, body=8]]
    msgBody:message4
    ====失败消息结束=====
    java.lang.ArithmeticException: / by zero
        at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
        at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
    java.lang.ArithmeticException: / by zero
        at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
        at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

       我们看到最初消费到message4(第5条消息)的时候抛出异常,然后继续消费了剩下的5条消息,过了一段时间MQ又重试发送消息给Consumer端,又抛出异常,过一段时间MQ还是会发送消息过来。我们看一下日志中的reconsumeTimes字段(原文用蓝色标注),这个就是记录重试次数的字段。我们可以利用这个字段进行业务逻辑处理,比如重试了3次就返回SUCCESS了,就不再重试了。比如Consumer代码中的②处

      Consumer端发生超时的情况讨论。

      在消费端,我们有两个Consumer来组成一个集群。ConsumerClusterMember1中我们设置一个Thread.sleep()来模拟一个消费超时的状态。

      实验的步骤如下:

      一、先启动ConsumerClusterMember1

      二、再启动ConsumerClusterMember2。

      三、然后启动ProducerCluster来发送一条消息。

      四、这个时候ConsumerClusterMember1可能会先接收到这条消息,然后sleep等待了。这个时候我们停掉它的JVM

      五、过了一段时间,ConsumerClusterMember2就会重新收到Producer端发过来的消息。完成了Consumer端集群的消息超时。

      下面我们看看代码

    Producer端:

    package org.hope.lee.producer;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendCallback;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    /**
     * Companion producer for the ConsumerClusterMember1 / ConsumerClusterMember2 test:
     * sends one message to the "cluster_timeout_test" topic and prints the result.
     */
    public class ProducerCluster {
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("producer_cluster");
            producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            try {
                // Name of this client instance.
                producer.setInstanceName("producer_cluster_name");
                // Start the producer before sending.
                producer.start();
                // Build one message; encode explicitly as UTF-8 because both cluster
                // listeners decode the body as UTF-8.
                Message msg = new Message("cluster_timeout_test", "TagA", "OrderID0034",
                        "customer_cluster_test".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                SendResult send = producer.send(msg);
                System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Always release the client, even if an unchecked exception escapes.
                producer.shutdown();
            }
        }
    }

    消费端集群:

    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * First member of the "consumer_cluster" consumer group. It registers
     * {@code MqMessageListener1} on the "cluster_timeout_test" topic and starts consuming.
     */
    public class ConsumerClusterMember1 {
        private static final String GROUP = "consumer_cluster";
        private static final String NAME_SERVERS = "192.168.31.176:9876;192.168.31.165:9876";
        private static final String TOPIC = "cluster_timeout_test";
        // NOTE(review): "Tag B" and "Tage C" look like typos for TagB / TagC. Kept as-is;
        // if corrected, change it identically in every member of this consumer group.
        private static final String TAG_EXPRESSION = "TagA || Tag B || Tage C";
        private static final int BATCH_MAX_SIZE = 10;

        public static void main(String[] args) throws MQClientException {
            final DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(GROUP);
            pushConsumer.setNamesrvAddr(NAME_SERVERS);
            // Per the author's note: when this is not the first start, consumption
            // continues from the previously consumed position.
            pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Batch consumption: at most BATCH_MAX_SIZE messages per listener call.
            pushConsumer.setConsumeMessageBatchMaxSize(BATCH_MAX_SIZE);
            // Subscribe to the test topic, filtered by the tag expression above.
            pushConsumer.subscribe(TOPIC, TAG_EXPRESSION);
            pushConsumer.registerMessageListener(new MqMessageListener1());
            pushConsumer.start();
            System.out.println("Consumer Started.");
        }
    }
    class MqMessageListener1 implements MessageListenerConcurrently{
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                MessageExt msg = msgs.get(0);
                System.out.println("------消息处理中--------");
                Thread.sleep(60000);
                String msgBody = new String(msg.getBody(), "utf-8");
                System.out.println("msgBody:" + msgBody);
            } catch(Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        
    }
    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * Second member of the "consumer_cluster" consumer group. It registers
     * {@code MqMessageListener2} on the "cluster_timeout_test" topic and starts consuming.
     */
    public class ConsumerClusterMember2 {
        private static final String GROUP = "consumer_cluster";
        private static final String NAME_SERVERS = "192.168.31.176:9876;192.168.31.165:9876";
        private static final String TOPIC = "cluster_timeout_test";
        // NOTE(review): "Tag B" and "Tage C" look like typos for TagB / TagC. Kept as-is;
        // if corrected, change it identically in every member of this consumer group.
        private static final String TAG_EXPRESSION = "TagA || Tag B || Tage C";
        private static final int BATCH_MAX_SIZE = 10;

        public static void main(String[] args) throws MQClientException {
            final DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(GROUP);
            pushConsumer.setNamesrvAddr(NAME_SERVERS);
            // Per the author's note: when this is not the first start, consumption
            // continues from the previously consumed position.
            pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Batch consumption: at most BATCH_MAX_SIZE messages per listener call.
            pushConsumer.setConsumeMessageBatchMaxSize(BATCH_MAX_SIZE);
            // Subscribe to the test topic, filtered by the tag expression above.
            pushConsumer.subscribe(TOPIC, TAG_EXPRESSION);
            pushConsumer.registerMessageListener(new MqMessageListener2());
            pushConsumer.start();
            System.out.println("Consumer Started.");
        }
    }
    
    class MqMessageListener2 implements MessageListenerConcurrently{
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                MessageExt msg = msgs.get(0);
                String msgBody = new String(msg.getBody(), "utf-8");
                System.out.println("msgBody:" + msgBody);
            } catch(Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        
    }

    https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

    参考:

    [1]博客,http://blog.csdn.net/u010634288/article/details/56049305

  • 相关阅读:
    【转】CDH rpm+http方式离线部署
    处理CDH环境Hadoop:NameNode is not formatted
    使用 prometheus-operator 监控 Kubernetes 集群【转】
    Prometheus监控k8s(10)-PrometheusOperator-更优雅的Prometheus部署【转】
    kali 网络配置作死踩坑
    01 校招信息收集渠道
    Pypora打开markdown(md)文件保存为PDF文件
    Github上的md文件查看后下载PDF文档方法(将HTML文件保存为PDF格式)
    利用后裔采集器快速采集文本数据(以京东为例)
    office安装公式编辑器mathtype6.9及mathtype过期解决方案
  • 原文地址:https://www.cnblogs.com/happyflyingpig/p/8207003.html
Copyright © 2011-2022 走看看