zoukankan      html  css  js  c++  java
  • rocketMQ 长轮询

    consumer 拉取消息时,如果对应的 queue 没有数据,broker 不会立即返回,而是以长轮询的方式处理:把 PullRequest 保存起来,等 queue 有了新消息,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。

    1. queue 此时没有数据

    // Excerpt from PullMessageProcessor#processRequest
    // Branch taken when the pull found no message: the request may be parked instead of answered.
    case ResponseCode.PULL_NOT_FOUND:
        // Both the broker and the consumer allow suspension (enabled by default)
        if (brokerAllowSuspend && hasSuspendFlag) {
            // Determine the value of pollingTimeMills.
            // Taking DefaultMQPushConsumerImpl as an example, BROKER_SUSPEND_MAX_TIME_MILLIS = 15000
            long pollingTimeMills = suspendTimeoutMillisLong;
            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                // Long polling is disabled in the broker config: use the short polling time instead
                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
            }
    
            // Wrap the original pull request command in a PullRequest
            String topic = requestHeader.getTopic();
            long offset = requestHeader.getQueueOffset();
            int queueId = requestHeader.getQueueId();
            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
            // Hand the request to the hold service under this topic and queueId
            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
            // The response is set to null once the request has been handed to the hold service
            response = null;
            break;
        }

    2. 保存 PullRequest

    // org.apache.rocketmq.broker.longpolling.PullRequestHoldService
    // Held pull requests, looked up by a key that combines topic and queueId
    private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);
    
    // org.apache.rocketmq.broker.longpolling.ManyPullRequest
    // The PullRequest entries stored inside one ManyPullRequest
    private final ArrayList<PullRequest> pullRequestList = new ArrayList<>();    

    3. 当 queue 有新的消息产生,写到 commitLog 后,并且 reput 到 consumeQueue 和 indexFile 后,则触发处理 PullRequest

    // Excerpt from org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
    // Write to consumeQueue and indexFile
    DefaultMessageStore.this.doDispatch(dispatchRequest);
    
    // Only a non-SLAVE broker with long polling enabled notifies the listener
    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
        // Trigger processing of the held PullRequest entries;
        // the offset passed on is the dispatched consume queue offset plus one
        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
    }
    // Process every request held for the queue; a request is answered when data is available or it has timed out
    // org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving
    /**
     * Re-examines the pull requests held for one topic/queue and wakes up those that can be served
     * or whose suspension has expired; the remaining ones are put back to keep waiting.
     *
     * @param topic topic used, together with {@code queueId}, to build the lookup key
     * @param queueId queue id used, together with {@code topic}, to build the lookup key
     * @param maxOffset offset compared against each request's pull offset to decide whether new data exists
     * @param tagsCode tags code passed to the request's message filter
     * @param msgStoreTime store time passed into the {@code CqExtUnit} given to the filter
     * @param filterBitMap bit map passed into the {@code CqExtUnit} given to the filter
     * @param properties message properties; when non-null a second, commit-log based filter check is run
     */
    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            // NOTE(review): cloneListAndClear is not shown here; presumably it returns a copy of the
            // held requests and empties the holder, and may return null when nothing is held
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
                // Requests that are neither served nor expired in this pass
                List<PullRequest> replayList = new ArrayList<PullRequest>();
    
                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    // The supplied offset is not beyond what this request asks for:
                    // re-read the queue's current max offset from the message store
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }
    
                    // There is data past the offset this request wants to pull from
                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                        // match by bit map, need eval again when properties is not null.
                        if (match && properties != null) {
                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                        }
    
                        if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                // A failed wake-up is only logged; the request is not re-queued
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }
    
                    // No matching data: wake the request anyway once its suspend time plus timeout has passed
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
    
                    // Still waiting: keep the request for the next notification or check
                    replayList.add(request);
                }
    
                // Put the still-waiting requests back into the holder
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
    }

    4. PullRequestHoldService 是一个线程类,也会在 run 方法里不停地检查 PullRequest

    // org.apache.rocketmq.broker.longpolling.PullRequestHoldService#run
    /**
     * Service loop: until the service is stopped, waits for an interval and then calls
     * {@code checkHoldRequest()}, logging when a single check takes longer than 5 seconds.
     */
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                // Wait interval: 5 seconds with long polling enabled, otherwise the configured short polling time.
                // NOTE(review): waitForRunning is not shown here; presumably it blocks for at most the given milliseconds
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }
    
                // Time the check so that slow passes can be reported
                long beginLockTimestamp = this.systemClock.now();
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                // Any failure is logged and the loop continues with the next iteration
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    
        log.info("{} service end", this.getServiceName());
    }

    5. PullMessageProcessor 处理长轮询醒来的请求,这次不能再阻塞它了

    // org.apache.rocketmq.broker.processor.PullMessageProcessor#executeRequestWhenWakeup
    /**
     * Re-runs a previously held pull request on the pull message executor and, when a response
     * is produced, writes it back on the given channel.
     *
     * @param channel channel the response is written to
     * @param request the original pull request command to process again
     * @throws RemotingCommandException declared by the signature; inside the task this exception
     *         is caught and logged
     */
    public void executeRequestWhenWakeup(final Channel channel,
            final RemotingCommand request) throws RemotingCommandException {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    // NOTE(review): the third argument is false; presumably this is brokerAllowSuspend,
                    // so the request cannot be suspended a second time - confirm against processRequest
                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
    
                    if (response != null) {
                        // Copy the request's opaque value onto the response and mark it as a response
                        response.setOpaque(request.getOpaque());
                        response.markResponseType();
                        try {
                            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception {
                                    // The write completes asynchronously; a failure is only logged
                                    if (!future.isSuccess()) {
                                        log.error("processRequestWrapper response to {} failed",
                                            future.channel().remoteAddress(), future.cause());
                                        log.error(request.toString());
                                        log.error(response.toString());
                                    }
                                }
                            });
                        } catch (Throwable e) {
                            // Failure while issuing the write itself
                            log.error("processRequestWrapper process request over, but response failed", e);
                            log.error(request.toString());
                            log.error(response.toString());
                        }
                    }
                } catch (RemotingCommandException e1) {
                    log.error("excuteRequestWhenWakeup run", e1);
                }
            }
        };
        // Run the task on the pull message executor rather than on the calling thread
        this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
    }

    6. 注意客户端的参数

    // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
    // Broker-side suspend (blocking) time: 15 seconds
    private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
    // Client-side timeout while the request is suspended: 30 seconds
    private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
  • 相关阅读:
    VS2015&windows开发环境配置
    Chapter 12. Classes
    Chapter 11. Generic Algorithms
    Chapter 10. Associative Containers
    Chapter 9. Sequential Containers
    Chapter 8. The IO Library
    Chapter 7. Functions
    Chapter 5. Expressions
    案例分析 极化跳变
    机器如果能够实现自己建模,应该是下一次人工智能的飞跃点
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12761383.html
Copyright © 2011-2022 走看看