zoukankan      html  css  js  c++  java
  • RocketMQ(4.8.0)——Broker 概述、启动和停止流程

    Broker 概述、启动和停止流程

      Broker 是 RocketMQ 体系中核心组件之一,存储是 Broker 的核心功能之一,决定整个 RocketMQ 体系的吞吐性能、可靠性和可用性。

    一、Broker 概述

      1.1 什么是 Broker

      Broker 是 RocketMQ 中核心的模块之一,主要负责处理各种 TCP 请求(计算)和存储消息(存储),在各个组件中的角色:

      Broker 分为 Master 和 Slave。Master 主要提供服务,Slave 在 Master 宕机后提供消费服务。

      1.2 Broker 存储目录结构

      commitlog:这是一个目录,其中包含具体的 commitlog 文件。文件名长度为 20 个字符,文件名由该文件保存消息的最大物理机 offset 值在高危补全 0 组成。每个文件大小一般是 1 GB,可以通过 mapedFileSizeCommitLog 进行配置。

      consumequeue:这是一个目录,包含该 Broker 上所有的 Topic 对应的消费队列文件信息。消费队列文件的格式为 "consumequeue/Topic 名字/queue id/具体消费队列文件"。每个消费队列其实是 commitlog 的一个索引,提供给消费者做拉取消息、更新位点使用。

      Index:这是一个目录,全部的文件都是按照消息 key 创建的 Hash 索引。文件名是用创建的时间戳命名的。

      Config:这是一个目录,保存了当前 Broker 中全部的 Topic、订阅关系和消费进度。这些数据 Broker 会定时从内存持久化到磁盘,以便宕机后恢复。

      abort:Broker 是否异常关闭的标志。正常关闭时候该文件被删除,异常关闭时则不会。当 Broker 重新启动时,根据是否异常宕机决定是否需要重新构建 Index 索引等操作。

      checkpoint:Broker 最近一次正常运行时的状态,比如最后一次正常刷盘的时间、最后一次正确索引的时间。

      1.3 Broker 启动和停止流程

      启动命令分为两个脚本:bin/mqbroker 和 bin/runbroker.sh。mqbroker 准备了 RocketMQ 启动本身的环境数据,比如 ROCKETMQ_HOME 环境变量。runbroker.sh 主要设置了 JVM 启动参数,比如 JAVA_HOME、Xms、Xmx。

      os.sh 是 RocketMQ 开发人员认为适合的系统调优参数。sh os.sh执行即可设置。启动 bin/mqbroker 的时候,需要按照自己环境的内存大小,在 bin/runbroker.sh 设置合适的内存大小。

      BrokerStartup.java 类主要负责为真正的启动过程做准备,解析脚本传过来的参数,初始化 Broker 配置,创建 BrokerController 实例等工作。

      BrokerController.java 类是 Broker 的掌控者,它管理和控制 Broker 的各个模块,包含通信模块、存储模块、索引模块、定时任务等。在 BrokerController 全部模块初始化并启动成功后,将在日志中输出 info信息"boot success"。

      第一步:初始化启动环境。由 bin/mqbroker 和 bin/runbroker.sh 两个脚本来完成的。bin/mqbroker 脚本主要用于设置 RocketMQ 根目录环境变量,调用 bin/runbroker.sh 进入 RocketMQ 的启动入口,代码路径:rocketmq/bin/mqbroker,代码如下:

     1 # Unless required by applicable law or agreed to in writing, software
     2 # distributed under the License is distributed on an "AS IS" BASIS,
     3 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     4 # See the License for the specific language governing permissions and
     5 # limitations under the License.
     6 
     7 if [ -z "$ROCKETMQ_HOME" ] ; then
     8   ## resolve links - $0 may be a link to maven's home
     9   PRG="$0"
    10 
    11   # need this for relative symlinks
    12   while [ -h "$PRG" ] ; do
    13     ls=`ls -ld "$PRG"`
    14     link=`expr "$ls" : '.*-> (.*)$'`
    15     if expr "$link" : '/.*' > /dev/null; then
    16       PRG="$link"
    17     else
    18       PRG="`dirname "$PRG"`/$link"
    19     fi
    20   done
    21 
    22   saveddir=`pwd`
    23 
    24   ROCKETMQ_HOME=`dirname "$PRG"`/..
    25 
    26   # make it fully qualified
    27   ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`
    28 
    29   cd "$saveddir"
    30 fi
    31 
    32 export ROCKETMQ_HOME
    33 echo ${ROCKETMQ_HOME}
    34 sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@
    35                                                                                           

      bin/runbroker.sh 脚本的主要功能是检测 JDK 的环境配置和 JVM 的参数配置。JDK 的环境配置的检查逻辑代码路径:bin/runbroker.sh,具体的实现代码如下:

    1 [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
    2 [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
    3 [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
    4 
    5 export JAVA_HOME
    6 export JAVA="$JAVA_HOME/bin/java"
    7 export BASE_DIR=$(dirname $0)/..
    8 export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

      注意 bin/runbroker.sh 中 JVM 的参数设置,通常,-Xms、-Xmx、-Xmn 和 -XX:MaxDirectMemorySize 这 4 个参数会随着 RocketMQ 服务器的物理机内存大小的变化而进行相应的改变。

      第二步:初始化 BrokerController。

      该初始化主要包含 RocketMQ 启动命令行参数解析、Broker 各个模块配置参数解析、Broker 各个模块初始化、进程关机 Hook 初始化等过程。

      RocketMQ 启动命令行参数解析。

      brokerConfig、nettyServerConfig、nettyClientConfig、messageStoreConfig 这些基本配置对象初始化完毕后,还有后续代码依据各种启动条件重新调整部分参数。

      在各个配置对象初始化完毕后,程序会调用 BrokerController.initialize()方法对 Broker 的各个模块进行初始化。

      首先,加载 Broker 基础数据配置和存储层服务,代码路径:D: ocketmq-masterrokersrcmainjavaorgapache ocketmqrokerBrokerController.java,具体代码如下:

     1     public boolean initialize() throws CloneNotSupportedException {
     2         boolean result = this.topicConfigManager.load();
     3 
     4         result = result && this.consumerOffsetManager.load();
     5         result = result && this.subscriptionGroupManager.load();
     6         result = result && this.consumerFilterManager.load();
     7 
     8         if (result) {
     9             try {
    10                 this.messageStore =
    11                     new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
    12                         this.brokerConfig);
    13                 if (messageStoreConfig.isEnableDLegerCommitLog()) {
    14                     DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
    15                     ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
    16                 }
    17                 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
    18                 //load plugin
    19                 MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
    20                 this.messageStore = MessageStoreFactory.build(context, this.messageStore);
    21                 this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
    22             } catch (IOException e) {
    23                 result = false;
    24                 log.error("Failed to initialize", e);
    25             }
    26         }
    27 
    28         result = result && this.messageStore.load();  #加载 Broker 基础数据配置,包含 Broker 中的 Topic、消费位点、订阅关系、消费过滤(无实际数据需要加载)

      之后,初始化存储层服务对象 messageStore 和 Broker 监控统计对象 brokerStats。

      然后,Broker 会初始化通信层服务和一系列的定时任务。通信层服务主要初始化正常通信通道、VIP通信通道和通信线程池。由于代码太多,并且大多数逻辑类似,所以这里以 VIP 通道为例,讲解通信层服务初始化;以消费进度定时持久化为例,讲解定时任务初始化。

      在 Broker 中 VIP 通道通信端口 10909,与正常通信端口 10911 相差2,10909 和 10911 这两个端口有什么关系呢?VIP 通道又如何初始化的呢?

     1         if (result) {
     2             this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
     3              = (NettyServerConfig) this.nettyServerConfig.clone();
     4             fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
     5             this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
     6             this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
     7                 this.brokerConfig.getSendMessageThreadPoolNums(),
     8                 this.brokerConfig.getSendMessageThreadPoolNums(),
     9                 1000 * 60,
    10                 TimeUnit.MILLISECONDS,
    11                 this.sendThreadPoolQueue,
    12                 new ThreadFactoryImpl("SendMessageThread_"));

      fastConfig 就是 VIP 通信层的配置,其配置对象 "克隆" 自正常通信的配置对象,唯独通信端口是 nettyServerConfig.getListenPort() - 2,也就是 10911 - 2。利用 fastConfig 初始化 fastRemotingServer 的结果也就是我们常用的 VIP 通道。

      RocketMQ 的通信层实现本质上是基于 Netty 的,那么通信层又是如何处理客户端发送的 Netty 请求的呢?

      通信层对象初始化完成后,会调用 this.registerProcessor() 方法,这里将正常的通信层对象和 VIP 通道的通信层对象与各个请求处理器进行关联,比如将发送消息的请求交给接收消息的请求处理器进行处理。

      在 VIP 通信层初始化有了基本料及哦吼,开始消费进度定时持久化。

      Broker 在接收消费者上报的消费进度后,会定期持久化到物理机文件中,当消费者因为重新发布或者宕机而重启时,能从消费进度中得知恢复,不至于重复消费。

      第三步:启动 RocketMQ的各个组件。

      组件启动代码路径:D: ocketmq-masterrokersrcmainjavaorgapache ocketmqrokerBrokerController.java 中 start() 方法,由于启动过程非常复杂,这里按照代码执行顺序讲解各个模块功能。

      this.messageStore:存储层服务,比如 CommitLog、ConsumeQueue 存储管理。

      this.remotingServer:普通通道请求处理服务。一般的请求都是在这里被处理的。

      this.fastRemotingServer:VIP 通道请求处理服务。如果普通通道比较忙,那么可以使用 VIP 通道,一般作为客户端降级使用。

      this.brokerOuterAPI:Broker 访问对外接口的封装对象。

      this.pullRequestHoldService:Pull 长轮询服务。

      this.clientHousekeepingService:清理心跳超时的生产者、消费者、过滤服务器。

      this.filterServerManager:过滤服务器管理。

      this.brokerStatsManager:Broker 监控数据统计管理。

      this.brokerFastFailure:Broker 快速失败处理。

      Broker 启动进程后,关闭 Broker 进程其实是启动过程的逆过程,这里不再赘述。

      Broker 关闭只是调用 BrokerStartup.java 中注册 JVM Hook 的 BrokerController.shutdown()方法,该方法再调用各个模块关闭方法,最后关闭整个进程。Broker 进程关闭处理完成后,日志输出 info 信息 "Shutdown hook over"

  • 相关阅读:
    动画电影分享
    Nginx 学习
    震惊!一步激活idea,亲测有效-2020-7-9
    根据oracle判断语句结果,进行循环语句
    Oracle11g的exp导出空表提示EXP-00011: 不存在
    查询某个用户下各个表的数据量
    Oracle批量修改表字段类型(存储过程)
    PLS-00201: identifier 'SYS.DBMS_EXPORT_EXTENSION' must be declared
    Oracle AWR报告生成和大概分析
    oracle如何给原有的用户变更表空间
  • 原文地址:https://www.cnblogs.com/zuoyang/p/14441905.html
Copyright © 2011-2022 走看看