zoukankan      html  css  js  c++  java
  • RocketMQ学习笔记(13)----RocketMQ的Consumer消息重试

    1. 概念

      Producer端重试:

      生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。 
    这种消息失败重试我们可以手动设置发送失败重试的次数。

      Consumer端重试:

      Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次,Consumer消费消息失败通常可以认为有以下几种情况

      1. 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其他消息,而且这条失败消息即使立刻重试消费,99%也不成功,所以最后提供一种定时的重试机制,即过10s再重试。

      2. 由于依赖下游应用服务不可用,例如db连接不可用,外系统网络不可达等。

      遇到这种错误,即使跳过当前失败的消息,消费其他消息也会报错,这种情况下建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。

    2. Broker消息重试策略

      查看broker.log文件,可以看到启动有很多的启动参数,其中有一条如下:

      这里就表示的是消息重试的时间,1s,5s....的间隔时间后再进行消息的重试,这里是消息消费的消息重试。

    3. Producer端消息重试实现

    package com.wangx.rocketmq.quickstart;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    public class Producer {
    
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("myGroup");
    
            producer.setNamesrvAddr("localhost:9876");
    
            producer.start();
         //重试三秒 producer.setRetryTimesWhenSendFailed(
    3); for (int i = 0; i < 10; i++) { Message message = new Message("MyTopic", "tabA", ("Hello World" + i).getBytes());
           //超时时间 SendResult result
    = producer.send(message,100); System.out.println(result); } producer.shutdown(); } }

    如果消息在100ms之内发送失败,就重试三次

    4. Consumer使用方式

      在Consumer中,当消费消息的处理过程中,出现异常时,我们通常返回的是RECONSUME_LATER,表示一会儿之后再重试,当返回了这个状态之后,broker就会按照2中的时间间隔来重试消息。当然,最大也只能重试到2h.

      在实际的运用场景中,我们并不想要消息无止境的一直重试下去,可能我们回想要消息重试几次之后,还是不能成功的情况下就将这条消息存储到db或log文件中,所以此时我们可以这样实现:

    package com.wangx.rocketmq.quickstart;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    public class Consumer {
    
        public static void main(String[] args) throws MQClientException {
            final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyConsumerGroup");
    
            consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.subscribe("MyTopic", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        MessageExt ext = msgs.get(0);
                        int x = 0;
                        String topic = ext.getTopic();
                        String body = new String(ext.getBody(),"utf-8");
                        if (Integer.parseInt(body) % 2 == 0) {
                            //产生异常
                            x = Integer.parseInt(body) / 0;
                        }
                        System.out.println("收到来自topic: " + topic + ",的消息:" + body);
                    } catch (Exception e) {
                        try {
                            MessageExt ext = msgs.get(0);
                            String topic = ext.getTopic();
                            String body = new String(ext.getBody(),"utf-8");
                            if (ext.getReconsumeTimes() == 3) {
    
                                //模拟将消息保存到db或日志文件中,返回成功状态,使消息不再重试
                                System.out.println("保存成功消息:" + body + "成功!!!");
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                            System.err.println("err:收到来自topic: " + topic + ",的消息:" + body);
                        } catch (Exception e1) {
                            e1.printStackTrace();
                        }
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
    
            });
            consumer.start();
        }
    }

      控制台打印结果如下:

    收到来自topic: MyTopic,的消息:1
    err: 收到来自topic: MyTopic,的消息:2
    err: 收到来自topic: MyTopic,的消息:6
    err: 收到来自topic: MyTopic,的消息:2
    err: 收到来自topic: MyTopic,的消息:6
    收到来自topic: MyTopic,的消息:7
    err: 收到来自topic: MyTopic,的消息:0
    err: 收到来自topic: MyTopic,的消息:8
    err: 收到来自topic: MyTopic,的消息:4
    收到来自topic: MyTopic,的消息:3
    收到来自topic: MyTopic,的消息:5
    收到来自topic: MyTopic,的消息:9
    err: 收到来自topic: MyTopic,的消息:0
    err: 收到来自topic: MyTopic,的消息:8
    err: 收到来自topic: MyTopic,的消息:4
    err: 收到来自topic: MyTopic,的消息:2
    err: 收到来自topic: MyTopic,的消息:6
    err: 收到来自topic: MyTopic,的消息:0
    err: 收到来自topic: MyTopic,的消息:8
    err: 收到来自topic: MyTopic,的消息:4
    保存成功消息:2成功!!!
    保存成功消息:6成功!!!
    保存成功消息:0成功!!!
    保存成功消息:8成功!!!
    保存成功消息:4成功!!

      可以看到奇数的时候,正常消费,当消息为偶数时,会抛出异常,此时返回的是RECONSUME_LATER,所以消息将会重试消费,在MessageExt中保存了一个属性叫reconsumeTimes,表示消息重试次数,我们这里使用当消息重试三次之后,模拟将消息保存到db或日志文件中的操作,然后返回CONSUME_SUCCESS,结束消息的重试。这样就可以保证消息出现异常时我们可以做适当的操作避免消息一直重试或对于消息无法消费情况做一些补偿操作。

  • 相关阅读:
    linux安装glibc2.14
    Redhat下如何查看nvidia显卡的工作状况
    Tensorflow下指定显卡占用比例参数配置
    Linux系统终端session保持服务工具-Tmux
    error while loading shared libraries: libevent-2.1.so.6 的解决办法
    机器学习环境配置系列六之jupyter notebook远程访问
    机器学习环境配置系列五之keras2
    机器学习环境配置系列四之theano
    LeetCode 27
    LeetCode 26
  • 原文地址:https://www.cnblogs.com/Eternally-dream/p/9958490.html
Copyright © 2011-2022 走看看