  • RocketMQ Broker startup flow

    Entry point:

    The mqbroker startup script shows that starting a RocketMQ broker ultimately runs the main method of BrokerStartup, with the script's arguments passed through.

    export ROCKETMQ_HOME
    # The launch script; $@ forwards all extra arguments to BrokerStartup
    sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@

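    There is no need to memorize the broker startup parameters; when needed, refer to these classes:

      org.apache.rocketmq.common.BrokerConfig    -------> broker configuration

      org.apache.rocketmq.remoting.netty.NettyServerConfig  ------> Netty server configuration

      org.apache.rocketmq.remoting.netty.NettyClientConfig  ------> Netty client configuration

      org.apache.rocketmq.store.config.MessageStoreConfig   ------> store configuration

    Apache command-line scaffolding (Commons CLI):

      The command-line scaffolding parses the arguments passed to the broker's main method at startup and makes it easy to see which parameters are available. We can initialize the configuration with -c <config file path>.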

    Startup flow:

     /* Entry point: the main method
         * @param args
         */
        public static void main(String[] args) {
            // create the BrokerController, then start it
            start(createBrokerController(args));
        }

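    1. Register the command-line options:

      These are the options that can be appended when starting the broker:

        -n: the broker's namesrvAddr (NameServer address)

        -h: print help

        -c: path to the configuration file

        -p: print all configuration items at startup

        -m: print the important configuration items at startup

     // 1. register the -n and -h options
                Options options = ServerUtil.buildCommandlineOptions(new Options());
                // 2. register the -c, -p and -m options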
                commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                    new PosixParser());
                if (null == commandLine) {
                    System.exit(-1);
                }
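    To make the option lookup concrete, here is a minimal sketch that uses only the JDK. It is not the Commons CLI API that ServerUtil wraps. The class BrokerArgsSketch, its parse method, and the sample values conf/broker.conf and 127.0.0.1:9876 are hypothetical; they only mimic how -c, -n and -p end up as lookups by option character.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, JDK-only stand-in for the option lookup; not the Commons CLI API.
class BrokerArgsSketch {
    /** Collects "-x value" pairs and bare "-x" flags (stored with an empty value). */
    static Map<Character, String> parse(String[] args) {
        Map<Character, String> options = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.length() == 2 && arg.charAt(0) == '-') {
                // A following token that does not start with '-' is this option's value.
                boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("-");
                options.put(arg.charAt(1), hasValue ? args[++i] : "");
            }
        }
        return options;
    }

    public static void main(String[] args) {
        Map<Character, String> options = parse(
            new String[] {"-c", "conf/broker.conf", "-n", "127.0.0.1:9876", "-p"});
        System.out.println("config file:  " + options.get('c'));
        System.out.println("namesrvAddr:  " + options.get('n'));
        System.out.println("print config: " + options.containsKey('p'));
    }
}
```

    In the real broker the same questions are answered by commandLine.hasOption('c') and commandLine.getOptionValue('c'), as the following steps show.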

    2. Set the port that nettyServerConfig binds to

     // set the broker server's listen port to 10911
                nettyServerConfig.setListenPort(10911);

    3. Assemble the configuration:

           // 1. the argument after -c is the path to the configuration file
                if (commandLine.hasOption('c')) {
                    String file = commandLine.getOptionValue('c');
                    if (file != null) {
                        configFile = file;
                        InputStream in = new BufferedInputStream(new FileInputStream(file));
                        properties = new Properties();
                        properties.load(in);
    
                        properties2SystemEnv(properties);
                        // 2. inject the broker settings from the file into BrokerConfig via reflection
                        MixAll.properties2Object(properties, brokerConfig);
                        // 3. inject the NettyServer settings from the file into NettyServerConfig via reflection
                        MixAll.properties2Object(properties, nettyServerConfig);
                        // 4. inject the NettyClient settings from the file into NettyClientConfig via reflection
                        MixAll.properties2Object(properties, nettyClientConfig);
                        // 5. inject the store settings from the file into MessageStoreConfig via reflection
                        MixAll.properties2Object(properties, messageStoreConfig);
    
                        BrokerPathConfigHelper.setBrokerConfigPath(file);
                        in.close();
                    }
                }
                // handle the remaining options and inject them into brokerConfig, for example -n
    
                MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
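    The reflection-based injection can be pictured with a simplified sketch. This is an assumption about the general technique (match each setXxx method to a property named xxx and convert the string value), not a copy of MixAll.properties2Object; DemoConfig and inject are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Simplified sketch; the real MixAll.properties2Object may differ in details.
class PropertiesToObjectSketch {
    /** Hypothetical config class standing in for BrokerConfig. */
    public static class DemoConfig {
        private String brokerName = "default";
        private int listenPort = 0;
        public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
        public String getBrokerName() { return brokerName; }
        public int getListenPort() { return listenPort; }
    }

    /** For every public setXxx(T) method, looks up property "xxx" and injects the converted value. */
    static void inject(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // property not present in the file: keep the default
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == String.class) {
                    m.invoke(target, value);
                } else if (type == int.class || type == Integer.class) {
                    m.invoke(target, Integer.parseInt(value));
                } else if (type == long.class || type == Long.class) {
                    m.invoke(target, Long.parseLong(value));
                } else if (type == boolean.class || type == Boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(value));
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("brokerName", "broker-a");
        props.setProperty("listenPort", "10911");
        DemoConfig config = new DemoConfig();
        inject(props, config);
        System.out.println(config.getBrokerName() + " listens on " + config.getListenPort());
    }
}
```

    Because keys that match no setter are simply skipped, the same Properties object can be applied to several config objects in turn, which is how one file can feed BrokerConfig, NettyServerConfig, NettyClientConfig and MessageStoreConfig.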

    4. Create the BrokerController:

           // 1. create the BrokerController instance
                final BrokerController controller = new BrokerController(
                    brokerConfig,
                    nettyServerConfig,
                    nettyClientConfig,
                    messageStoreConfig);
                // remember all configs to prevent discard
                controller.getConfiguration().registerConfig(properties);
                // 2. initialize the BrokerController
                boolean initResult = controller.initialize();

      Creating the instance:

    public BrokerController(
            final BrokerConfig brokerConfig,
            final NettyServerConfig nettyServerConfig,
            final NettyClientConfig nettyClientConfig,
            final MessageStoreConfig messageStoreConfig
        ) {
            // broker configuration
            this.brokerConfig = brokerConfig;
            // Netty server configuration
            this.nettyServerConfig = nettyServerConfig;
            // Netty client configuration
            this.nettyClientConfig = nettyClientConfig;
            // store configuration
            this.messageStoreConfig = messageStoreConfig;
            // consumer offset manager; reads store/config/consumerOffset.json and maintains the offsetTable map
            this.consumerOffsetManager = new ConsumerOffsetManager(this);
            // topic config manager; reads store/config/topics.json
            this.topicConfigManager = new TopicConfigManager(this);
            // pull-message processor; handles message pulls from consumers, associated with request code RequestCode.PULL_MESSAGE
            this.pullMessageProcessor = new PullMessageProcessor(this);
            //
            this.pullRequestHoldService = new PullRequestHoldService(this);
            //
            this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
            //
            this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
            this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
            // consumer filter manager; reads store/config/consumerFilter.json
            this.consumerFilterManager = new ConsumerFilterManager(this);
            // producer manager
            this.producerManager = new ProducerManager();
            // cleans up inactive connections; it contains methods that scan for inactive producer and consumer connections
            this.clientHousekeepingService = new ClientHousekeepingService(this);
            //
            this.broker2Client = new Broker2Client(this);
            this.subscriptionGroupManager = new SubscriptionGroupManager(this);
            // initializes the NettyClient
            this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
            //
            this.filterServerManager = new FilterServerManager(this);
            //
            this.slaveSynchronize = new SlaveSynchronize(this);
            // queue for the send thread pool
            this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
            // queue for the pull thread pool
            this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
            // queue for the query thread pool
            this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
            this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
            this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
            // broker stats manager
            this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
            this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
            //
            this.brokerFastFailure = new BrokerFastFailure(this);
            this.configuration = new Configuration(
                log,
                BrokerPathConfigHelper.getBrokerConfigPath(),
                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
            );
        }
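    The constructor gives every thread pool a LinkedBlockingQueue with a fixed capacity. As a rough illustration of what a capacity limit means for a fixed-size pool, the sketch below uses a plain JDK ThreadPoolExecutor with its default rejection policy. BoundedQueueSketch and countRejected are hypothetical names, and how the broker itself reacts to a full queue is not covered here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch with the JDK's ThreadPoolExecutor; not the broker's BrokerFixedThreadPoolExecutor.
class BoundedQueueSketch {
    /** Submits blocking tasks to a fixed pool with a bounded queue and counts the rejected ones. */
    static int countRejected(int threads, int capacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            threads, threads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(capacity));
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task blocks until released, so threads and queue slots stay occupied.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all threads busy and the queue is full
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(2, 3, 10));
    }
}
```

    With 2 threads and 3 queue slots, only 5 of the 10 tasks can be accepted while the workers are blocked, so the remaining 5 are rejected. A bounded queue trades unbounded memory growth for explicit back-pressure.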

      Initializing the BrokerController:

     public boolean initialize() throws CloneNotSupportedException {
            boolean result = this.topicConfigManager.load();
            // load the configuration files of the corresponding managers
            result = result && this.consumerOffsetManager.load();
            result = result && this.subscriptionGroupManager.load();
            result = result && this.consumerFilterManager.load();
    
            if (result) {
                try {
                    this.messageStore =
                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                            this.brokerConfig);
                    this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                    //load plugin
                    MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                    this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                    this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
                } catch (IOException e) {
                    result = false;
                    log.error("Failed to initialize", e);
                }
            }
            // load the CommitLog files
            result = result && this.messageStore.load();
    
            if (result) {
                // initialize the NettyServer
                this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
                NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
                fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
                // initialize the VIP NettyServer on port 10911 - 2 = 10909
                this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
                // initialize the thread pools
                this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.sendThreadPoolQueue,
                    new ThreadFactoryImpl("SendMessageThread_"));
    
                this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.pullThreadPoolQueue,
                    new ThreadFactoryImpl("PullMessageThread_"));
    
                this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getQueryMessageThreadPoolNums(),
                    this.brokerConfig.getQueryMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.queryThreadPoolQueue,
                    new ThreadFactoryImpl("QueryMessageThread_"));
    
                this.adminBrokerExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                        "AdminBrokerThread_"));
    
                this.clientManageExecutor = new ThreadPoolExecutor(
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.clientManagerThreadPoolQueue,
                    new ThreadFactoryImpl("ClientManageThread_"));
    
                this.consumerManageExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                        "ConsumerManageThread_"));
                // register the request processors; each request code sent by a client is handled by a dedicated processor
                this.registerProcessor();
    
                final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
                final long period = 1000 * 60 * 60 * 24;
                // schedule the periodic tasks
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.getBrokerStats().record();
                        } catch (Throwable e) {
                            log.error("schedule record error.", e);
                        }
                    }
                }, initialDelay, period, TimeUnit.MILLISECONDS);
    
                // periodically persist consumerOffset.json
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerOffsetManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumerOffset error.", e);
                        }
                    }
                }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
                // periodically persist consumerFilter.json
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerFilterManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumer filter error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
                //
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.protectBroker();
                        } catch (Throwable e) {
                            log.error("protectBroker error.", e);
                        }
                    }
                }, 3, 3, TimeUnit.MINUTES);
                // log the watermark
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printWaterMark();
                        } catch (Throwable e) {
                            log.error("printWaterMark error.", e);
                        }
                    }
                }, 10, 1, TimeUnit.SECONDS);
                //
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                        } catch (Throwable e) {
                            log.error("schedule dispatchBehindBytes error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    
                //
                if (this.brokerConfig.getNamesrvAddr() != null) {
                    this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                    log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
                } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                            } catch (Throwable e) {
                                log.error("ScheduledTask fetchNameServerAddr exception", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
                }
    
                if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                        this.updateMasterHAServerAddrPeriodically = false;
                    } else {
                        this.updateMasterHAServerAddrPeriodically = true;
                    }
    
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.slaveSynchronize.syncAll();
                            } catch (Throwable e) {
                                log.error("ScheduledTask syncAll slave exception", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                } else {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.printMasterAndSlaveDiff();
                            } catch (Throwable e) {
                                log.error("schedule printMasterAndSlaveDiff error.", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                }
            }
    
            return result;
        }
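    Every scheduled task in initialize() wraps its body in try/catch (Throwable). With the JDK's ScheduledExecutorService, a task that lets an exception escape is not run again, so the guard is what keeps a periodic job alive after a failure. The sketch below demonstrates that behavior; GuardedScheduleSketch and ranAtLeast are hypothetical names, and the 10 ms period and 500 ms wait are arbitrary values chosen for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: why periodic tasks catch Throwable inside run().
class GuardedScheduleSketch {
    /** Schedules a task that always fails; reports whether it still ran at least `runs` times. */
    static boolean ranAtLeast(int runs, boolean guarded) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        Runnable failing = () -> {
            latch.countDown();
            throw new IllegalStateException("simulated failure");
        };
        Runnable task = failing;
        if (guarded) {
            task = () -> {
                try {
                    failing.run();
                } catch (Throwable e) {
                    // the broker logs the error here; the sketch just swallows it
                }
            };
        }
        scheduler.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            return latch.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("guarded task kept running:   " + ranAtLeast(3, true));
        System.out.println("unguarded task kept running: " + ranAtLeast(3, false));
    }
}
```

    The unguarded variant runs once and is then silently cancelled, which is the failure mode the try/catch blocks in initialize() avoid.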

    5. Start the BrokerController

     public void start() throws Exception {
            // start flushing to disk, synchronization and HA
            if (this.messageStore != null) {
                this.messageStore.start();
            }
            // bind the NettyServer to port 10911 and start serving
            if (this.remotingServer != null) {
                this.remotingServer.start();
            }
            // bind the VIP NettyServer to port 10911 - 2 = 10909 and start serving
            if (this.fastRemotingServer != null) {
                this.fastRemotingServer.start();
            }
            // initialize and assemble the NettyClient bootstrap
            if (this.brokerOuterAPI != null) {
                this.brokerOuterAPI.start();
            }
            //
    
            if (this.pullRequestHoldService != null) {
                this.pullRequestHoldService.start();
            }
            // start the service that detects inactive connections; a scheduled task that runs every 10s
            if (this.clientHousekeepingService != null) {
                this.clientHousekeepingService.start();
            }
            //
            if (this.filterServerManager != null) {
                this.filterServerManager.start();
            }
            // register the broker's topic information with the NameServer
            this.registerBrokerAll(true, false);
            // every 30s, register the broker's topic information with each NameServer
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false);
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
            //
            if (this.brokerStatsManager != null) {
                this.brokerStatsManager.start();
            }
            //
            if (this.brokerFastFailure != null) {
                this.brokerFastFailure.start();
            }
        }

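      Registering the broker's topic information with the NameServer:

      After an initial 10-second delay, the broker reports and registers its topic information with the NameServer every 30 seconds.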

     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false);
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
            // 1. get a snapshot of the topicConfigTable map maintained by TopicConfigManager
            TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    
            if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
                for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                    TopicConfig tmp =
                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                            this.brokerConfig.getBrokerPermission());
                    topicConfigTable.put(topicConfig.getTopicName(), tmp);
                }
                topicConfigWrapper.setTopicConfigTable(topicConfigTable);
            }
            // 2. send the registration request
            RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                topicConfigWrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills());
    
            if (registerBrokerResult != null) {
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }
    
                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
    
                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
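    The permission check at the top of registerBrokerAll can be read as a bit-flag test: when the broker itself is not both readable and writable, every topic is reported with the broker's permission instead of its own. The sketch below restates that rule. The bit values for PERM_READ and PERM_WRITE are assumptions for illustration (check PermName in your RocketMQ version), and PermSketch and reportedPerm are hypothetical names.

```java
// Sketch of the permission rule; bit values are assumed, not taken from the article.
class PermSketch {
    static final int PERM_READ = 1 << 2;  // assumed value
    static final int PERM_WRITE = 1 << 1; // assumed value

    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    /** Permission reported for a topic: the broker's when the broker is restricted, else the topic's own. */
    static int reportedPerm(int brokerPerm, int topicPerm) {
        if (!isWriteable(brokerPerm) || !isReadable(brokerPerm)) {
            return brokerPerm;
        }
        return topicPerm;
    }

    public static void main(String[] args) {
        int readWrite = PERM_READ | PERM_WRITE;
        System.out.println("read-only broker reports:  " + reportedPerm(PERM_READ, readWrite));
        System.out.println("read-write broker reports: " + reportedPerm(readWrite, PERM_READ));
    }
}
```

    This mirrors the loop in registerBrokerAll, which rebuilds each TopicConfig with this.brokerConfig.getBrokerPermission() only in the restricted case and otherwise sends the table unchanged.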
    private RegisterBrokerResult registerBroker(
            final String namesrvAddr,
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final boolean oneway,
            final int timeoutMills
        ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
            InterruptedException {
            // 1. build the broker registration header for RequestCode.REGISTER_BROKER
            RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
            // 2. set the body
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            request.setBody(requestBody.encode());
    
            if (oneway) {
                try {
                    this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
                } catch (RemotingTooMuchRequestException e) {
                    // Ignore
                }
                return null;
            }
            // 3. send synchronously
            RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    RegisterBrokerResponseHeader responseHeader =
                        (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                    RegisterBrokerResult result = new RegisterBrokerResult();
                    result.setMasterAddr(responseHeader.getMasterAddr());
                    result.setHaServerAddr(responseHeader.getHaServerAddr());
                    if (response.getBody() != null) {
                        result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                    }
                    return result;
                }
                default:
                    break;
            }
    
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }

    TopicConfigManager

      TopicConfigManager maintains all of the broker's topic information; its topicConfigTable is what gets reported to the NameServer periodically.

     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        private static final long LOCK_TIMEOUT_MILLIS = 3000;
        private transient final Lock lockTopicConfigTable = new ReentrantLock();
        // holds the topic configuration entries
        private final ConcurrentMap<String, TopicConfig> topicConfigTable =
            new ConcurrentHashMap<String, TopicConfig>(1024);
        private final DataVersion dataVersion = new DataVersion();
        private final Set<String> systemTopicList = new HashSet<String>();
        private transient BrokerController brokerController;
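    The fields above pair a ConcurrentMap with a ReentrantLock and a 3000 ms lock timeout. One plausible way such fields work together is a check, lock, re-check sequence when an entry is created. The sketch below is an assumption about that pattern and not TopicConfigManager's actual code; TopicTableSketch, createIfAbsent and the Integer value type (standing in for TopicConfig) are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a lock-with-timeout guard around a concurrent topic table.
class TopicTableSketch {
    private static final long LOCK_TIMEOUT_MILLIS = 3000;
    private final Lock lock = new ReentrantLock();
    // topic name -> queue count (stands in for TopicConfig)
    private final ConcurrentMap<String, Integer> queueNumsByTopic = new ConcurrentHashMap<>(1024);

    /** Returns the stored queue count, creating the entry if absent; null if the lock timed out. */
    Integer createIfAbsent(String topic, int queueNums) {
        Integer existing = queueNumsByTopic.get(topic);
        if (existing != null) {
            return existing; // fast path: no locking for reads
        }
        try {
            if (!lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                return null; // give up instead of blocking forever
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        try {
            existing = queueNumsByTopic.get(topic); // re-check under the lock
            if (existing != null) {
                return existing;
            }
            queueNumsByTopic.put(topic, queueNums);
            return queueNums;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TopicTableSketch table = new TopicTableSketch();
        System.out.println(table.createIfAbsent("TopicA", 4));
        System.out.println(table.createIfAbsent("TopicA", 8));
    }
}
```

    The timeout bounds how long a request thread can wait for the table lock, while lookups of existing topics never touch the lock at all.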
  • Original article: https://www.cnblogs.com/iscys/p/13124053.html