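  • RocketMQ Source Code, Part 8: RocketMQ Message Retry

    Message retry in RocketMQ covers two cases: the producer retrying a failed send, and the consumer retrying a failed consumption.

    Producer send retry

    When a producer fails to send a message, RocketMQ retries automatically.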

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
    
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // total number of send attempts
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                // some code omitted ...
            }
        }
        // some code omitted ...
    }
    

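    How many retries?

    As the code above shows, the number of send attempts depends on the send mode:

    • Synchronous send: org.apache.rocketmq.client.producer.DefaultMQProducer#retryTimesWhenSendFailed + 1. retryTimesWhenSendFailed defaults to 2, so besides the initial call, a failed send is retried up to 2 times.
    • Asynchronous send: no retry in this loop (the total number of calls is 1). Retries for asynchronous sends are configured separately through retryTimesWhenSendAsyncFailed.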
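    The attempt count described above can be reproduced in isolation. This is a minimal sketch rather than RocketMQ code: the enum Mode and the class SendAttempts are hypothetical stand-ins for CommunicationMode and the timesTotal expression in sendDefaultImpl.

```java
// Hypothetical stand-in for RocketMQ's CommunicationMode enum.
enum Mode { SYNC, ASYNC, ONEWAY }

final class SendAttempts {
    private SendAttempts() { }

    // Mirrors the timesTotal expression in sendDefaultImpl:
    // only SYNC sends get extra attempts from the send loop.
    static int totalAttempts(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    public static void main(String[] args) {
        // With the default retryTimesWhenSendFailed of 2.
        System.out.println("SYNC attempts:  " + totalAttempts(Mode.SYNC, 2));
        System.out.println("ASYNC attempts: " + totalAttempts(Mode.ASYNC, 2));
    }
}
```

    Setting retryTimesWhenSendFailed to 0 in this model leaves a single attempt for every mode, which matches the "1 +" in the original expression.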

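    When does a retry happen?

    When an exception occurs. Note that the send loop does not catch every exception: only the client's internal exceptions are caught and retried.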

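    How is the retry done?

    Each retry runs load balancing again (taking the previous failure into account) and selects a new MessageQueue, which raises the chance that the send succeeds.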
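    One way to picture "taking the previous failure into account" is a round-robin selection that skips the broker used by the failed attempt. The sketch below assumes exactly that and nothing more; SimpleQueue and QueueSelector are hypothetical names, and the real client may apply further fault-tolerance rules that are not modeled here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a RocketMQ MessageQueue.
final class SimpleQueue {
    final String brokerName;
    final int queueId;

    SimpleQueue(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

final class QueueSelector {
    private final List<SimpleQueue> queues;
    private final AtomicInteger index = new AtomicInteger();

    QueueSelector(List<SimpleQueue> queues) {
        this.queues = queues;
    }

    // Round-robin over the queues, skipping the broker that just failed when possible.
    SimpleQueue selectOne(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            SimpleQueue candidate = next();
            if (lastBrokerName == null || !candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue belongs to the failed broker: fall back to plain round-robin.
        return next();
    }

    private SimpleQueue next() {
        int pos = Math.floorMod(index.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    public static void main(String[] args) {
        QueueSelector selector = new QueueSelector(Arrays.asList(
            new SimpleQueue("broker-a", 0),
            new SimpleQueue("broker-a", 1),
            new SimpleQueue("broker-b", 0)));
        SimpleQueue first = selector.selectOne(null);
        SimpleQueue retry = selector.selectOne(first.brokerName);
        System.out.println(first.brokerName + " -> " + retry.brokerName);
    }
}
```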

    How long between retries?

    The retry is immediate; there is no separate interval between attempts.
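    Putting the last two answers together, the send loop behaves roughly like the sketch below: a retryable failure moves straight on to the next attempt without sleeping, while any other exception leaves the loop at once. InternalSendException, SendAttempt and ImmediateRetry are hypothetical names used only for this illustration.

```java
// Hypothetical stand-in for an exception raised inside the client (e.g. a remoting failure).
class InternalSendException extends RuntimeException {
    InternalSendException(String message) {
        super(message);
    }
}

// Hypothetical callback representing one send attempt.
interface SendAttempt {
    String send(int attempt);
}

final class ImmediateRetry {
    private ImmediateRetry() { }

    static String sendWithRetry(SendAttempt attempt, int timesTotal) {
        if (timesTotal < 1) {
            throw new IllegalArgumentException("timesTotal must be at least 1");
        }
        InternalSendException last = null;
        for (int times = 0; times < timesTotal; times++) {
            try {
                return attempt.send(times);
            } catch (InternalSendException e) {
                // Internal failure: remember it and retry right away, with no sleep.
                last = e;
            }
            // Any other exception type propagates to the caller without a retry.
        }
        // All attempts failed with an internal exception.
        throw last;
    }

    public static void main(String[] args) {
        String result = sendWithRetry(n -> {
            if (n < 2) {
                throw new InternalSendException("attempt " + n + " failed");
            }
            return "SEND_OK";
        }, 3);
        System.out.println(result);
    }
}
```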

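    Consumer consumption retry

    After a message fails to be processed, the broker handles it like any other message. Retry works because the consumer sends the failed message back to the broker, and the broker applies some special handling to it so that the consumer can consume it again.

    Main flow of a message retry:

    1. The consumer fails to consume a message and sends it back to the broker.
    2. The broker receives the retry message, replaces its topic, and stores it.
    3. The consumer pulls messages from the corresponding retryTopic.
    4. After pulling a message from the retryTopic, the consumer restores the original topic and hands the message to the listener.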

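    Consumer sends the retry message to the broker

    Non-ordered (concurrent) messages are used as the example here. After consumption fails, the consumer first sends the message back to the broker.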

    // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
    public void run() {
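        // some code omitted ...
        try {
            // This status is returned by the listener. If the business logic fails to consume the message, it can return
            // org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#RECONSUME_LATER
            // to tell RocketMQ that the message must be consumed again.
            // For a batch of messages, the user can also specify the index from which re-consumption should start.
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);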
        } catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                RemotingHelper.exceptionSimpleDesc(e),
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            hasException = true;
        }
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
    	
        // determine the outcome from the status
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }
    
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
        }
    
        // if the user returns null or throws an unhandled exception, RocketMQ retries by default
        if (null == status) {
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    	
        if (!processQueue.isDropped()) {
            // the result determined above is handled in this method
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }
    

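    The method above distinguishes the following consumption results:

    • org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#CONSUME_SUCCESS: consumption succeeded. For a batch of messages, the user can specify the index from which messages should be retried.
    • org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#RECONSUME_LATER: all messages are retried. This applies when:
      • the user returns RECONSUME_LATER
      • the user returns null
      • the user's business logic throws an exception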
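    The three cases that end in RECONSUME_LATER can be condensed into a small model. Status, BatchListener and StatusNormalizer are hypothetical stand-ins for ConsumeConcurrentlyStatus, the user's listener and the relevant part of ConsumeRequest#run; the sketch only covers how a null return or an exception is turned into a retry.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for ConsumeConcurrentlyStatus.
enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

// Hypothetical stand-in for the user's message listener.
interface BatchListener {
    Status consume(List<String> msgs);
}

final class StatusNormalizer {
    private StatusNormalizer() { }

    static Status run(BatchListener listener, List<String> msgs) {
        Status status = null;
        try {
            status = listener.consume(msgs);
        } catch (Throwable e) {
            // As in ConsumeRequest#run, the exception is swallowed and status stays null.
        }
        // A null status (null return or exception) is treated as "retry everything".
        return status == null ? Status.RECONSUME_LATER : status;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("m1", "m2");
        System.out.println(run(msgs -> Status.CONSUME_SUCCESS, batch));
        System.out.println(run(msgs -> null, batch));
        System.out.println(run(msgs -> { throw new IllegalStateException("boom"); }, batch));
    }
}
```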

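    Once it has been decided whether a retry is needed, the next step works out which messages need to be retried, that is, which messages are sent back to the broker.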

    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        // where to start retrying from
        // ackIndex defaults to Integer.MAX_VALUE unless the user sets it; messages after ackIndex are retried
        int ackIndex = context.getAckIndex();
    
        if (consumeRequest.getMsgs().isEmpty())
            return;
    
        switch (status) {
            case CONSUME_SUCCESS:
                // even with CONSUME_SUCCESS, some of the messages may still need a retry
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                // with RECONSUME_LATER every message is retried, so ackIndex is set to -1
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }
    
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                // broadcast messages are not retried
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                // only messages consumed in clustering mode are retried
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // send the message back to the broker
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
    
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    // messages that could not be sent back above are resubmitted later
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }
    
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            // update the consume offset
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
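
    The ackIndex handling in processConsumeResult can be modeled on its own to see which positions of a batch end up being sent back. This sketch covers only the CLUSTERING branch and reduces everything to indexes; AckIndexDemo and indexesToRetry are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class AckIndexDemo {
    private AckIndexDemo() { }

    // Returns the batch positions that would be sent back to the broker.
    // success == true models CONSUME_SUCCESS, false models RECONSUME_LATER.
    static List<Integer> indexesToRetry(boolean success, int ackIndex, int batchSize) {
        List<Integer> retry = new ArrayList<>();
        if (batchSize == 0) {
            return retry;
        }
        if (success) {
            // Clamp the default Integer.MAX_VALUE (or any too-large value) to the last index.
            if (ackIndex >= batchSize) {
                ackIndex = batchSize - 1;
            }
        } else {
            // RECONSUME_LATER: retry the whole batch.
            ackIndex = -1;
        }
        for (int i = ackIndex + 1; i < batchSize; i++) {
            retry.add(i);
        }
        return retry;
    }

    public static void main(String[] args) {
        System.out.println(indexesToRetry(true, Integer.MAX_VALUE, 3));
        System.out.println(indexesToRetry(true, 0, 3));
        System.out.println(indexesToRetry(false, Integer.MAX_VALUE, 3));
    }
}
```

    In this model a listener that returns CONSUME_SUCCESS after setting ackIndex to 0 on a three-message batch acknowledges only the first message; the other two are retried.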
    

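    The call path for a consumer sending back a failed message differs in its early part from that of an ordinary producer send. The key difference lies in the methods below.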

    // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack
    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
        } catch (Exception e) {
            log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
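            // If sending the failed message back to the broker fails, one more attempt is made. Unlike the call in the
            // try block, this path changes the topic to the retry topic directly and then sends the message to the broker the same way a producer does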
            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
    
            String originMsgId = MessageAccessor.getOriginMessageId(msg);
            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
    
            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
    
            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        }
    }
    
    // org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack
    public void consumerSendMessageBack(
        final String addr,
        final MessageExt msg,
        final String consumerGroup,
        final int delayLevel,
        final long timeoutMillis,
        final int maxConsumeRetryTimes
    ) throws RemotingException, MQBrokerException, InterruptedException {
        ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
        // the RequestCode differs from that of an ordinary send, so the broker handles it with a different method
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
    
        requestHeader.setGroup(consumerGroup);
        // the broker changes the topic of a retry message, so the original topic is set here
        requestHeader.setOriginTopic(msg.getTopic());
        // the broker looks up the original message by this offset
        requestHeader.setOffset(msg.getCommitLogOffset());
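        // set delayLevel, which decides whether the message is consumed with a delay and how long the delay is;
        // the user can set the delay level; the default is 0, nominally no delay (but the broker adds 3 when it is 0)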
        requestHeader.setDelayLevel(delayLevel);
        // set the original msgId
        requestHeader.setOriginMsgId(msg.getMsgId());
        // set the maximum number of retries, 16 by default
        requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
    
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }
    
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }
    

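    A few key attributes are set when a retry message is sent:

    originTopic: the broker changes the topic of a retry message when it receives it, so the original topic has to be kept.

    delayLevel: whether the message is consumed with a delay, and for how long.

    maxReconsumeTimes: the maximum number of times the message can be retried (consumed again).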

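    Broker receives the retry message

    The broker handles a retry message slightly differently from an ordinary message:

    1. Check whether a retry queue is configured and whether it is writable.
    2. Look up the original message.
    3. If the maximum number of retries has been reached or delayLevel is less than 0, the message is not retried. It is delivered to the dead letter queue instead (and is not consumed again); the topic is %DLQ%+group.
    4. If delayLevel is 0, the broker adds 3 to it, which means a 10s delay with the default delay levels (delayed messages are covered in a separate article).
    5. Build a new message from the original one and store it. The fields that differ are:
      1. topic: %RETRY%+group
      2. reconsumeTimes: the previous reconsumeTimes + 1, so it increases by 1 on every retry
      3. queueId: a queueId of the new topic
      4. added properties: ORIGIN_MESSAGE_ID and RETRY_TOPIC (if not already present)
    // The code is not repeated here; the main method is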
    org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
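
    The routing and delay rules in the steps above can be sketched as follows. This is an illustrative model, not the broker's code: RetryRouting and its methods are hypothetical, the delay table is assumed to be the broker's default messageDelayLevel setting, and the "3 + reconsumeTimes" rule is taken from the client-side fallback shown earlier (setDelayTimeLevel(3 + msg.getReconsumeTimes())).

```java
final class RetryRouting {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    // Assumed default delay table; level N is the N-th entry (1-based).
    static final String[] DEFAULT_DELAYS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    private RetryRouting() { }

    // Step 3: too many retries or a negative delayLevel sends the message to the dead letter queue.
    static String targetTopic(String group, int reconsumeTimes, int maxReconsumeTimes, int delayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            return DLQ_PREFIX + group;
        }
        return RETRY_PREFIX + group;
    }

    // Step 4: a delayLevel of 0 is replaced by 3 + reconsumeTimes.
    static int effectiveDelayLevel(int delayLevel, int reconsumeTimes) {
        return delayLevel == 0 ? 3 + reconsumeTimes : delayLevel;
    }

    // Looks up the delay for a level between 1 and DEFAULT_DELAYS.length.
    static String delayFor(int level) {
        return DEFAULT_DELAYS[level - 1];
    }

    public static void main(String[] args) {
        String group = "demo_group"; // hypothetical consumer group name
        for (int times = 0; times < 3; times++) {
            int level = effectiveDelayLevel(0, times);
            System.out.println(targetTopic(group, times, 16, 0) + " delay=" + delayFor(level));
        }
    }
}
```

    Under these assumptions the wait grows with every retry: 10s for the first, 30s for the second, 1m for the third, and so on up the table.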
    

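    Consumer pulls the retry message

    Following the normal consumption flow, once the message is stored on the broker the consumer can pull and consume it. The difference from an ordinary message is that it is pulled not from the topic the consumer originally subscribed to, but from %RETRY%+group.

    So far the retryTopic has been assumed to exist from the start. For completeness, the retryTopic is created at the following points:

    1. After starting, the consumer sends heartbeat data to the broker. If the broker does not yet have the corresponding SubscriptionGroupConfig, it creates the corresponding retryTopic: org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat

    2. When the broker receives a retry message sent back by a consumer and the topicConfig for the retryTopic has not been created yet, it creates one: org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#msgCheck

    3. When the broker processes a retry message sent back by a consumer, it creates the retryTopic: org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack

    After the broker creates the retryTopic, its configuration is synced to namesrv like that of any normal topic, and the consumer can then obtain the retryTopic configuration from namesrv.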

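    So the consumer pulls the messages under %RETRY%+group:

    1. After the consumer sends a retry message to the broker, the broker stores it under the retryTopic, which is a separate topic, and the consumer pulls messages from that topic.
    2. After pulling a message from the retryTopic, the consumer switches its topic back to the original one (org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#resetRetryTopic) and then hands it to the consumer's listener.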
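
    The topic switch in step 2 can be illustrated with a small model. It assumes the original topic travels in the RETRY_TOPIC property, as set in sendMessageBack earlier; SimpleMessage and RetryTopicReset are hypothetical stand-ins for MessageExt and the real resetRetryTopic method.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for MessageExt.
final class SimpleMessage {
    String topic;
    final Map<String, String> properties = new HashMap<>();

    SimpleMessage(String topic) {
        this.topic = topic;
    }
}

final class RetryTopicReset {
    static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";

    private RetryTopicReset() { }

    // Restores the original topic if the message was pulled from this group's retry topic.
    static void resetRetryTopic(SimpleMessage msg, String consumerGroup) {
        String groupTopic = "%RETRY%" + consumerGroup;
        String originalTopic = msg.properties.get(PROPERTY_RETRY_TOPIC);
        if (originalTopic != null && groupTopic.equals(msg.topic)) {
            msg.topic = originalTopic;
        }
    }

    public static void main(String[] args) {
        // "demo_group" and "order_topic" are made-up names for this illustration.
        SimpleMessage msg = new SimpleMessage("%RETRY%demo_group");
        msg.properties.put(PROPERTY_RETRY_TOPIC, "order_topic");
        resetRetryTopic(msg, "demo_group");
        System.out.println(msg.topic);
    }
}
```

    Because of this switch, the listener sees retried messages under the topic it subscribed to; whether a message is a retry can still be told from its reconsumeTimes.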

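    Summary

    When business processing fails, the message often needs to be processed again. In that case the business code can return RECONSUME_LATER, and RocketMQ sends the message back to the broker so that the consumer retries it. The user can also tune some settings to fit the situation, such as the number of retries and whether consumption is delayed. Note, however, that if a failure in business code does not call for a retry, the code must catch all exceptions rather than let them propagate to RocketMQ; otherwise RocketMQ treats the message as needing a retry. For the same reason, the listener must not return null.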

  • Original article: https://www.cnblogs.com/sunshine-2015/p/9011446.html