zoukankan      html  css  js  c++  java
  • RocketMq Broker 启动流程

    启动源:  

    RocketMq  Broker 启动 从 rocketmq broker 启动mqbroker 启动脚本可以得知,最终运行的是 BrokerStartup  的main 方法,并将脚本参数传递。

    export ROCKETMQ_HOME
    //运行的启动脚本   $@ 表示附加的所有参数信息传递给 BrokerStartup
    sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@

    关于Broker启动参数,我们不用死记,用的时候可以参考

      org.apache.rocketmq.common.BrokerConfig    -------> broker 配置文件

      org.apache.rocketmq.remoting.netty.NettyServerConfig  ------> netty 服务端配置文件

      org.apache.rocketmq.remoting.netty.NettyClientConfig  ------> netty 客户端配置文件

      org.apache.rocketmq.store.config.MessageStoreConfig   ------> store 配置文件

    apache 命令脚手架:

      命令脚手架即我们启动broker main 方法时候,会附带一些参数信息,使用命令脚手架可以让我们很方便的知道对应的参数信息,我们可以通过 -c 配置文件路径 ,来初始化配置。

    启动流程:

     /* main 方法启动
         * @param args
         */
        public static void main(String[] args) {
            //创建brkerController
            start(createBrokerController(args));
        }

    1.命令脚手架注册:

      可以得知broker 启动追加的参数信息

        -n :  指定broker 的  namesrvAddr 地址

        -h :打印命令

        -c: 指定配置文件的路径

        -p: 启动时候日志打印配置信息

        -m:启动时候日志打印导入的配置信息

     //1.注册一些 -n -h 命令
                Options options = ServerUtil.buildCommandlineOptions(new Options());
                //2.注册 -c -p -m 命令
                commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                    new PosixParser());
                if (null == commandLine) {
                    System.exit(-1);
                }

    2.设置 nettyServerConfig绑定的端口信息

     //设置broker 服务端绑定的端口为10911 
                nettyServerConfig.setListenPort(10911);

    3.  组装配置文件信息:

           //1. -c  后面追加的参数为配置文件路径
                if (commandLine.hasOption('c')) {
                    String file = commandLine.getOptionValue('c');
                    if (file != null) {
                        configFile = file;
                        InputStream in = new BufferedInputStream(new FileInputStream(file));
                        properties = new Properties();
                        properties.load(in);
    
                        properties2SystemEnv(properties);
                        //2.反射将配置文件broker信息注入BrokerConfig 配置类中
                        MixAll.properties2Object(properties, brokerConfig);
                        //3.反射将配置文件NettyServer信息注入NettyServerConfig  配置类中
                        MixAll.properties2Object(properties, nettyServerConfig);
                        //4.反射将配置文件NettyClient信息注入NettyClientConfig 配置类中
                        MixAll.properties2Object(properties, nettyClientConfig);
                        //5.反射将配置文件store 信息注入MessageStoreConfig 配置类中
                        MixAll.properties2Object(properties, messageStoreConfig);
    
                        BrokerPathConfigHelper.setBrokerConfigPath(file);
                        in.close();
                    }
                }
                //处理其他命令,注入到brokerConfig 中,比如我们的-n 命令
    
                MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

     4.创建BrokerController:

           //1.创建 BrokerController instance
                final BrokerController controller = new BrokerController(
                    brokerConfig,
                    nettyServerConfig,
                    nettyClientConfig,
                    messageStoreConfig);
                // remember all configs to prevent discard
                controller.getConfiguration().registerConfig(properties);
                //2.进行初始化 Broker controller
                boolean initResult = controller.initialize();

      创建实例:

    public BrokerController(
            final BrokerConfig brokerConfig,
            final NettyServerConfig nettyServerConfig,
            final NettyClientConfig nettyClientConfig,
            final MessageStoreConfig messageStoreConfig
        ) {
            //broker 配置信息
            this.brokerConfig = brokerConfig;
            //nettyServer配置信息
            this.nettyServerConfig = nettyServerConfig;
            //nettyClient 配置信息
            this.nettyClientConfig = nettyClientConfig;
            //store 配置信息
            this.messageStoreConfig = messageStoreConfig;
            //consumer 偏移量管理器,会读取store/config/consumerOffset.json  json 配置文件,维护了offsetTable Map 结构
            this.consumerOffsetManager = new ConsumerOffsetManager(this);
            //topic 配置管理器,会读取store/config/topics.json
            this.topicConfigManager = new TopicConfigManager(this);
            //拉去消息处理器,用来处理消费端消息拉去,关联的业务code 为RequestCode.PULL_MESSAGE
            this.pullMessageProcessor = new PullMessageProcessor(this);
            //
            this.pullRequestHoldService = new PullRequestHoldService(this);
            //
            this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
            //
            this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
            this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
            // 消费过滤管理器会读取store/config/consumerFilter.json
            this.consumerFilterManager = new ConsumerFilterManager(this);
            //生产者管理器
            this.producerManager = new ProducerManager();
            //用于清除不活动的连接,可以看到里面有一些扫描生产者以及消费者不活动连接的方法
            this.clientHousekeepingService = new ClientHousekeepingService(this);
            //
            this.broker2Client = new Broker2Client(this);
            this.subscriptionGroupManager = new SubscriptionGroupManager(this);
            //NettyClient 初始化
            this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
            //
            this.filterServerManager = new FilterServerManager(this);
            //
            this.slaveSynchronize = new SlaveSynchronize(this);
            //发送线程池队列
            this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
            //拉取线程池队列
            this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
            //查询线程池队列
            this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
            this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
            this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
            //broker 状态管理器
            this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
            this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
            //
            this.brokerFastFailure = new BrokerFastFailure(this);
            this.configuration = new Configuration(
                log,
                BrokerPathConfigHelper.getBrokerConfigPath(),
                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
            );
        }

       初始化brokercotroller

     public boolean initialize() throws CloneNotSupportedException {
            boolean result = this.topicConfigManager.load();
            //加载对应管理器的配置文件
            result = result && this.consumerOffsetManager.load();
            result = result && this.subscriptionGroupManager.load();
            result = result && this.consumerFilterManager.load();
    
            if (result) {
                try {
                    this.messageStore =
                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                            this.brokerConfig);
                    this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                    //load plugin
                    MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                    this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                    this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
                } catch (IOException e) {
                    result = false;
                    log.error("Failed to initialize", e);
                }
            }
            //加载CommitLog 文件
            result = result && this.messageStore.load();
    
            if (result) {
                //初始化NettyServer
                this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
                NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
                fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
                //初始化VIP NettyServer 端口为在109011 -2 
                this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
                //初始化一些线程池
                this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.sendThreadPoolQueue,
                    new ThreadFactoryImpl("SendMessageThread_"));
    
                this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.pullThreadPoolQueue,
                    new ThreadFactoryImpl("PullMessageThread_"));
    
                this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getQueryMessageThreadPoolNums(),
                    this.brokerConfig.getQueryMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.queryThreadPoolQueue,
                    new ThreadFactoryImpl("QueryMessageThread_"));
    
                this.adminBrokerExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                        "AdminBrokerThread_"));
    
                this.clientManageExecutor = new ThreadPoolExecutor(
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.clientManagerThreadPoolQueue,
                    new ThreadFactoryImpl("ClientManageThread_"));
    
                this.consumerManageExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                        "ConsumerManageThread_"));
                //注册消息处理器,针对客户端发过来的消息code,会有针对的处理器进行处理
                this.registerProcessor();
    
                final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
                final long period = 1000 * 60 * 60 * 24;
                //执行定定时任务
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.getBrokerStats().record();
                        } catch (Throwable e) {
                            log.error("schedule record error.", e);
                        }
                    }
                }, initialDelay, period, TimeUnit.MILLISECONDS);
    
               //定时 保存consumerOffset.json 文件
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerOffsetManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumerOffset error.", e);
                        }
                    }
                }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
                //定时保存 consumerfilter.json 文件
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerFilterManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumer filter error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
                //
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.protectBroker();
                        } catch (Throwable e) {
                            log.error("protectBroker error.", e);
                        }
                    }
                }, 3, 3, TimeUnit.MINUTES);
                //打印水印日志
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printWaterMark();
                        } catch (Throwable e) {
                            log.error("printWaterMark error.", e);
                        }
                    }
                }, 10, 1, TimeUnit.SECONDS);
                //
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                        } catch (Throwable e) {
                            log.error("schedule dispatchBehindBytes error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    
                //
                if (this.brokerConfig.getNamesrvAddr() != null) {
                    this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                    log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
                } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                            } catch (Throwable e) {
                                log.error("ScheduledTask fetchNameServerAddr exception", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
                }
    
                if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                        this.updateMasterHAServerAddrPeriodically = false;
                    } else {
                        this.updateMasterHAServerAddrPeriodically = true;
                    }
    
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.slaveSynchronize.syncAll();
                            } catch (Throwable e) {
                                log.error("ScheduledTask syncAll slave exception", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                } else {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.printMasterAndSlaveDiff();
                            } catch (Throwable e) {
                                log.error("schedule printMasterAndSlaveDiff error.", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                }
            }
    
            return result;
        }

      5. 启动BrokerController

     public void start() throws Exception {
            //刷盘,同步,高可用开启
            if (this.messageStore != null) {
                this.messageStore.start();
            }
            //10911 NettyServer 端口绑定 ,开始服务
            if (this.remotingServer != null) {
                this.remotingServer.start();
            }
            //VIP 10911 -2 NettyServer 端口绑定 ,开始服务
            if (this.fastRemotingServer != null) {
                this.fastRemotingServer.start();
            }
            //初始化组装NettyClient bootstrap 信息
            if (this.brokerOuterAPI != null) {
                this.brokerOuterAPI.start();
            }
            //
    
            if (this.pullRequestHoldService != null) {
                this.pullRequestHoldService.start();
            }
            //开启检测不活动连接的服务,定时任务,每10s运行一次
            if (this.clientHousekeepingService != null) {
                this.clientHousekeepingService.start();
            }
            //
            if (this.filterServerManager != null) {
                this.filterServerManager.start();
            }
            //向nameServer 注册broker topic 信息
            this.registerBrokerAll(true, false);
         //每30s 向 每个nameServer 注册broker topic 信息
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false);
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
            //
            if (this.brokerStatsManager != null) {
                this.brokerStatsManager.start();
            }
            //
            if (this.brokerFastFailure != null) {
                this.brokerFastFailure.start();
            }
        }

      向nameServer 注册broker topic 信息:

      Broker 在每经过30s 都会向nameserver 报告注册自己的topic 信息

     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false);
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
            //1.得到TopicConfigManager 维护的topicConfigTable map 结构信息
            TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    
            if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
                for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                    TopicConfig tmp =
                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                            this.brokerConfig.getBrokerPermission());
                    topicConfigTable.put(topicConfig.getTopicName(), tmp);
                }
                topicConfigWrapper.setTopicConfigTable(topicConfigTable);
            }
            //2.发送注册消息
            RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                topicConfigWrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills());
    
            if (registerBrokerResult != null) {
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }
    
                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
    
                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    private RegisterBrokerResult registerBroker(
            final String namesrvAddr,
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final boolean oneway,
            final int timeoutMills
        ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
            InterruptedException {
            //1.注册broker 消息头 RequestCode.REGISTER_BROKER
            RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
            //2.设置body
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            request.setBody(requestBody.encode());
    
            if (oneway) {
                try {
                    this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
                } catch (RemotingTooMuchRequestException e) {
                    // Ignore
                }
                return null;
            }
            //3.同步发送
            RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    RegisterBrokerResponseHeader responseHeader =
                        (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                    RegisterBrokerResult result = new RegisterBrokerResult();
                    result.setMasterAddr(responseHeader.getMasterAddr());
                    result.setHaServerAddr(responseHeader.getHaServerAddr());
                    if (response.getBody() != null) {
                        result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                    }
                    return result;
                }
                default:
                    break;
            }
    
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }

    TopicConfigManager

      topicConfigManager 维护了broker 所有的topic 信息,也是与将 topicConfigTable 定时上报给namesrv ;

     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        private static final long LOCK_TIMEOUT_MILLIS = 3000;
        private transient final Lock lockTopicConfigTable = new ReentrantLock();
        //维护了topic 的相关信息
        private final ConcurrentMap<String, TopicConfig> topicConfigTable =
            new ConcurrentHashMap<String, TopicConfig>(1024);
        private final DataVersion dataVersion = new DataVersion();
        private final Set<String> systemTopicList = new HashSet<String>();
        private transient BrokerController brokerController;
  • 相关阅读:
    Windows 下Nexus搭建Maven私服
    WebService中获取request对象一例
    利用window.navigator.userAgent判断当前是否微信内置浏览器
    批量插入写法
    MySql 使用递归函数时遇到的级联删除问题
    【Mysql】 你会用 information_schema吗?
    Volatile 多线程中用到的关键字
    spring+springMVC中使用@Transcational方式管理事务的必须要配的东西。
    Android中操作SQLite数据库
    Oracle中的字符处理方法
  • 原文地址:https://www.cnblogs.com/iscys/p/13124053.html
Copyright © 2011-2022 走看看