zoukankan      html  css  js  c++  java
  • RocketMQ(5)---RocketMQ重试机制

    RocketMQ重试机制

    消息重试分为两种:Producer发送消息的重试Consumer消息消费的重试

    一、Producer端重试

    Producer端重试是指: Producer往MQ上发消息没有发送成功,比如网络原因导致生产者发送消息到MQ失败。

    看一下代码:

    @Slf4j
    public class RocketMQTest {
        /**
         * 生产者组
         */
        private static String PRODUCE_RGROUP = "test_producer";
      
        public static void main(String[] args) throws Exception {
            //1、创建生产者对象
            DefaultMQProducer producer = new DefaultMQProducer(PRODUCE_RGROUP);
            //设置重试次数(默认2次)
            producer.setRetryTimesWhenSendFailed(3000);
            //绑定name server
            producer.setNamesrvAddr("74.49.203.55:9876");
            producer.start();
            //创建消息
            Message message = new Message("topic_family", ("小小今年3岁" ).getBytes());
            //发送 这里填写超时时间是5毫秒 所以每次都会发送失败
            SendResult sendResult = producer.send(message,5);
            log.info("输出生产者信息={}",sendResult);
        }
    }
    

    超时重试 针对网上说的超时异常会重试的说法都是错误的,想想都觉得可怕,我查的所以文章都说超时异常都会重试,难道这么多人都没有去测试一下 或者去看个源码。

    我发现这个问题,是因为我上面超时时间设置为5毫秒 ,按照正常肯定会报超时异常,但我设置1次重试和3000次的重试,虽然最终都会报下面异常,但输出错误时间报

    显然不应该是一个级别。但测试发现无论我设置的多少次的重试次数,报异常的时间都差不多。

    org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
    

    针对这个疑惑,我去看了源码之后,才恍然大悟。

       /**
         * 说明 抽取部分代码
         */
        private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {
            
            //1、获取当前时间
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev ;
            //2、去服务器看下有没有主题消息
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                boolean callTimeout = false;
                //3、通过这里可以很明显看出 如果不是同步发送消息 那么消息重试只有1次
                int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                //4、根据设置的重试次数,循环再去获取服务器主题消息
                for (times = 0; times < timesTotal; times++) {
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    //5、前后时间对比 如果前后时间差 大于 设置的等待时间 那么直接跳出for循环了 这就说明连接超时是不进行多次连接重试的
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
    
                    }
                    //6、如果超时直接报错
                    if (callTimeout) {
                        throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                    }
            }
        }
    

    通过这段源码很明显可以看出以下几点

    1. 如果是异步发送 那么重试次数只有1次
    2. 对于同步而言,超时异常也是不会再去重试
    3. 如果发生重试是在一个for 循环里去重试,所以它是立即重试而不是隔一段时间去重试。

    真是实践出真知!!!


    二、 Consumer端重试

    消费端比较有意思,而且在实际开发过程中,我们也更应该考虑的是消费端的重试。

    消费者端的失败主要分为2种情况,ExceptionTimeout

    1、Exception

    @Slf4j
    @Component
    public class Consumer {
        /**
         * 消费者实体对象
         */
        private DefaultMQPushConsumer consumer;
        /**
         * 消费者组
         */
        public static final String CONSUMER_GROUP = "test_consumer";
        /**
         * 通过构造函数 实例化对象
         */
        public Consumer() throws MQClientException {
            consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
            consumer.setNamesrvAddr("47.99.203.55:9876;47.99.203.55:9877");
            //订阅topic和 tags( * 代表所有标签)下信息
            consumer.subscribe("topic_family", "*");
            //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                //1、获取消息
                Message msg = msgs.get(0);
                try {
                    //2、消费者获取消息
                    String body = new String(msg.getBody(), "utf-8");
                    //3、获取重试次数
                    int count = ((MessageExt) msg).getReconsumeTimes();
                    log.info("当前消费重试次数为 = {}", count);
                    //4、这里设置重试大于3次 那么通过保存数据库 人工来兜底
                    if (count >= 2) {
                        log.info("该消息已经重试3次,保存数据库。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    //直接抛出异常
                    throw new Exception("=======这里出错了============");
                    //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
            //启动监听
            consumer.start();
        }
    }
    

    这里的代码意思很明显: 主动抛出一个异常,然后如果超过3次,那么就不继续重试下去,而是将该条记录保存到数据库由人工来兜底。

    看下运行结果

    注意 消费者和生产者的重试还是有区别的,主要有两点

    1、默认重试次数:Product默认是2次,而Consumer默认是16次

    2、重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照1S,5S,10S,30S,1M,2M····2H进行重试。
    3、Product在异步情况重试失效,而对于Consumer在广播情况下重试失效。

    2、Timeout

    说明 这里的超时异常并非真正意义上的超时,它指的是指获取消息后,因为某种原因没有给RocketMQ返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESSreturn ConsumeConcurrentlyStatus.RECONSUME_LATER

    那么 RocketMQ会认为该消息没有发送,会一直发送。因为它会认为该消息根本就没有发送给消费者,所以肯定没消费。

    做这个测试很简单。

            //1、消费者获得消息
            String body = new String(msg.getBody(), "utf-8");
            //2、获取重试次数
            int count = ((MessageExt) msg).getReconsumeTimes();
            log.info("当前消费重试次数为 = {}", count);
            //3、这里睡眠60秒
            Thread.sleep(60000);
           log.info("休眠60秒 看还能不能走到这里。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
            //返回成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    

    当获得 当前消费重试次数为 = 0 后 , 关掉该进程。再重新启动该进程,那么依然能够获取该条消息

    consumer消费者  当前消费重试次数为 = 0
    休眠60秒 看还能不能走到这里。topic=topic_family,keys=1a2b3c4d5f,msg=小小今年3岁
    


    只要自己变优秀了,其他的事情才会跟着好起来(上将2)
    
  • 相关阅读:
    如何提高软件可维护性
    UML系列 (二)四种关系
    软件工程
    软件工程需求规格说明书
    机房收费系统可行性分析报告
    ThreadStaticAttribute 的使用
    WPF:Border 控件
    几篇介绍在页面中引用脚本文件的技术文档
    收集两篇介绍 Fildder 的文章
    收集三篇关于数据库主键设计的文章
  • 原文地址:https://www.cnblogs.com/qdhxhz/p/11117379.html
Copyright © 2011-2022 走看看