  • 🏆 [Alibaba Middleware Technology Series] "RocketMQ Deep Dive": Exploring the Implementation Principles and Source Code of DefaultMQPushConsumer

    A Quick Review of RocketMQ

    RocketMQ is a distributed, queue-model message middleware with the following characteristics:

    1. Guarantees strict message ordering
    2. Offers rich message pull modes
    3. Scales subscribers horizontally and efficiently
    4. Provides real-time message subscription
    5. Can accumulate messages at the scale of hundreds of millions

    Why Use RocketMQ

    1. No single point of failure in the cluster; every node is highly available and the cluster scales horizontally
    2. Massive message-accumulation capacity, with low write latency even when messages pile up
    3. Supports tens of thousands of queues
    4. Retry mechanism for failed messages
    5. Messages can be queried
    6. Active open-source community
    7. Maturity proven by Taobao's Double 11 traffic

    How RocketMQ Has Evolved

    The open-source edition of RocketMQ persists messages to files; Alibaba's internal, unreleased edition performs better and uses OceanBase for persistence.
    RocketMQ 1.x and 2.x managed the cluster with ZooKeeper; from 3.x onward the lighter-weight NameServer replaced ZooKeeper. The client offers two consumption APIs: DefaultMQPushConsumer and DefaultMQPullConsumer.

    Maven Dependency for DefaultMQPushConsumer

    <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client</artifactId>
       <version>4.3.0</version>
    </dependency>
    

    DefaultMQPushConsumer Usage Example

    1. CONSUME_FROM_LAST_OFFSET: on first start, consume from the tail of the queue; on subsequent starts, resume from the last recorded consume offset
    2. CONSUME_FROM_FIRST_OFFSET: on first start, consume from the head of the queue; on subsequent starts, resume from the last recorded consume offset
    3. CONSUME_FROM_TIMESTAMP: on first start, consume from the specified timestamp; on subsequent starts, resume from the last recorded consume offset

    "First start" here means a consumer that has never consumed before. Once a consumer has consumed, the broker records its consume offset, so if the consumer dies and is restarted it automatically resumes from the last recorded progress.
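    For instance, to start from a point in time on the very first launch, CONSUME_FROM_TIMESTAMP is typically paired with setConsumeTimestamp. A minimal sketch reusing the consumer object from the full example below (the timestamp value is illustrative, in yyyyMMddHHmmss format):

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    // Only honored when this consumer group has no recorded offset yet
    consumer.setConsumeTimestamp("20211123000000");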

    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    public class MQPushConsumer {
        public static void main(String[] args) throws MQClientException {
            String groupName = "rocketMqGroup1";
            // The group name ties multiple consumers together to raise concurrent processing capacity
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
            // Set the NameServer addresses; separate multiple addresses with ';'
            consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.setMessageModel(MessageModel.BROADCASTING);
            // Subscribe to a topic, optionally filtering by tag, e.g. "TopicTest", "tag1||tag2||tag3";
            // "*" or null means all messages of the topic
            consumer.subscribe("order-topic", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    }
    
    • CLUSTERING: the default mode. Within the same ConsumerGroup (same groupName), each consumer receives only a portion of the subscribed messages; only the union of what all consumers in the group receive covers the whole subscribed topic, which is how load balancing is achieved.
    • BROADCASTING: within the same ConsumerGroup, every consumer receives all messages of the subscribed topic, i.e. each message is delivered multiple times and consumed by multiple consumers.

    If the listener returns ConsumeConcurrentlyStatus.RECONSUME_LATER, the broker retries delivery according to its messageDelayLevel setting, up to 16 times by default.
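    A minimal sketch of a listener that uses RECONSUME_LATER, reusing the consumer from the example above (handle() and the retry threshold of 3 are illustrative assumptions, not part of the original post):

    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                try {
                    handle(msg); // hypothetical business handler
                } catch (Exception e) {
                    if (msg.getReconsumeTimes() < 3) {
                        // ask the broker to redeliver later via its delay levels
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    // beyond our own threshold: log and fall through to acknowledge,
                    // otherwise the broker would keep retrying up to its default of 16 times
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });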

    The main responsibilities of the objects inside DefaultMQPushConsumerImpl are as follows:

    RebalancePushImpl: decides which message queues the current consumer should consume from;

    • 1) PullAPIWrapper: holds the long-lived connection that pulls messages from the broker and hands them over so that ConsumeMessageService can call back the user's listener to run the consumption logic;
    • 2) ConsumeMessageService: implements the so-called "push/passive" consumption model; messages pulled from the broker are wrapped into ConsumeRequest objects and submitted to ConsumeMessageService, which invokes the user's listener to consume them;
    • 3) OffsetStore: maintains the consumer's consumption progress (offset). There are two implementations, Local and Remote: LocalFileOffsetStore keeps offsets on local disk and suits the BROADCASTING model, while RemoteBrokerOffsetStore keeps offsets on the broker and suits the CLUSTERING model;
    • 4) MQClientFactory: manages clients (consumers and producers) and exposes various functional interfaces to the services (Rebalance, PullMessage, etc.); most of the logic lives in this class;

    Execution path of consumer.registerMessageListener:

    /**
         * Register a callback to execute on message arrival for concurrent consuming.
         * @param messageListener message handling callback.
         */
        @Override
        public void registerMessageListener(MessageListenerConcurrently messageListener) {
            this.messageListener = messageListener;
            this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
        }
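
    For ordered consumption the same method accepts a MessageListenerOrderly instead; a brief sketch reusing the consumer from the earlier example. As the start() code shown below illustrates, RocketMQ picks ConsumeMessageOrderlyService or ConsumeMessageConcurrentlyService based on which interface the listener implements:

    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeOrderlyContext context) {
            // messages from the same queue are delivered to this callback serially
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });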
    

    The source shows that the real work happens in DefaultMQPushConsumerImpl: consumer.start() delegates to DefaultMQPushConsumerImpl's synchronized start() method.

    public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                    this.serviceState = ServiceState.START_FAILED;
                    this.checkConfig();
                    this.copySubscription();
                    if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                        this.defaultMQPushConsumer.changeInstanceNameToPID();
                    }
                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                    this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                    this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                    this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                    this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            case BROADCASTING:
                                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            case CLUSTERING:
                                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                      this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                    }
                    this.offsetStore.load();
                    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                        this.consumeOrderly = true;
                        this.consumeMessageService =
                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                        this.consumeOrderly = false;
                        this.consumeMessageService =
                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                    }
                    this.consumeMessageService.start();
                    boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        this.consumeMessageService.shutdown();
                        throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
                    mQClientFactory.start();
                    log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
            this.updateTopicSubscribeInfoWhenSubscriptionChanged();
            this.mQClientFactory.checkClientInBroker();
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            this.mQClientFactory.rebalanceImmediately();
        }
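
    Note how start() wires rebalanceImpl with the AllocateMessageQueueStrategy taken from the consumer, so the queue-allocation strategy is pluggable as long as it is set before consumer.start(). A small sketch (both strategy classes ship with rocketmq-client; the choice shown is illustrative, not something the original post prescribes):

    // Default strategy: divide the topic's queues evenly among the consumers in the group
    consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
    // Alternative: deal queues out one by one in a circle across the consumers
    // consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());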
    

    Following mQClientFactory.start(), we find that it calls:

    public void start() throws MQClientException {
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
                        // If not specified,looking address from name server
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
                        // Start request-response channel
                        this.mQClientAPIImpl.start();
                        // Start various schedule tasks
                        this.startScheduledTask();
                        // Start pull service
                        this.pullMessageService.start();
                        // Start rebalance service
                        this.rebalanceService.start();
                        // Start push service
                      this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case RUNNING:
                        break;
                    case SHUTDOWN_ALREADY:
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }
    

    This method starts several services; the one we focus on is pullMessageService.start(). Here we see that RocketMQ's push mode is in fact implemented on top of pull. Let's look at what logic PullMessageService handles:

    private void pullMessage(final PullRequest pullRequest) {
            final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
            if (consumer != null) {
                DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
                impl.pullMessage(pullRequest);
            } else {
                log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
            }
        }
    

    So the actual message-handling logic is still carried out by the pullMessage method of DefaultMQPushConsumerImpl.

    How PullRequest Drives the Pull

    A note on PullRequest: as mentioned above, RocketMQ's push mode is a wrapper built on pull mode, and PullRequest achieves the push effect through long polling.

    Long polling combines the advantages of pull with the real-time responsiveness of push.

    • In push mode, the server actively pushes each message to the client as soon as it receives it, so latency is low. The downsides: the server does more work, which hurts its performance, and clients differ in processing capacity while their state is outside the server's control, so a client that cannot keep up easily builds a backlog and disrupts normal business.

    • In pull mode, the client polls the server for messages, so the client keeps the initiative: it finishes one message before fetching the next. The drawback is that the polling interval is hard to tune: too short and the client busy-waits, wasting CPU; too long and processing throughput drops and some messages are handled late. A naive fixed-interval pull loop looks like the sketch below.
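
    A minimal sketch of such a loop using DefaultMQPullConsumer (the topic, group, batch size, and interval are illustrative; the code is assumed to sit inside a method that declares the rocketmq client exceptions and InterruptedException):

    DefaultMQPullConsumer puller = new DefaultMQPullConsumer("rocketMqPullGroup1");
    puller.setNamesrvAddr("name-server1-ip:9876");
    puller.start();
    Set<MessageQueue> queues = puller.fetchSubscribeMessageQueues("order-topic");
    while (true) {
        for (MessageQueue mq : queues) {
            long offset = puller.fetchConsumeOffset(mq, true);
            PullResult result = puller.pull(mq, "*", offset, 32);   // pull up to 32 messages
            if (result.getPullStatus() == PullStatus.FOUND) {
                // process result.getMsgFoundList() here
            }
            puller.updateConsumeOffset(mq, result.getNextBeginOffset());
        }
        Thread.sleep(1000);   // fixed interval: the busy-wait vs. latency trade-off
    }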

    Long polling combines the strengths of both. DefaultMQPushConsumerImpl.pullMessage(PullRequest) proceeds roughly as follows:
    1. Check whether the dropped flag of the ProcessQueue inside the PullRequest is true (the RebalanceService thread maintains a ProcessQueue for each MessageQueue of a topic when it creates pull requests; if the consumer no longer subscribes to the topic, the flag is set to true). If it is, the request is treated as cancelled and the method returns immediately;
    2. Update the ProcessQueue's timestamp (ProcessQueue.lastPullTimestamp) to the current time;
    3. Check that the consumer is running, i.e. DefaultMQPushConsumerImpl.serviceState == RUNNING. If it is not running, or it is paused (DefaultMQPushConsumerImpl.pause == true), call PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay) with timeDelay = 3000, which puts the PullRequest back into PullMessageService.pullRequestQueue after 3 seconds, and return;
    4. Apply flow control. If the ProcessQueue's msgCount exceeds the consumer-side threshold (DefaultMQPushConsumer.pullThresholdForQueue, default 1000), call PullMessageService.executePullRequestLater to requeue the PullRequest into PullMessageService.pullRequestQueue after 50 ms, and return;
    5. If consumption is not orderly (DefaultMQPushConsumerImpl.consumeOrderly == false), check the gap between the first and last keys of the ProcessQueue's msgTreeMap: TreeMap<Long, MessageExt> (the keys are queue offsets). If the gap exceeds the threshold (DefaultMQPushConsumer.consumeConcurrentlyMaxSpan, default 2000), call PullMessageService.executePullRequestLater to requeue the PullRequest after 50 ms, and return;
    6. Using the topic of PullRequest.messageQueue, look up the SubscriptionData in RebalanceImpl.subscriptionInner: ConcurrentHashMap<String, SubscriptionData>. If it is null (possible because of concurrency), call executePullRequestLater to retry a little later, and return;
    7. If the message model is clustering (RebalanceImpl.messageModel == CLUSTERING), call readOffset(MessageQueue mq, ReadOffsetType type) on DefaultMQPushConsumerImpl.offsetStore (initialized as a RemoteBrokerOffsetStore) with the PullRequest's MessageQueue and type = READ_FROM_MEMORY to obtain the consume offset from local memory. If the offset is greater than 0, set the local variable commitOffsetEnable to true, otherwise false. The offset is passed to pullKernelImpl as the commitOffset parameter; after the broker serves the pull, commitOffsetEnable decides whether that offset updates the consume progress. readOffset itself looks up the MessageQueue in RemoteBrokerOffsetStore.offsetTable: ConcurrentHashMap<MessageQueue, AtomicLong>; if the offset is found it is returned, otherwise -1;
    8. If the subscription should be re-posted with every pull (DefaultMQPushConsumer.postSubscriptionWhenPull, default false) and the SubscriptionData obtained by topic from RebalanceImpl.subscriptionInner has classFilterMode == false (the default), the third bit of sysFlag (the subscription flag) is set to 1, otherwise 0;
    9. The first bit of sysFlag is set to commitOffsetEnable; the second bit (the suspend flag) is set to 1; the fourth bit is set to classFilterMode;
    10. Create the anonymous inner PullCallback, which implements onSuccess/onException; these callbacks are only invoked for asynchronous requests;
    11. Call the low-level message-pull API:

    PullAPIWrapper.pullKernelImpl

    Messages are pulled by calling PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback).

    The PullCallback is passed into this method; when messages are pulled asynchronously, its methods are invoked once the response arrives.

    public void pullMessage(final PullRequest pullRequest) {
            final ProcessQueue processQueue = pullRequest.getProcessQueue();
            if (processQueue.isDropped()) {
                log.info("the pull request[{}] is dropped.", pullRequest.toString());
                return;
            }
            pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
            try {
                this.makeSureStateOK();
            } catch (MQClientException e) {
                log.warn("pullMessage exception, consumer state not ok", e);
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                return;
            }
            if (this.isPause()) {
                log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
                return;
            }
            long cachedMessageCount = processQueue.getMsgCount().get();
            long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
            if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                        this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
                }
                return;
            }
            if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                        this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
                }
                return;
            }
            if (!this.consumeOrderly) {
                if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                        log.warn(
                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                            pullRequest, queueMaxSpanFlowControlTimes);
                    }
                    return;
                }
            } else {
                if (processQueue.isLocked()) {
                    if (!pullRequest.isLockedFirst()) {
                        final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                        boolean brokerBusy = offset < pullRequest.getNextOffset();
                        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                            pullRequest, offset, brokerBusy);
                        if (brokerBusy) {
                            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                                pullRequest, offset);
                        }
                        pullRequest.setLockedFirst(true);
                        pullRequest.setNextOffset(offset);
                    }
                } else {
                    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                    log.info("pull message later because not locked in broker, {}", pullRequest);
                    return;
                }
            }
            final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
            if (null == subscriptionData) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.warn("find the consumer's subscription failed, {}", pullRequest);
                return;
            }
            final long beginTimestamp = System.currentTimeMillis();
            PullCallback pullCallback = new PullCallback() {
                @Override
                public void onSuccess(PullResult pullResult) {
                    if (pullResult != null) {
                        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                            subscriptionData);
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                long prevRequestOffset = pullRequest.getNextOffset();
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                long pullRT = System.currentTimeMillis() - beginTimestamp;
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullRT);
                                long firstMsgOffset = Long.MAX_VALUE;
                                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                } else {
                                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispatchToConsume);
                                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                    } else {
                                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                    }
                                }
                                if (pullResult.getNextBeginOffset() < prevRequestOffset
                                    || firstMsgOffset < prevRequestOffset) {
                                    log.warn(
                                        "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                        pullResult.getNextBeginOffset(),
                                        firstMsgOffset,
                                        prevRequestOffset);
                                }
                                break;
                            case NO_NEW_MSG:
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                break;
                            case NO_MATCHED_MSG:
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                break;
                            case OFFSET_ILLEGAL:
                                log.warn("the pull request offset illegal, {} {}",
                                    pullRequest.toString(), pullResult.toString());
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                pullRequest.getProcessQueue().setDropped(true);
                                DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                                    @Override
                                    public void run() {
                                        try {
                                            DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                pullRequest.getNextOffset(), false);
                                            DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                            DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                            log.warn("fix the pull request offset, {}", pullRequest);
                                        } catch (Throwable e) {
                                            log.error("executeTaskLater Exception", e);
                                        }
                                    }
                                }, 10000);
                                break;
                            default:
                                break;
                        }
                    }
                }
                @Override
                public void onException(Throwable e) {
                    if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("execute the pull request exception", e);
                    }
                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                }
            };
            boolean commitOffsetEnable = false;
            long commitOffsetValue = 0L;
            if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
                commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
                if (commitOffsetValue > 0) {
                    commitOffsetEnable = true;
                }
            }
            String subExpression = null;
            boolean classFilter = false;
            SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
            if (sd != null) {
                if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                    subExpression = sd.getSubString();
                }
                classFilter = sd.isClassFilterMode();
            }
            int sysFlag = PullSysFlag.buildSysFlag(
                commitOffsetEnable, // commitOffset
                true, // suspend
                subExpression != null, // subscription
                classFilter // class filter
            );
            try {
                // Follow this method further down: this is where the client actually pulls messages
                this.pullAPIWrapper.pullKernelImpl(
                    pullRequest.getMessageQueue(),
                    subExpression,
                    subscriptionData.getExpressionType(),
                    subscriptionData.getSubVersion(),
                    pullRequest.getNextOffset(),
                    this.defaultMQPushConsumer.getPullBatchSize(),
                    sysFlag,
                    commitOffsetValue,
                    BROKER_SUSPEND_MAX_TIME_MILLIS,
                    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                    // the pull uses asynchronous communication
                    CommunicationMode.ASYNC,
                    pullCallback
                );
            } catch (Exception e) {
                log.error("pullKernelImpl exception", e);
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        }
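
    The sysFlag built above packs the four booleans described in steps 8 and 9 into bit flags. A quick sketch of how PullSysFlag can be exercised (the boolean values here are illustrative):

    int sysFlag = PullSysFlag.buildSysFlag(
        true,    // commitOffset: report the in-memory consume offset with this pull
        true,    // suspend: allow the broker to hold the request (long polling)
        false,   // subscription: no subscription expression sent with the pull
        false    // classFilter: class-filter mode disabled
    );
    // The companion accessors read the individual bits back out:
    boolean commit = PullSysFlag.hasCommitOffsetFlag(sysFlag);   // true
    boolean suspend = PullSysFlag.hasSuspendFlag(sysFlag);       // true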
    

    Sending the Remote Pull Request

    In MQClientAPIImpl.pullMessage, the communicationMode argument selects between asynchronous and synchronous pulling.

    Whether the pull is asynchronous or synchronous, before the request is sent a ResponseFuture object is constructed and stored in NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer, ResponseFuture>, keyed by the request's sequence number (the opaque). That table is handled in several situations:

    1. If sending fails, the corresponding entry is removed from responseTable immediately;
    2. When a response arrives, its sequence number (echoed back verbatim by the server) is used to look up the ResponseFuture in responseTable and set its responseCommand field. For a synchronous send this wakes up the thread waiting in ResponseFuture.waitResponse; for an asynchronous send ResponseFuture.executeInvokeCallback() is called to run the callback logic;
    3. NettyRemotingClient.start() also schedules a task that scans responseTable every second, checking each ResponseFuture for a response timeout; timed-out entries have executeInvokeCallback() invoked and are removed from the table.

    The dispatch on communicationMode looks like this:
    public PullResult pullMessage(
            final String addr,
            final PullMessageRequestHeader requestHeader,
            final long timeoutMillis,
            final CommunicationMode communicationMode,
            final PullCallback pullCallback
        ) throws RemotingException, MQBrokerException, InterruptedException {
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
            switch (communicationMode) {
                case ONEWAY:
                    assert false;
                    return null;
                case ASYNC:
                    this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                    return null;
                case SYNC:
                    return this.pullMessageSync(addr, request, timeoutMillis);
                default:
                    assert false;
                    break;
            }
            return null;
        }
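
    The responseTable bookkeeping described above is the classic request/response correlation pattern over an asynchronous channel. A simplified, self-contained sketch of the idea (a stand-in, not the actual RocketMQ classes; Object replaces RemotingCommand and ResponseFuture):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    class ResponseCorrelator {
        private static class PendingResponse {
            final long sentAtMillis = System.currentTimeMillis();
            final CompletableFuture<Object> future = new CompletableFuture<>();
        }

        private final ConcurrentHashMap<Integer, PendingResponse> responseTable = new ConcurrentHashMap<>();

        // Called just before the request is written to the channel; the caller keeps the future.
        CompletableFuture<Object> register(int opaque) {
            PendingResponse pending = new PendingResponse();
            responseTable.put(opaque, pending);
            return pending.future;
        }

        // Called on the I/O thread when a response arrives (the server echoes the opaque back);
        // completing the future wakes a synchronous waiter or fires an async callback.
        void onResponse(int opaque, Object responseCommand) {
            PendingResponse pending = responseTable.remove(opaque);
            if (pending != null) {
                pending.future.complete(responseCommand);
            }
        }

        // Analogous to the periodic scanResponseTable task: fail and drop timed-out entries.
        void scanExpired(long timeoutMillis) {
            long now = System.currentTimeMillis();
            responseTable.entrySet().removeIf(e -> {
                boolean expired = now - e.getValue().sentAtMillis > timeoutMillis;
                if (expired) {
                    e.getValue().future.completeExceptionally(new RuntimeException("response timeout"));
                }
                return expired;
            });
        }
    }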
    

    Synchronous Pull

    For the synchronous path, MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis) is called; the steps are roughly:

    1. Call RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis):
      • Get the Channel for the broker address: look up the ChannelWrapper in RemotingClient.channelTables: ConcurrentHashMap<String, ChannelWrapper> by broker address and return its Channel; if there is no ChannelWrapper, open a new connection to the broker and store it in channelTables for later reuse;
      • If NettyRemotingClient.rpcHook: RPCHook is not null (the hook is supplied by the application when it creates the DefaultMQPushConsumer or DefaultMQPullConsumer), call RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request);
      • Call NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis), whose logic is:
        • A) Build a ResponseFuture from the request's sequence number (opaque) and the timeout, and store it in NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer, ResponseFuture>;
        • B) Call Channel.writeAndFlush(Object msg) to send the RemotingCommand to the broker, then register an anonymous listener via addListener(GenericFutureListener<? extends Future<? super Void>> listener). The listener implements ChannelFutureListener.operationComplete, which runs when the send finishes: it checks ChannelFuture.isSuccess(); on success it sets ResponseFuture.sendRequestOK = true and returns from the callback to wait for the response; on failure it sets sendRequestOK = false, removes the request's sequence number (opaque) from NettyRemotingAbstract.responseTable, sets ResponseFuture.responseCommand = null, and wakes up the wait in ResponseFuture.waitResponse(long timeoutMillis);
        • C) Call ResponseFuture.waitResponse(long timeoutMillis) to wait for the result; the wait is woken on send failure, on receipt of the response (see section 5.10.3), or on timeout, and returns ResponseFuture.responseCommand;
        • D) If the returned responseCommand is null, throw an exception: RemotingTimeoutException if ResponseFuture.sendRequestOK is true, otherwise RemotingSendRequestException;
        • E) If the returned responseCommand is not null, return it;
      • If NettyRemotingClient.rpcHook: RPCHook is not null, call RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response);
    • With the RemotingCommand returned above, call MQClientAPIImpl.processPullResponse(RemotingCommand response) to parse and wrap it into a PullResultExt object for the caller. The response code maps to a pull status as follows:
      • If the RemotingCommand's code equals SUCCESS, then PullResultExt.pullStatus = FOUND;
      • If the code equals PULL_NOT_FOUND, then PullResultExt.pullStatus = NO_NEW_MSG;
      • If the code equals PULL_RETRY_IMMEDIATELY, then PullResultExt.pullStatus = NO_MATCHED_MSG;
      • If the code equals PULL_OFFSET_MOVED, then PullResultExt.pullStatus = OFFSET_ILLEGAL;
    @Override
        public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
            throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
            long beginStartTime = System.currentTimeMillis();
            final Channel channel = this.getAndCreateChannel(addr);
            if (channel != null && channel.isActive()) {
                try {
                    if (this.rpcHook != null) {
                        this.rpcHook.doBeforeRequest(addr, request);
                    }
                    long costTime = System.currentTimeMillis() - beginStartTime;
                    if (timeoutMillis < costTime) {
                        throw new RemotingTimeoutException("invokeSync call timeout");
                    }
                    RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                    if (this.rpcHook != null) {
                        this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                    }
                    return response;
                } catch (RemotingSendRequestException e) {
                    log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                    this.closeChannel(addr, channel);
                    throw e;
                } catch (RemotingTimeoutException e) {
                    if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                        this.closeChannel(addr, channel);
                        log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                    }
                    log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                    throw e;
                }
            } else {
                this.closeChannel(addr, channel);
                throw new RemotingConnectException(addr);
            }
        }
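
    The response-code mapping listed above can be captured in a small helper. This is a hypothetical sketch of the switch inside MQClientAPIImpl.processPullResponse (toPullStatus is not a real RocketMQ method; the real code also decodes the response header and message bodies):

    private PullStatus toPullStatus(int responseCode) throws MQBrokerException {
        switch (responseCode) {
            case ResponseCode.SUCCESS:
                return PullStatus.FOUND;
            case ResponseCode.PULL_NOT_FOUND:
                return PullStatus.NO_NEW_MSG;
            case ResponseCode.PULL_RETRY_IMMEDIATELY:
                return PullStatus.NO_MATCHED_MSG;
            case ResponseCode.PULL_OFFSET_MOVED:
                return PullStatus.OFFSET_ILLEGAL;
            default:
                throw new MQBrokerException(responseCode, "unexpected pull response code");
        }
    }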
    

    getMQClientAPIImpl().pullMessage ultimately writes the request to the channel and flushes it. On the broker side the handling goes roughly like this: when a pull request arrives and the queue has no messages, the broker does not return right away. Instead it loops, parking in waitForRunning for a stretch (5 seconds by default) and then re-checking; if no new message has arrived once the wait exceeds the suspend time (the client's BROKER_SUSPEND_MAX_TIME_MILLIS, reached around the third check), it returns an empty result. If a new message arrives while waiting, notifyMessageArriving is called and the request is answered immediately. The heart of "long polling" is that the broker HOLDs the client's request for a short while, and if a new message arrives within that window it is returned to the consumer over the existing connection at once. The initiative stays with the consumer: even if the broker has a large backlog, it never pushes messages to the consumer unasked.
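
    A self-contained sketch of that hold loop (all names here are illustrative stand-ins; the broker's real logic lives in PullRequestHoldService and is considerably richer):

    import java.util.Iterator;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    class HoldLoopSketch extends Thread {
        static class HeldRequest {
            final long pullFromOffset;
            final long suspendTimestamp = System.currentTimeMillis();
            final long suspendMaxTimeMillis;   // e.g. the client's BROKER_SUSPEND_MAX_TIME_MILLIS
            HeldRequest(long pullFromOffset, long suspendMaxTimeMillis) {
                this.pullFromOffset = pullFromOffset;
                this.suspendMaxTimeMillis = suspendMaxTimeMillis;
            }
        }

        private final Queue<HeldRequest> held = new ConcurrentLinkedQueue<>();
        private volatile long maxOffsetInQueue;   // advanced as new messages are stored

        void hold(HeldRequest request) {           // a pull that found nothing is parked here
            held.add(request);
        }

        synchronized void notifyMessageArriving(long newMaxOffset) {
            maxOffsetInQueue = newMaxOffset;
            notifyAll();                            // wake the loop instead of waiting out the 5s
        }

        @Override
        public void run() {
            while (!isInterrupted()) {
                try {
                    synchronized (this) {
                        wait(5000);                 // "waitForRunning": park ~5s or until notified
                    }
                } catch (InterruptedException e) {
                    return;
                }
                long now = System.currentTimeMillis();
                for (Iterator<HeldRequest> it = held.iterator(); it.hasNext(); ) {
                    HeldRequest req = it.next();
                    if (maxOffsetInQueue > req.pullFromOffset) {
                        it.remove();                // new data: answer over the held connection
                    } else if (now - req.suspendTimestamp >= req.suspendMaxTimeMillis) {
                        it.remove();                // suspend window expired: answer "no new msg"
                    }
                }
            }
        }
    }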
