zoukankan      html  css  js  c++  java
  • RocketMQ 消费者

    本文分析 DefaultMQPushConsumer,异步发送消息,多线程消费的情形。

    DefaultMQPushConsumerImpl
      MQClientInstance 一个客户端进程只有一个 MQClientInstance 实例
        MQClientAPIImpl 和 netty 打交道
        PullMessageService 拉取消息
        RebalanceService 客户端 rebalance
        ConcurrentMap<String/* group */, MQProducerInner>
        ConcurrentMap<String/* group */, MQConsumerInner>
      ConsumeMessageService 消费消息
        ThreadPoolExecutor 消费线程池
      RebalancePushImpl  负责 rebalance
        ConcurrentMap<MessageQueue, ProcessQueue>

    消费者启动从 DefaultMQPushConsumerImpl#start 开始,创建 MQClientInstance 实例,创建 ConsumeMessageService 实例,然后分别启动 ConsumeMessageService#start, MQClientInstance#start,这时 PullMessageService 和 RebalanceService 就开始工作了。目前来看,常规用法中,在一个 jvm 进程中,只有一个 MQClientInstance 对象,对象中存储了 jvm 中所有的 producer 和 consumer。

    org.apache.rocketmq.client.impl.consumer.RebalanceService#run

    public void run() {
        log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    
        log.info(this.getServiceName() + " service end");
    }

    RebalanceService 一会睡眠,一会工作,当消费者数量发生变化时,broker 会推送通知给所有消费者,消费者唤醒 rebalance 线程,开始工作。

    所有的客户端按照相同的算法,分配 queue,然后把结果保存到 processQueueTable 中。

    在第一次 rebalance 时,会创建 PullRequest 放入 PullMessageService 的队列中,开启了拉取消息的序幕。

    // mqSet 是分配给当前消费者的 queue
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;
    
        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();
    
            if (mq.getTopic().equals(topic)) {
                if (!mqSet.contains(mq)) {
                    // 该 queue 分配给其他消费者了
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                    }
                } else if (pq.isPullExpired()) {
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true);
                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                it.remove();
                                changed = true;
                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                    consumerGroup, mq);
                            }
                            break;
                        default:
                            break;
                    }
                }
            }
        }
    
        // 为新分配的 queue 创建 PullRequest 请求
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }
    
                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
    
        this.dispatchPullRequest(pullRequestList);
    
        return changed;
    }

    rocketmq 拉取消息是客户端源源不断拉取的,在收到 PullRequest 的响应后,会继续创建一个 PullRequest 放入队列,无限循环下去。

    org.apache.rocketmq.client.impl.consumer.PullMessageService#run

    public void run() {
        log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
    
        log.info(this.getServiceName() + " service end");
    }

    注册回调函数
    org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

    拉取消息有流控逻辑,对于每一个分区,如果 cachedMessageCount 大于 1000,cachedMessageSizeInMiB 大于 100M,则延迟执行 PullRequest。把 PullRequest 重新放入队列。

    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);
    
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);
    
                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                            // 拉取到的消息放入 processQueue
                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            // 创建 ConsumeRequest 放入 consumeMessageService 的线程池中
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);
    
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }
    
                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }
    
                        break;
                    case NO_NEW_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
    
                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);
    
                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
    
                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
    
                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }
    
        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }
    
            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    };

    回调传递给通信层
    org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync

    private void pullMessageAsync(
        final String addr,
        final RemotingCommand request,
        final long timeoutMillis,
        final PullCallback pullCallback
    ) throws RemotingException, InterruptedException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (response != null) {
                    try {
                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                        assert pullResult != null;
                        pullCallback.onSuccess(pullResult);
                    } catch (Exception e) {
                        pullCallback.onException(e);
                    }
                } else {
                    if (!responseFuture.isSendRequestOK()) {
                        pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                    } else if (responseFuture.isTimeout()) {
                        pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                            responseFuture.getCause()));
                    } else {
                        pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                    }
                }
            }
        });
    }

    触发回调
    NettyRemotingClient.NettyClientHandler#channelRead0
    org.apache.rocketmq.remoting.netty.ResponseFuture#executeInvokeCallback

    消息消费
    org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run

  • 相关阅读:
    windows下mysql多实例安装
    linux下mysql多实例安装
    redisAPI整理
    Flink
    Google Dremel架构
    Apache Kylin
    Phoenix概述
    SQL on Hadoop技术综述
    AES对称加密算法
    Hawq架构
  • 原文地址:https://www.cnblogs.com/allenwas3/p/11652918.html
Copyright © 2011-2022 走看看