zoukankan      html  css  js  c++  java
  • PushConsumer 消费消息

    CLUSTERING 模式下,消费者会订阅 retry topic

    // DefaultMQPushConsumerImpl#copySubscription
    private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }
    
            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }
    
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

    拉取消息

    // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
    //
    拉取消息的回调 PullCallback: // 消息放入 processQueue 的红黑树中,以分区 offset 作 key boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); // 为拉取到的消息创建 ConsumeRequest 任务 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);

    存放拉取的消息

    // ProcessQueue#putMessage
    // 消息放入 TreeMap 中,按 offset 排序
    public boolean putMessage(final List<MessageExt> msgs) {
        boolean dispatchToConsume = false;
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                int validMsgCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                    if (null == old) {
                        validMsgCnt++;
                        this.queueOffsetMax = msg.getQueueOffset();
                        msgSize.addAndGet(msg.getBody().length);
                    }
                }
                msgCount.addAndGet(validMsgCnt);
    
                if (!msgTreeMap.isEmpty() && !this.consuming) {
                    dispatchToConsume = true;
                    this.consuming = true;
                }
    
                if (!msgs.isEmpty()) {
                    MessageExt messageExt = msgs.get(msgs.size() - 1);
                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                    if (property != null) {
                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                        if (accTotal > 0) {
                            this.msgAccCnt = accTotal;
                        }
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }
    
        return dispatchToConsume;
    }

    提交 ConsumeRequest

    // ConsumeMessageConcurrentlyService#submitConsumeRequest
    // 按照 consumeBatchSize 对消息分批
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
    
                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
    
                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

    并行消费和顺序消费

    ConsumeMessageConcurrentlyService.ConsumeRequest#run
    ConsumeMessageOrderlyService.ConsumeRequest#run

    按批消费消息,消费完一批消息返回状态。消费成功,则删除这一批消息,并更新 offsetTable。

    ConsumeMessageConcurrentlyService 和 ConsumeMessageOrderlyService 的线程池是一样的,顺序消费体现在,同一时刻,一个分区的消息只有一个线程在消费。分区消息的到达是有先后顺序,但消费是在多个线程中进行。 一个 MessageQueue 对应一把锁。

    重点看下消费失败的情形

    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        //  Integer.MAX_VALUE
        int ackIndex = context.getAckIndex();
    
        if (consumeRequest.getMsgs().isEmpty())
            return;
    
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }
    
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                // 消费失败,ackIndex = -1
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // 消息重新发送回队列
                    boolean result = this.sendMessageBack(msg, context);
                    // 如果发送失败,则放入 msgBackFailed 重新消费
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
    
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    // 根据 msgBackFailed 重新生成 ConsumeRequest
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }
    
        // 发送回 broker 的消息相当于是处理掉了,从红黑树中删除,返回最小的 offset
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

    broker 处理 retry 消息

    SendMessageProcessor#consumerSendMsgBack

    消费失败的消息,会发送到 %RETRY% + consumerGroup,消费者重新拉取消费。

    如果重复消费的次数达到 16 次,则 broker 把消息发送到死信队列 %DLQ% + consumerGroup。 

    位移提交

    // MQClientInstance#startScheduledTask
    // 定时任务提交 offset
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  • 相关阅读:
    Navicat Premium 12连接Oracle时提示oracle library is not loaded的问题解决
    事务传播机制Propagation.REQUIRES_NEW
    @ApiImplicitParams、ApiImplicitParam的使用
    启动微服务项目的时候报redisInit错误---本地启动redis服务
    Swagger介绍及使用
    mybaitis框架-trim标签
    pgadmin怎样创建新的连接
    微服务项目启动
    管理中第一可怕之事(2) . 分类: 项目管理 2014-06-25 18:54 257人阅读 评论(0) 收藏
    管理中第一可怕之事(1) . 分类: 项目管理 2014-06-25 18:53 264人阅读 评论(0) 收藏
  • 原文地址:https://www.cnblogs.com/allenwas3/p/11699063.html
Copyright © 2011-2022 走看看