zoukankan      html  css  js  c++  java
  • RocketMq总结(三) -- 消费者启动 MQClientInstance

    一 消费者成员变量

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {
        /**
         * Delay some time when exception occur
         */
        private long pullTimeDelayMillsWhenException = 3000;
        /**
         * Flow control interval
         */
        private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
        /**
         * Delay some time when suspend pull service
         */
        private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
        private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
        private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
        private final InternalLogger log = ClientLogger.getLog();
        private final DefaultMQPushConsumer defaultMQPushConsumer;
        private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
        private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
        private final long consumerStartTimestamp = System.currentTimeMillis();
        private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
        private final RPCHook rpcHook;
        private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
        private MQClientInstance mQClientFactory;
        private PullAPIWrapper pullAPIWrapper;
        private volatile boolean pause = false;
        private boolean consumeOrderly = false;
        private MessageListener messageListenerInner;
        private OffsetStore offsetStore;
        private ConsumeMessageService consumeMessageService;
        private long queueFlowControlTimes = 0;
        private long queueMaxSpanFlowControlTimes = 0;

      MQClientInstance 是负责网络通信的工具类,其内部基于 Netty 实现。

      它的初始化发生在 DefaultMQPushConsumerImpl 的 start() 方法中:

    public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                    this.serviceState = ServiceState.START_FAILED;
    
                    this.checkConfig();
    
                    this.copySubscription();
    
                    if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                        this.defaultMQPushConsumer.changeInstanceNameToPID();
                    }
    
                    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

    MQClientManager#getOrCreateMQClientInstance

     /**
      * Looks up the client instance registered under the id built from {@code clientConfig},
      * creating and registering a new one when none is present.
      *
      * <p>If {@code putIfAbsent} reports that an instance was already registered under the same
      * id, that earlier instance is returned and the freshly built one is not used.
      *
      * @param clientConfig configuration used to build the client id; a clone of it is handed to a
      *                     newly created instance
      * @param rpcHook      hook passed through to a newly created instance
      * @return the instance registered in {@code factoryTable} for the client id
      */
     public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
            final String clientId = clientConfig.buildMQClientId();

            // Fast path: an instance is already registered for this id.
            final MQClientInstance cached = this.factoryTable.get(clientId);
            if (cached != null) {
                return cached;
            }

            final MQClientInstance created = new MQClientInstance(
                clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(),
                clientId,
                rpcHook);

            // putIfAbsent returns the instance already mapped to the id, or null if ours was stored.
            final MQClientInstance registeredEarlier = this.factoryTable.putIfAbsent(clientId, created);
            if (registeredEarlier == null) {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
                return created;
            }

            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            return registeredEarlier;
        }

     MQClientInstance 最重要的属性就是 

     private final MQClientAPIImpl mQClientAPIImpl; 

    而 MQClientAPIImpl 内部持有的就是 Netty 客户端

     this.remotingClient = new NettyRemotingClient(nettyClientConfig, null); 

    public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
    
        private static final long LOCK_TIMEOUT_MILLIS = 3000;
    
        private final NettyClientConfig nettyClientConfig;
        private final Bootstrap bootstrap = new Bootstrap();
        private final EventLoopGroup eventLoopGroupWorker;

    二 客户端定期创建拉取任务

    在 DefaultMQPushConsumerImpl 的 start() 中会调用下面这行代码,随后进入 MQClientInstance 的 start() 方法:

     mQClientFactory.start(); 

    /**
     * Starts this client factory while holding the monitor of {@code this}.
     *
     * <p>Only the {@code CREATE_JUST} state triggers the startup sequence. The state is set to
     * {@code START_FAILED} before the first step and becomes {@code RUNNING} only after the last
     * one, so an exception thrown part-way leaves the state at {@code START_FAILED}. Any state
     * other than {@code CREATE_JUST} and {@code START_FAILED} makes this call a no-op.
     *
     * @throws MQClientException if the state is already {@code START_FAILED}; the startup steps
     *                           themselves may presumably also raise it
     */
    public void start() throws MQClientException {

            synchronized (this) {
                switch (this.serviceState) {
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    case CREATE_JUST:
                        // Pessimistically mark as failed; flipped to RUNNING once every step is done.
                        this.serviceState = ServiceState.START_FAILED;

                        // No name server address configured: ask the API layer to fetch one.
                        if (this.clientConfig.getNamesrvAddr() == null) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }

                        // Bring up the request-response channel.
                        this.mQClientAPIImpl.start();

                        // Kick off the scheduled tasks.
                        this.startScheduledTask();

                        // Launch the pull service.
                        this.pullMessageService.start();

                        // Launch the rebalance service.
                        this.rebalanceService.start();

                        // Start the producer held by this instance (invoked with false).
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    default:
                        // Every other state: nothing to do.
                        break;
                }
            }
        }

    PullMessageService实现了Runnable接口

    /**
     * Service loop: repeatedly takes the next {@code PullRequest} from {@code pullRequestQueue}
     * and hands it to {@code pullMessage} until {@code isStopped()} returns true.
     *
     * <p>{@code take()} blocks while the queue is empty. An {@code InterruptedException} is
     * ignored on purpose so that the loop condition alone decides when to exit; any other
     * exception is logged and the loop carries on.
     */
    @Override
        public void run() {
            log.info(this.getServiceName() + " service started");

            for (;;) {
                if (this.isStopped()) {
                    break;
                }

                try {
                    this.pullMessage(this.pullRequestQueue.take());
                } catch (InterruptedException ignored) {
                    // Intentionally empty: fall through and re-check the stop flag.
                } catch (Exception e) {
                    log.error("Pull Message Service Run Method exception", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

    不断地从pullRequestQueue取任务去拉取消息

    那么,拉取请求又是在什么时候被放入 pullRequestQueue 的呢?

    1 负载均衡器每隔20s会执行一次负载均衡,如果发现某一个ProcessQueue没有拉取任务,就把对应的拉取任务放到pullRequestQueue里面

    2 每次拉取结束之后,DefaultMQPushConsumerImpl 都会调用 executePullRequestImmediately 或 executePullRequestLater,把下一次的拉取请求重新放入 pullRequestQueue

    PullCallback pullCallback = new PullCallback() {
                @Override
                public void onSuccess(PullResult pullResult) {
                    if (pullResult != null) {
                        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                            subscriptionData);
    
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                long prevRequestOffset = pullRequest.getNextOffset();
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                long pullRT = System.currentTimeMillis() - beginTimestamp;
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullRT);
    
                                long firstMsgOffset = Long.MAX_VALUE;
                                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                } else {
                                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    
                                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
    
                                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispatchToConsume);
    
                                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                    } else {
                                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                    }

    看一下PullRequest的成员变量

    下面让我们一一来介绍一下 PullRequest 的核心属性
    1 ) String consumerGroup :消费者组
    2)  MessageQueue messageQueue:待拉取消息队列
    3 ) ProcessQueue processQueue :消息处理队列,从Broker拉取到的消息先存入ProcessQueue,然后再提交到消费者消费线程池消费
    4 ) long nextOffset :待拉取的 MessageQueue 偏移
    5 ) Boolean lockedFirst :是否被锁定
     
    下一节继续分析拉取过程
  • 相关阅读:
    单元测试之道读书笔记(七)
    单元测试之道读书笔记(六)
    单元测试之道读书笔记(五)
    单元测试之道读书笔记(三)
    技术网站推荐
    CentOS7部署Haproxy 1.7.2
    Centos7.0配置MySQL主从服务器
    Centos7.0安装mysql5.6
    centos7配置Java环境
    Centos6.5 DNS配置
  • 原文地址:https://www.cnblogs.com/juniorMa/p/15122772.html
Copyright © 2011-2022 走看看