CLUSTERING 模式下,消费者会订阅 retry topic
// DefaultMQPushConsumerImpl#copySubscription private void copySubscription() throws MQClientException { try { Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); if (sub != null) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } if (null == this.messageListenerInner) { this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
拉取消息
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
// 拉取消息的回调 PullCallback: // 消息放入 processQueue 的红黑树中,以分区 offset 作 key boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); // 为拉取到的消息创建 ConsumeRequest 任务 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
存放拉取的消息
// ProcessQueue#putMessage // 消息放入 TreeMap 中,按 offset 排序 public boolean putMessage(final List<MessageExt> msgs) { boolean dispatchToConsume = false; try { this.lockTreeMap.writeLock().lockInterruptibly(); try { int validMsgCnt = 0; for (MessageExt msg : msgs) { MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg); if (null == old) { validMsgCnt++; this.queueOffsetMax = msg.getQueueOffset(); msgSize.addAndGet(msg.getBody().length); } } msgCount.addAndGet(validMsgCnt); if (!msgTreeMap.isEmpty() && !this.consuming) { dispatchToConsume = true; this.consuming = true; } if (!msgs.isEmpty()) { MessageExt messageExt = msgs.get(msgs.size() - 1); String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET); if (property != null) { long accTotal = Long.parseLong(property) - messageExt.getQueueOffset(); if (accTotal > 0) { this.msgAccCnt = accTotal; } } } } finally { this.lockTreeMap.writeLock().unlock(); } } catch (InterruptedException e) { log.error("putMessage exception", e); } return dispatchToConsume; }
提交 ConsumeRequest
// ConsumeMessageConcurrentlyService#submitConsumeRequest // 按照 consumeBatchSize 对消息分批 public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } }
并行消费和顺序消费
ConsumeMessageConcurrentlyService.ConsumeRequest#run
ConsumeMessageOrderlyService.ConsumeRequest#run
按批消费消息,消费完一批消息返回状态。消费成功,则删除这一批消息,并更新 offsetTable。
ConsumeMessageConcurrentlyService 和 ConsumeMessageOrderlyService 使用的线程池是一样的,顺序消费并不是单线程消费:消费仍然在线程池的多个线程中进行,但一个 MessageQueue 对应一把锁,线程必须先拿到该 MessageQueue 的锁才能消费其中的消息。因此同一时刻,一个分区(MessageQueue)的消息只有一个线程在消费;分区内消息的到达有先后顺序,消费也就按这个顺序依次进行。
重点看下消费失败的情形
/**
 * Handles the outcome of consuming one batch of messages.
 *
 * <p>Flow, in order:
 * <ol>
 *   <li>Derive the index of the last acknowledged message and update the OK/failed TPS
 *       statistics. On CONSUME_SUCCESS the context's ack index is clamped to the last
 *       element; on RECONSUME_LATER it becomes -1, i.e. the whole batch failed.</li>
 *   <li>Deal with the messages after the ack index. BROADCASTING only logs a warning
 *       and drops them. CLUSTERING sends each one back to the broker; those that cannot
 *       be sent back get their reconsume count incremented, are removed from this
 *       request and are resubmitted locally via {@code submitConsumeRequestLater}.</li>
 *   <li>Remove the messages still held by the request from the process queue and, if
 *       the returned offset is non-negative and the queue has not been dropped, update
 *       the offset store with it.</li>
 * </ol>
 *
 * @param status         result returned by the message listener
 * @param context        consume context carrying the ack index
 * @param consumeRequest the request whose messages were consumed
 */
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    // Per the original note, the context's ack index is Integer.MAX_VALUE unless changed.
    int ackIndex = context.getAckIndex();

    final List<MessageExt> batch = consumeRequest.getMsgs();
    if (batch.isEmpty()) {
        return;
    }
    final int batchSize = batch.size();

    switch (status) {
        case CONSUME_SUCCESS: {
            if (ackIndex >= batchSize) {
                ackIndex = batchSize - 1;
            }
            final int succeeded = ackIndex + 1;
            final int notAcked = batchSize - succeeded;
            this.getConsumerStatsManager().incConsumeOKTPS(
                consumerGroup, consumeRequest.getMessageQueue().getTopic(), succeeded);
            this.getConsumerStatsManager().incConsumeFailedTPS(
                consumerGroup, consumeRequest.getMessageQueue().getTopic(), notAcked);
            break;
        }
        case RECONSUME_LATER: {
            // Nothing acknowledged: every message in the batch counts as failed.
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(
                consumerGroup, consumeRequest.getMessageQueue().getTopic(), batchSize);
            break;
        }
        default:
            break;
    }

    final int firstUnacked = ackIndex + 1;
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING: {
            for (int i = firstUnacked; i < batchSize; i++) {
                final MessageExt dropped = batch.get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", dropped.toString());
            }
            break;
        }
        case CLUSTERING: {
            final List<MessageExt> sendBackFailed = new ArrayList<MessageExt>(batchSize);
            for (int i = firstUnacked; i < batchSize; i++) {
                final MessageExt msg = batch.get(i);
                // Send the failed message back to the broker.
                if (this.sendMessageBack(msg, context)) {
                    continue;
                }
                // Send-back failed: keep the message for a local re-consume attempt.
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                sendBackFailed.add(msg);
            }

            if (!sendBackFailed.isEmpty()) {
                batch.removeAll(sendBackFailed);
                // Build a new ConsumeRequest from the messages that could not be sent back.
                this.submitConsumeRequestLater(
                    sendBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        }
        default:
            break;
    }

    // Messages sent back to the broker are treated as handled: they are removed from the
    // process queue's tree together with the consumed ones. Per the original note,
    // removeMessage returns the smallest offset still held.
    final long offset = consumeRequest.getProcessQueue().removeMessage(batch);
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
broker 处理 retry 消息
SendMessageProcessor#consumerSendMsgBack
消费失败的消息,会发送到 %RETRY% + consumerGroup,消费者重新拉取消费。
如果消息的重试消费次数达到上限(默认 16 次),则 broker 不再把它投递到重试队列,而是把消息发送到死信队列 %DLQ% + consumerGroup。
位移提交
// MQClientInstance#startScheduledTask // 定时任务提交 offset this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);