zoukankan      html  css  js  c++  java
  • 【RocketMQ】客户端源码解析

    简述

    消息由PullMessageService类采用单线程的方式从每个消息队列中轮询拉取。

    一个PullRequest对应一个消息队列。PullRequest启动时由RebalanceImpl类根据监听的TOPIC信息自动获取并放入请求队列中。如果消息队列发生更新则会进行更新操作。

     1 // PullMessageService类
     2 public void run() {
     3     log.info(this.getServiceName() + " service started");
     4 
     5     while (!this.isStopped()) {
     6         try {
     7             PullRequest pullRequest = this.pullRequestQueue.take();
     8             //不同的请求对应不同的MessageQueue
     9             this.pullMessage(pullRequest);
    10         } catch (InterruptedException ignored) {
    11         } catch (Exception e) {
    12             log.error("Pull Message Service Run Method exception", e);
    13         }
    14     }
    15 
    16     log.info(this.getServiceName() + " service end");
    17 }

    拉取消费流控

    数据流控在DefaultMQPushConsumerImpl 中进行控制处理;

    流控主要分为两个维度,一个是待处理的消息数量,一个是待处理的消息大小。

     1 //DefaultMQPushConsumerImpl类
     2 public void pullMessage(final PullRequest pullRequest) {
     3         final ProcessQueue processQueue = pullRequest.getProcessQueue();
     4         if (processQueue.isDropped()) {
     5             log.info("the pull request[{}] is dropped.", pullRequest.toString());
     6             return;
     7         }
     8 
     9         pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    10 
    11         try {
    12             this.makeSureStateOK();
    13         } catch (MQClientException e) {
    14             log.warn("pullMessage exception, consumer state not ok", e);
    15             this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    16             return;
    17         }
    18 
    19         if (this.isPause()) {
    20             log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
    21             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
    22             return;
    23         }
    24 
    25         long cachedMessageCount = processQueue.getMsgCount().get();
    26         long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    27 
    28         if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
             //待处理消息数量大于临界值时,则发起流控。默认值是1000条
    29 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); 30 if ((queueFlowControlTimes++ % 1000) == 0) { 31 log.warn( 32 "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", 33 this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); 34 } 35 return; 36 } 37 38 if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
             //待处理消息大小大于临界值时,则发起流控。默认值是100M
    39 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); 40 if ((queueFlowControlTimes++ % 1000) == 0) { 41 log.warn( 42 "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", 43 this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); 44 } 45 return; 46 } 47   //省略代码,感兴趣去查找源文件 48 }

     流控处理则是将当前PullRequest延迟timeDelay毫秒后,再放入到pullRequestQueue队列中等待下次拉取处理。

    // PullMessageService类
    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
            if (!isStopped()) {
                this.scheduledExecutorService.schedule(new Runnable() {
                    @Override
                    public void run() {
                        PullMessageService.this.executePullRequestImmediately(pullRequest);
                    }
                }, timeDelay, TimeUnit.MILLISECONDS);
            } else {
                log.warn("PullMessageServiceScheduledThread has shutdown");
            }
        }

    可以设置参数在拉取到待消费的数据后,流控一段时间进行下次拉取。

     1 PullCallback pullCallback = new PullCallback() {
     2             @Override
     3             public void onSuccess(PullResult pullResult) {
     4                 if (pullResult != null) {
     5                     pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
     6                         subscriptionData);
     7 
     8                     switch (pullResult.getPullStatus()) {
     9                         case FOUND:
    10                             long prevRequestOffset = pullRequest.getNextOffset();
    11                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    12                             long pullRT = System.currentTimeMillis() - beginTimestamp;
    13                             DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
    14                                 pullRequest.getMessageQueue().getTopic(), pullRT);
    15 
    16                             long firstMsgOffset = Long.MAX_VALUE;
    17                             if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
    18                                 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    19                             } else {
    20                                 firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    21 
    22                                 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
    23                                     pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
    24 
    25                                 boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
    26                                 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    27                                     pullResult.getMsgFoundList(),
    28                                     processQueue,
    29                                     pullRequest.getMessageQueue(),
    30                                     dispatchToConsume);
    31 
    32                                 if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
    33                                     // 控制拉取频率,默认值是0
    34                                     DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
    35                                         DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
    36                                 } else {
    37                                     DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    38                                 }
    39                             }
    40 
    41                             if (pullResult.getNextBeginOffset() < prevRequestOffset
    42                                 || firstMsgOffset < prevRequestOffset) {
    43                                 log.warn(
    44                                     "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
    45                                     pullResult.getNextBeginOffset(),
    46                                     firstMsgOffset,
    47                                     prevRequestOffset);
    48                             }
    49 
    50                             break;

    默认的流控值

    /**
    * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
    * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
    */
    private int pullThresholdForQueue = 1000;
    
    /**
    * Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
    * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit      *
    * <p>
    * The size of a message only measured by message body, so it's not accurate
    */
    private int pullThresholdSizeForQueue = 100;
    
    /**
    * Message pull Interval
    */
    private long pullInterval = 0;

    消费消息流控

    拉取到的消息都放到消费线程池中进行处理。可以通过控制消费线程池大小来达到流控作用,降低对消费端的消耗。

    //ConsumeMessageConcurrentlyService类 
    public void submitConsumeRequest(
            final List<MessageExt> msgs,
            final ProcessQueue processQueue,
            final MessageQueue messageQueue,
            final boolean dispatchToConsume) {
            final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
            if (msgs.size() <= consumeBatchSize) {
                ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
                try {
                    //消息的处理交到消费线程池进行处理;
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    this.submitConsumeRequestLater(consumeRequest);
                }
            } else {
                for (int total = 0; total < msgs.size(); ) {
                    List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                    for (int i = 0; i < consumeBatchSize; i++, total++) {
                        if (total < msgs.size()) {
                            msgThis.add(msgs.get(total));
                        } else {
                            break;
                        }
                    }
    
                    ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                    try {
                        this.consumeExecutor.submit(consumeRequest);
                    } catch (RejectedExecutionException e) {
                        for (; total < msgs.size(); total++) {
                            msgThis.add(msgs.get(total));
                        }
    
                        this.submitConsumeRequestLater(consumeRequest);
                    }
                }
            }
        }
    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
            MessageListenerConcurrently messageListener) {
            this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
            this.messageListener = messageListener;
    
            this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
            this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
            this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    
            this.consumeExecutor = new ThreadPoolExecutor(
                this.defaultMQPushConsumer.getConsumeThreadMin(),
                this.defaultMQPushConsumer.getConsumeThreadMax(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.consumeRequestQueue,
                new ThreadFactoryImpl("ConsumeMessageThread_"));
    
            this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
            this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
        }

    配置参数

    /**
         * Minimum consumer thread number
         */
        private int consumeThreadMin = 20;
    
        /**
         * Max consumer thread number
         */
        private int consumeThreadMax = 20;
    
        /**
         * Threshold for dynamic adjustment of the number of thread pool
         */
        private long adjustThreadPoolNumsThreshold = 100000;

    源码参考:RocketMQ版本4.6.0  https://github.com/apache/rocketmq

      

  • 相关阅读:
    地铁线路问题分析
    软件工程大作业(社团管理系统)-个人总结报告
    第九组_社团管理系统_原型相关文档
    北京地铁线路出行和规划
    地铁线路规划
    WC 个人项目 ( node.js 实现 )
    自我介绍 + 软工5问
    软工个人项目(Java实现)
    自我介绍+软工五问
    结对编程(前后端分离)
  • 原文地址:https://www.cnblogs.com/whroid/p/14629045.html
Copyright © 2011-2022 走看看