zoukankan      html  css  js  c++  java
  • rocketmq--push消费过程

    Rocketmq消费分为push和pull两种方式,push为被动消费类型,pull为主动消费类型,push方式最终还是会从broker中pull消息。不同于pull的是,push首先要注册消费监听器,当监听器处触发后才开始消费消息,所以被称为“被动”消费。
     
     具体地,以pushConsumer的测试例子展开介绍,通常使用push消费的过程如下:
    public class PushConsumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
            consumer.subscribe("Jodie_topic_1023", "*");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //wrong time format 2017_0422_221800
            consumer.setConsumeTimestamp("20170422221800");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
     
    上述过程背后设计到的点如下: 
     
    I. checkConfig 检查内容:
    1.消费组 -- (不能与默认DEFAULT_CONSUMER同名)
    2.消费模型 -- (默认CLUSTERING)
    3.从何处开始消费 -- (默认CONSUME_FROM_LAST_OFFSET)
    4.消费时间戳 -- (消息回溯,默认Default backtracking consumption time Half an hour ago)
    5.消费负载均衡策略 -- (默认AllocateMessageQueueAveragely)
    6.订阅关系 --(map类型,即可订阅多个topic;key=Topic, value=订阅描述)
    7.消费监听 --(必须为orderly or concurrently类型之一)
    8.消费消息的线程数量控制 -- (消费线程池最大、最小数量)
    9.检查单队列并行消费允许的最大跨度 --(consumeConcurrentlyMaxSpan)
    10.检查拉消息本地队列缓存消息最大数 --(pullThresholdForQueue)(processQueue.getMsgCount()记数)
    11.检查拉取时间间隔 --(拉消息间隔,由于是长轮询,所以默认为0)
    12.检查批量消费的个数 --(一次消费多少条消息)
    13.检查批量拉取消息的个数 --(一次最多拉多少条)
     
    II. copySubscription:
    将订阅信息设置到rebalanceImpl的map中用于负载。另外,如果该消费者的消费模式为集群消费,则会将retry的topic一并放到rebalanceImpl的map中用于负载。
     
    III. 设置rebanlance信息
    IV. 实例化pull消息的包装类型
     
    V. 如果不存在offsetStore对象,实例化offsetStore
    广播模式:
    public class LocalFileOffsetStore implements OffsetStore {...}
    注:load()函数体不为空
    集群模式:
    public class RemoteBrokerOffsetStore implements OffsetStore {...}
    注:load()函数体为空
     
    VI. 获取监听器,实例化consumeMessageService服务并启动
    ConsumeMessageOrderlyService启动后会对拉取下来的消息进行处理。ConsumeMessageOrderlyService有两种类型:ConsumeMessageOrderlyService和ConsumeMessageConcurrentlyService。
    1). 如果消息监听器是orderly类型,则创建ConsumeMessageOrderlyService实例
    ConsumeMessageOrderlyService.start()只处理消息模式为CLUSTERING的消息消费。
    public void start() {
            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                .messageModel())) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    }
                }, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);
            }
        }
    线程启动后会每隔20s执行lockMQPeriodicallys(),lockMQPeriodicallys()会将消费的队列上锁,然后处理,具体过程,有机会单独成文分析。
     
    2). 如果消息监听器是concurrently类型,则创建ConsumeMessageConcurrentlyService实例
    ConsumeMessageConcurrentlyService.start()会定时清除过期消息 --> cleanExpireMsg()。
     
    VII. 注册消费组
    将group和consumer注册到MQClientInstance实例。
    与生产者注册生产者组类似,一个客户端进程中一个consumerGroup只能有一个实例。
    MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    if (prev != null) {
        log.warn("the consumer group[" + group + "] exist already.");
        return false;
    }
    如果没有注册成功,则关闭消费服务,consumeMessageService.shutdown()。
     
    VIII. 启动mQClientFactory及MQClientInstance
    1). 获取client实例对象MQClientInstance -- getAndCreateMQClientInstance。一个进程只能产生一个MQClientInstance实例对象, 某个客户端的生产者与消费者共用这个实例对象。
    2). 启动客户端实例的个各种服务:
    public void start() throws MQClientException {
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
                        // 1.判断NamesrvAddr是否为空,为空去远程http服务拉去地址
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                        }
                        // 2.开启通信服务
                        this.mQClientAPIImpl.start();
                        // 3.启动各种定时任务
                        this.startScheduledTask();
                        // 4.启动消息拉取服务,循环拉取阻塞队列pullRequestQueue
                        this.pullMessageService.start();
                        // 5. 启动负载均衡服务
                        this.rebalanceService.start();
                        // 6.启动消息生产服务
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case RUNNING:
                        break;
                    case SHUTDOWN_ALREADY:
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId()
                                + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }
    
    分析push消费的过程,需对上述过程第3点、第4点、第5点依次介绍。
    第3点、启动各种定时任务过程
     编号  任务 周期 启动时延 
     获取namesrv地址 每隔2分钟  0.01s
    2  更新路由信息 每隔3分钟  001s
    3  向所有broker发送心跳包,并清除无效broker   每隔30s  1s
    4  持久化消费位置offset 每隔5s  10s
    5  调整消费线程池大小 每隔1分钟  1min
    注:编号3中,客户端会通过心跳消息,向broker注册消费信息。Broker收到该心跳消息,把它维护在一个叫做ConsumerManager的对象里面,为之后做消费的负载均衡提供数据,负载均衡在消费端做,消费端在负载均衡时首先要从broker那获取这份全局信息。
     
    第4点 启动pullMessageService服务
    初始化客户端实例时,创建PullMessageService服务对象。
    this.pullMessageService = new PullMessageService(this),其中PullMessageService继承于ServiceThread,是一个线程对象。启动消息拉取服务线程后,在线程没有阻塞的情况下会不断地从循环阻塞队列pullRequestQueue拉取PullRequest对象,然后执行this.pullMessage(pullRequest)。
     
    那么pullRequestQueue的数据如何put进去的?核心是doRebalance ,负载均衡具体细节可以参考:http://www.cnblogs.com/chenjunjie12321/p/7913323.html

    例如当前有N个客户端同时消费一个topic下的消息队列(如上图),当前客户端( clientId = currentCId),经过负载均衡处理后得到分配给当前消费者的消息队列(如上图的qM、qN),之后将这些队列与processQueueTable中的队列进行比对分析,见下面第五点。

     
    第5点 RebalancePushImpl 负载均衡,分发pullRequest到pullRequestQueue。
    负载均衡处理后得到分配给当前消费者的消息队列,然后将这些队列进行updateProcessQueueTableInRebalance 处理。updateProcessQueueTableInRebalance 的大致逻辑为如下 I、II 两步:
     
     
    I. 首先检查当前RebalancePushImpl实例processQueueTable中与mqSet的包含关系
    (1)如图中processQueueTable的灰色部分,表示与mqSet集合不互不包含的队列,对这些队列首先设置Dropped为true,然后看这些队列是否可以移除出processQueueTable--removeUnnecessaryMessageQueue,即每隔1s 看是否可以拿到当前队列的消费锁(tryLock()),拿到后返回true, 如果等待1s后仍然拿不到当前队列的消费锁则返回false,如果返回true则从processQueueTable移除对应的Entry<MessageQueue, ProcessQueue>;
     
    (2) 如图中processQueueTable的白色部分,表示与mqSet集合的交集队列,对于这些队列,如果是消费类型是pull型,则不用管,如果是push型,看这些队列是否isPullExpired,如果是这些队列首先设置Dropped为true,则可以移除出processQueueTable--removeUnnecessaryMessageQueue。
     
    II. 经过 I 处理,processQueueTable更新之后, 将processQueueTable集合与mqSet的的相对补集: processQueueTable(mq) - mqSet 里的消息队列依次封装成pullRequest,然后dispatchPullRequest到pullRequestQueue中。
     
    经过上述处理后,待消费的队列放在了pullRequestList中,之后遍历pullRequestList,对遍历的每个队列进行消费,代码如下:
     @Override
        public void dispatchPullRequest(List<PullRequest> pullRequestList) {
            for (PullRequest pullRequest : pullRequestList) {
                this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
                log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
            }
        }
    

    executePullRequestImmediately的逻辑功能:

     public void executePullRequestImmediately(final PullRequest pullRequest) {
            try {
                this.pullRequestQueue.put(pullRequest);
            } catch (InterruptedException e) {
                log.error("executePullRequestImmediately pullRequestQueue.put", e);
            }
        }

    总之,最终会将负载均衡得到的队列存放到pullRequestQueue。

     
    回过来继续分析第4点,pullMessageService线程涉及到消费的核心过程DefaultMQPushConsumerImpl.pullMessage,pullMessageService线程线程体源码如下:
     @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
    
            while (!this.isStopped()) {
                try {
                    PullRequest pullRequest = this.pullRequestQueue.take();
                    if (pullRequest != null) {
                        this.pullMessage(pullRequest);
                    }
                } catch (InterruptedException e) {
                } catch (Exception e) {
                    log.error("Pull Message Service Run Method exception", e);
                }
            }
    
            log.info(this.getServiceName() + " service end");
        }
    

    调用DefaultMQPushConsumerImpl.pullMessage方法:

     private void pullMessage(final PullRequest pullRequest) {
            final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
            if (consumer != null) {
                DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
                impl.pullMessage(pullRequest);
            } else {
                log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
            } 

    pullMessage具体体拉流程如下图所示:

     
     


    下面对并发消费模型(concurrently)的消费代码进行展示:


    class ConsumeRequest implements Runnable ,其线程体方法如下:
    @Override
            public void run() {
                if (this.processQueue.isDropped()) {
                    log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                    return;
                }
    
                MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
                ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
                ConsumeConcurrentlyStatus status = null;
    
                ConsumeMessageContext consumeMessageContext = null;
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext = new ConsumeMessageContext();
                    consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                    consumeMessageContext.setProps(new HashMap<String, String>());
                    consumeMessageContext.setMq(messageQueue);
                    consumeMessageContext.setMsgList(msgs);
                    consumeMessageContext.setSuccess(false);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                }
    
                long beginTimestamp = System.currentTimeMillis();
                boolean hasException = false;
                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                try {
                    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
                    if (msgs != null && !msgs.isEmpty()) {
                        for (MessageExt msg : msgs) {
                            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                        }
                    }
                    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
                } catch (Throwable e) {
                    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                        RemotingHelper.exceptionSimpleDesc(e),
                        ConsumeMessageConcurrentlyService.this.consumerGroup,
                        msgs,
                        messageQueue);
                    hasException = true;
                }
                long consumeRT = System.currentTimeMillis() - beginTimestamp;
                if (null == status) {
                    if (hasException) {
                        returnType = ConsumeReturnType.EXCEPTION;
                    } else {
                        returnType = ConsumeReturnType.RETURNNULL;
                    }
                } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                    returnType = ConsumeReturnType.TIME_OUT;
                } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                    returnType = ConsumeReturnType.FAILED;
                } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                    returnType = ConsumeReturnType.SUCCESS;
                }
    
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                }
    
                if (null == status) {
                    log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                        ConsumeMessageConcurrentlyService.this.consumerGroup,
                        msgs,
                        messageQueue);
                    status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
    
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext.setStatus(status.toString());
                    consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                }
    
                ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                    .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
    
                if (!processQueue.isDropped()) {
                    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                } else {
                    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
                }
            }
    View Code

    consumerRequest逻辑:

     

    processConsumeResult -- 对消费结果进行处理:

     重试队列发消息逻辑:

    生成一个重试队列,重试队列topic =  %RETRY% + consumerGroup的形式。
     
     
     附:
    值得注意的是每次消费pullRequest上的一条数据后上更新消费到达的 offset,然后将pullRequest.setNextOffset(offset);
    //这里的 this 为一个 DefaultMQPushConsumerImpl 实例对象
    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
    ...
    pullRequest.setNextOffset(offset);

    其中 computePullFromWhere采用的策略有如下三种(另外还有几个已经被弃用的(@Deprecated)):

    CONSUME_FROM_LAST_OFFSET(默认): 一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。
    CONSUME_FROM_FIRST_OFFSET: 一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。
    CONSUME_FROM_TIMESTAMP: 一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。

    DefaultMQPushConsumer 中默认采用 CONSUME_FROM_LAST_OFFSET 这种方式,当然可以根据自己需要修改computePullFromWhere的策略

    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    
    IX. updateTopicSubscribeInfoWhenSubscriptionChanged
    X. sendHeartbeatToAllBrokerWithLock
    XI. rebalanceImmediately
     
    (完)
  • 相关阅读:
    TCP 连接中的TIME_WAIT
    tcp 重组原理
    自己用wireshark 抓了个包,分析了一下
    wireshark 使用技巧
    IP 网际协议
    CSS3 选择器
    ajax 底层源码解析
    初识 Java
    jQuery (DOM篇)
    绘制 SVG
  • 原文地址:https://www.cnblogs.com/chenjunjie12321/p/7922362.html
Copyright © 2011-2022 走看看