consumer 拉取消息时,如果对应的 queue 没有数据,broker 不会立即返回,而是以长轮询的方式处理:先把 PullRequest 保存起来,等到 queue 有了新消息,或者长轮询的阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。
1. queue 此时没有数据
// PullMessageProcessor#processRequest 片段 case ResponseCode.PULL_NOT_FOUND: // broker 和 consumer 都允许 suspend,默认开启 if (brokerAllowSuspend && hasSuspendFlag) { // 确定 pollingTimeMills 值 // 以 DefaultMQPushConsumerImpl 为例,BROKER_SUSPEND_MAX_TIME_MILLIS = 15000 long pollingTimeMills = suspendTimeoutMillisLong; if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } // 使用 PullRequest 把原始的 PullMessageRequest 封装起来 String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; }
2. 保存 PullRequest
// org.apache.rocketmq.broker.longpolling.PullRequestHoldService private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<String, ManyPullRequest>(1024); // org.apache.rocketmq.broker.longpolling.ManyPullRequest private final ArrayList<PullRequest> pullRequestList = new ArrayList<>();
3. 当 queue 有新的消息产生,写到 commitLog 后,并且 reput 到 consumeQueue 和 indexFile 后,则触发处理 PullRequest
// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput 片段 // 写入 consumeQueue 和 indexFile DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { // 触发处理 PullRequest DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); }
// 处理 queue 上对应的所有请求,有数据或者超时都返回 // org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { List<PullRequest> requestList = mpr.cloneListAndClear(); if (requestList != null) { List<PullRequest> replayList = new ArrayList<PullRequest>(); for (PullRequest request : requestList) { long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); // match by bit map, need eval again when properties is not null. if (match && properties != null) { match = request.getMessageFilter().isMatchedByCommitLog(null, properties); } if (match) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } } if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } replayList.add(request); } if (!replayList.isEmpty()) { mpr.addPullRequest(replayList); } } } }
4. PullRequestHoldService 是一个线程类,也会在 run 方法里不停地检查 PullRequest
// org.apache.rocketmq.broker.longpolling.PullRequestHoldService#run public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
5. PullMessageProcessor 处理长轮询醒来的请求,这次不能再阻塞它了
// org.apache.rocketmq.broker.processor.PullMessageProcessor#executeRequestWhenWakeup public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { Runnable run = new Runnable() { @Override public void run() { try { final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false); if (response != null) { response.setOpaque(request.getOpaque()); response.markResponseType(); try { channel.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { log.error("processRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause()); log.error(request.toString()); log.error(response.toString()); } } }); } catch (Throwable e) { log.error("processRequestWrapper process request over, but response failed", e); log.error(request.toString()); log.error(response.toString()); } } } catch (RemotingCommandException e1) { log.error("excuteRequestWhenWakeup run", e1); } } }; this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request)); }
6. 注意客户端的参数
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl // broker 阻塞时间 private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15; // 客户端超时时间 private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;