zoukankan      html  css  js  c++  java
  • 消息队列(七)--- RocketMQ延时发送和消息重试(半原创)

    本文图片和部分总结来自于参考资料,半原创,侵删
    

    问题

    • Rocketmq 重试是否有超时问题,假如超时了如何解决,是重新发送消息呢?还是一直等待
    • 假如某个 msg 进入了重试队列(%RETRY_XXX%),然后成功消费了

    概述

        文章介绍了RocketMQ 的重试机制和消息重试的机制。

    定时任务

    定时任务概述

        rocketmq为定时任务创建一个单独的 topic ,而 rocketmq的定时任务是定的时间是分等级的,而不同等级对应topic内不同的队列,然后通过一个“执行定时任务的服务”定时执行多个队列内的任务,执行时需要更改该定时任务实际要发送的 topic 和 tag 。

    发送例子

    发送例子

    Message msg =
        new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
    msg.setDelayTimeLevel(i + 1);
    
    

        时间等级

    public class MessageStoreConfig {
    
        private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
        
    }
    
    

    写入定时任务

        写入的时候是在写入commitLog 的时候写入的,这一点很重要,因为这也是实现消费失败重试的基础。 CommitLog 会将这条消息的话题和队列 ID 替换成专门用于定时的话题和相应的级别对应的队列 ID。真实的话题和队列 ID 会作为属性放置到这条消息中,后面处理的时候会自己从这个队列id 进行发送消息。

        public class CommitLog {
    
            public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    
                // Delay Delivery
                if (msg.getDelayTimeLevel() > 0) {
    
                    topic = ScheduleMessageService.SCHEDULE_TOPIC;
                    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
                    // Backup real topic, queueId
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
                    // 替换 Topic 和 QueueID
                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                }
                
            }
            
        }
    
    

    处理定时任务

        执行定时任务的服务,ScheduleMessageService 的 start 方法

        public void start() {
    
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }
    
                if (timeDelay != null) {
                    //Timer 持有多个定时任务,然后时间到了就执行该任务,
                    // 但是 Timer 内部只有一个线程在执行任务,也就不能保证时间的正确性(因为当一个线程在执行的时候,某个任务的时间已经到了)
                    // 注意,是为每个延时时间等级建一个任务Task
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
    
            this.timer.scheduleAtFixedRate(new TimerTask() {
    
                @Override
                public void run() {
                    try {
                        ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    
    
    class DeliverDelayedMessageTimerTask extends TimerTask {
    
        public void executeOnTimeup() {
            // ...
            for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                // 是否到时间
                long countdown = deliverTimestamp - now;
    
                if (countdown <= 0) {
                    // 取出消息
                    MessageExt msgExt =
                        ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                    // 修正消息,设置上正确的话题和队列 ID
                    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                    // 重新存储消息
                    PutMessageResult putMessageResult =
                        ScheduleMessageService.this.defaultMessageStore
                        .putMessage(msgInner);
                } else {
                    // countdown 后投递此消息
                    ScheduleMessageService.this
                        .timer
                        .schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
                    // 更新偏移量
                }
            } // end of for
    
            // 更新偏移量
        }
        
    }
    
    

        同时该定时任务也进行持久化,一个是消费进度,一个消息对应的位移量

    1297993-20200106151538670-2043955717.png

    1297993-20200106151602350-1881388344.png

    消息消费重试

        RocketMQ中遇到以下情况就会进行消息重试 :

    • 抛出异常
    • 返回 NULL 状态
    • 返回 RECONSUME_LATER 状态
    • 超时 15 分钟没有响应

    1297993-20200107155108508-1498228666.png

    consumer 注册订阅重试队列

        consumer 在启动的时候就会订阅“%RETRY_XXX%”的topic,为的就是当某个topic消费失败处理重试消息。如下图所示 :

    1297993-20200106162252037-330088757.png

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    
        public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
            case CREATE_JUST:
                // ...
                this.copySubscription();
                // ...
            
                this.serviceState = ServiceState.RUNNING;
                break;
            }
        }
    
        private void copySubscription() throws MQClientException {
            switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                break;
                
            case CLUSTERING:
                // 重试话题组
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                                                                                    retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
                
            default:
                break;
            }
        }
        
    }
    
    

    超时消费

        我们思考一个问题,假如消费者掉线了,那么消息直接发不过去了,而要是消费者的消费逻辑执行了太久的业务逻辑,那么应该有一个动作来触发 消费超时,进行重试.

    ConsumeMessageConcurrentlyService 的 start 方法。

        public void start() {
            this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    cleanExpireMsg();
                }
    
            }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
        }
    
    

    这个定时周期任务每过 getConsumeTimeout 时间就会扫描消费超时的任务,调用 sendMessageBack 方法,该方法会调用 RPC发送消息给 broker ,消费失败进行重试。

        上一篇我们讲到消息消费的过程,当集群模式下,消息消费成功会本地的消息消费进度,而失败了会调用RPC 发送消息给broker ,而broker 处理的逻辑在 SendMessageProcessor

        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            SendMessageContext mqtraceContext;
            switch (request.getCode()) {
    
                //消费者消费失败的情况
                case RequestCode.CONSUMER_SEND_MSG_BACK:
                    return this.consumerSendMsgBack(ctx, request);
                default:
    
                    SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                    if (requestHeader == null) {
                        return null;
                    }
    
                    mqtraceContext = buildMsgContext(ctx, requestHeader);
                    this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
    
                    RemotingCommand response;
                    if (requestHeader.isBatch()) {
                        response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                    } else {
                        response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                    }
    
                    this.executeSendMessageHookAfter(response, mqtraceContext);
                    return response;
            }
        }
    
    

      需要注意的是 consumerTimeOut 的时间是 15 分钟,生产的时候可以配置短点。 

    批量处理的问题

        批量处理一批数据要是返回 RECONSUME_LATER ,那么这批数据就会重新发给 broker ,进行消息重试,所以在业务逻辑的时候就要考虑消费者重新消费的幂等性。

        ConsumeRequest的 run 方法

            @Override
            public void run() {
                ....
    
                try {
                    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
                    if (msgs != null && !msgs.isEmpty()) {
                        for (MessageExt msg : msgs) {
                            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                        }
                    }
                    //NO.1 业务实现
                    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
                } catch (Throwable e) {
                    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                            RemotingHelper.exceptionSimpleDesc(e),
                            ConsumeMessageConcurrentlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                    hasException = true;
                }
    
                ...
    
                if (!processQueue.isDropped()) {
                    //NO.2 处理消息消费的结果
                    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                } else {
                    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
                }
            }

    我们可以设置最大批量处理的数量为 1 ,那么就会针对每一条消息进行重试,但是那样的话就会性能相对于批量处理肯定差一些。 

    ack 机制

        public void processConsumeResult(
                final ConsumeConcurrentlyStatus status,
                final ConsumeConcurrentlyContext context,
                final ConsumeRequest consumeRequest
        ) {
            int ackIndex = context.getAckIndex();
    
            if (consumeRequest.getMsgs().isEmpty())
                return;
    
            switch (status) {
                case CONSUME_SUCCESS:
                    if (ackIndex >= consumeRequest.getMsgs().size()) {
                        ackIndex = consumeRequest.getMsgs().size() - 1;
                    }
                    int ok = ackIndex + 1;
                    int failed = consumeRequest.getMsgs().size() - ok;
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                    break;
                case RECONSUME_LATER:
                    ackIndex = -1;
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                            consumeRequest.getMsgs().size());
                    break;
                default:
                    break;
            }
    
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                        MessageExt msg = consumeRequest.getMsgs().get(i);
                        log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                    }
                    break;
                case CLUSTERING:
                    //发送给broker , 该批数据进行消息重试
                    List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                        MessageExt msg = consumeRequest.getMsgs().get(i);
                        boolean result = this.sendMessageBack(msg, context);
                        if (!result) {
                            msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                            msgBackFailed.add(msg);
                        }
                    }
    
                    if (!msgBackFailed.isEmpty()) {
                        consumeRequest.getMsgs().removeAll(msgBackFailed);
    
                        this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                    }
                    break;
                default:
                    break;
            }
    
            //删除消息树中的已消费的消息节点,并返回消息树中最小的节点,更新最小的节点为当前进度!!
            long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
            if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
                this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
            }
        }
    
    

    可以看到消息消费完成后,更新的进度都是对应的 processqueue中对应的消息树里的最小节点(即偏移量最小的节点),那么有可能存在这样的问题,下面来自 参考

    这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。
    
    在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。
    
    在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

    总结

        从参考资料中我学习到了自己学习与别人的差异是总结的能力,通过浓缩代码片段,总结核心的逻辑步骤,加深对逻辑的理解。

    参考资料

    • https://www.jianshu.com/p/5843cdcd02aa
    • http://jaskey.github.io/blog/2017/01/25/rocketmq-consume-offset-management/
  • 相关阅读:
    101. Symmetric Tree(js)
    100. Same Tree(js)
    99. Recover Binary Search Tree(js)
    98. Validate Binary Search Tree(js)
    97. Interleaving String(js)
    96. Unique Binary Search Trees(js)
    95. Unique Binary Search Trees II(js)
    94. Binary Tree Inorder Traversal(js)
    93. Restore IP Addresses(js)
    92. Reverse Linked List II(js)
  • 原文地址:https://www.cnblogs.com/Benjious/p/12162047.html
Copyright © 2011-2022 走看看