  • Getting Started with Kafka Debugging (Part 1)

    Most people are familiar with the Kafka message queue, and it is fairly easy to use. The most common Kafka operations are the following:

    1. Start the Kafka cluster
    2. Create a topic named xxx
    3. List the topics that have been created
    4. Produce some data into the topic xxx
    5. Consume some data from the topic xxx
    For each of these operations, Kafka ships a set of ready-to-use scripts. They all live in the bin folder; the main ones are:
    • bin/kafka-server-start.sh   // starts the Kafka server process
    • bin/kafka-topics.sh    // creates and lists topics
    • bin/kafka-console-producer.sh    // command-line producer
    • bin/kafka-console-consumer.sh    // command-line consumer

    Here are some common usage examples:

    • Start the Kafka cluster:
        bin/kafka-server-start.sh config/server.properties
    • Create a topic named test:
        bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    • List the created topics:
        bin/kafka-topics.sh --list --zookeeper localhost:2181
    • Produce some data into the test topic, using kafka-console-producer:
        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    • Consume some data from the test topic, using kafka-console-consumer to read from the beginning:
        bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
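
    Note: the --zookeeper flag above applies to older Kafka versions. Since Kafka 2.2, kafka-topics.sh can also talk to the brokers directly via --bootstrap-server (the --zookeeper option was removed entirely in Kafka 3.0), so on newer clusters the equivalent commands are:
        bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
        bin/kafka-topics.sh --list --bootstrap-server localhost:9092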
     

     Main text

    Execution flow of kafka-server-start.sh

    First, let's walk through what the Kafka source actually executes when we run bin/kafka-server-start.sh config/server.properties.

    Step 1: take a look at the kafka-server-start.sh script itself:

    #!/bin/bash
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    # Check how many arguments were passed to kafka-server-start.sh.
    # If fewer than 1, the path to server.properties was not supplied,
    # so print the usage message and exit without running the rest of the script.
    if [ $# -lt 1 ];
    then
    	echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
    	exit 1
    fi
    
    #The directory this script lives in, e.g. /data/kafka/bin/
    base_dir=$(dirname $0)
    
    
    
    # Check whether KAFKA_LOG4J_OPTS has already been set.
    # If not, set it to
    # -Dlog4j.configuration=file:$base_dir/../config/log4j.properties
    # where $base_dir is replaced with the real path.
    # Assuming base_dir is /data/kafka/bin/, this yields
    # KAFKA_LOG4J_OPTS=
    # "-Dlog4j.configuration=file:/data/kafka/bin/../config/log4j.properties"
    
    if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
        export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
    fi
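    # (The [ "x$VAR" = "x" ] comparison is an old portable shell idiom,
    #  equivalent to the [ -z "$VAR" ] emptiness test used elsewhere.)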
    
    
    # Check whether KAFKA_HEAP_OPTS has already been set.
    # If not, set it to
    # -Xmx1G -Xms1G
    # i.e. KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    fi
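    # Because of this check, the heap can be overridden per invocation, e.g.:
    #   KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" bin/kafka-server-start.sh config/server.properties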
    
    
    
    # EXTRA_ARGS="-name kafkaServer -loggc"
    EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
    
    # $1 is the first argument passed to the script; for example, for
    # sh 1.sh a b c
    # $1='a'   $2='b'  $3='c'
    # So this checks whether the first argument to kafka-server-start.sh is -daemon.
    # If I run ./kafka-server-start.sh -daemon ../conf/server.properties
    # the -daemon) branch below is taken.
    # If I run ./kafka-server-start.sh ../conf/server.properties
    # the *) branch below is taken.
    # The -daemon branch prepends one more flag to EXTRA_ARGS:
    # EXTRA_ARGS was "-name kafkaServer -loggc",
    # and it becomes "-daemon -name kafkaServer -loggc".
    
    COMMAND=$1
    case $COMMAND in
      -daemon)
        EXTRA_ARGS="-daemon "$EXTRA_ARGS
        shift
        ;;
      *)
        ;;
    esac
    
    #Finally, launch the Kafka server.
    # You can temporarily change exec to echo to see what the line below expands to.
    # On my machine (started with -daemon) it printed
    # bin/kafka-run-class.sh -daemon -name kafkaServer -loggc kafka.Kafka config/server.properties
    # which is the command that actually runs in the end.
    
    exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
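
    One detail worth pausing on is the line EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}: the ${VAR-default} form substitutes the default only when VAR is completely unset, unlike ${VAR:-default}, which also substitutes when VAR is set but empty. A quick sketch you can paste into any Bash shell to see the difference:

        unset EXTRA_ARGS
        echo ${EXTRA_ARGS-'-name kafkaServer -loggc'}    # prints: -name kafkaServer -loggc
        EXTRA_ARGS=""
        echo ${EXTRA_ARGS-'-name kafkaServer -loggc'}    # prints an empty line: set-but-empty is kept
        echo ${EXTRA_ARGS:-'-name kafkaServer -loggc'}   # prints: -name kafkaServer -loggc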
    

      

    After all this back and forth, we find that the command I executed,

    bin/kafka-server-start.sh -daemon config/server.properties

    is transformed inside kafka-server-start.sh into

    bin/kafka-run-class.sh -daemon -name kafkaServer -loggc kafka.Kafka config/server.properties

    So next we analyze bin/kafka-run-class.sh. Again, here is the full script, with explanations alongside the code.

     

    #!/bin/bash
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    #If fewer than 1 argument was given, enter the if
    if [ $# -lt 1 ];
    then
      #Print this script's usage message for the user.
      echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
      exit 1
    fi
    
    # Run the Linux command uname -a and check whether its output contains the string "CYGWIN"
    if [[ $(uname -a) =~ "CYGWIN" ]]; then
      #If it does, set CYGWIN to 1
      CYGWIN=1
    else
      #If "CYGWIN" was not matched, set CYGWIN to 0
      CYGWIN=0
    fi
    
    # Check whether $INCLUDE_TEST_JARS is empty; if so, enter the if
    if [ -z "$INCLUDE_TEST_JARS" ]; then
      #Set INCLUDE_TEST_JARS to false
      INCLUDE_TEST_JARS=false
    fi
    
    #A regular expression that matches test and documentation jars
    regex="(-(test|test-sources|src|scaladoc|javadoc).jar|jar.asc)$"

    #A function that decides whether a file should go on the classpath.
    #It returns 0 (shell "true") when the file should be included: either INCLUDE_TEST_JARS is true, or the file is a regular runtime jar.
    #It returns 1 (shell "false") when the file is a test/source/doc jar that should be skipped.
    should_include_file() {
      if [ "$INCLUDE_TEST_JARS" = true ]; then
        return 0
      fi
      file=$1
      if [ -z "$(echo "$file" | egrep "$regex")" ] ; then
        return 0
      else
        return 1
      fi
    }
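    # For example, with INCLUDE_TEST_JARS=false and the regex above:
    #   should_include_file kafka-clients-2.5.1.jar       returns 0 (include it)
    #   should_include_file kafka-clients-2.5.1-test.jar  returns 1 (skip it)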
    
    #Set base_dir to the parent directory of the bin folder containing this script.
    #E.g. if my kafka-run-class.sh lives in /data/kafka2.2/bin/, then base_dir is /data/kafka2.2/
    base_dir=$(dirname $0)/..
    
    # Check whether $SCALA_VERSION is empty; if so, enter the if
    if [ -z "$SCALA_VERSION" ]; then
      #Default SCALA_VERSION to 2.13.2
      SCALA_VERSION=2.13.2
      #If a gradle.properties file exists under base_dir, enter the if block
      if [[ -f "$base_dir/gradle.properties" ]]; then
        #Read the value from gradle.properties, which may contain a line like scalaVersion=2.11.1;
        #cut drops everything left of the equals sign, leaving 2.11.1, which is assigned to SCALA_VERSION
        SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
      fi
    fi
    
    # Check whether $SCALA_BINARY_VERSION is empty; if so, enter the if
    if [ -z "$SCALA_BINARY_VERSION" ]; then
      #Set SCALA_BINARY_VERSION to the first two dot-separated fields of SCALA_VERSION.
      #E.g. if SCALA_VERSION=2.11.1 then SCALA_BINARY_VERSION=2.11
      SCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')
    fi
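    # (e.g. echo 2.13.2 | cut -f 1-2 -d '.'   outputs 2.13)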
    
    #Enable nullglob: with nullglob on, a glob pattern like * that matches nothing
    #expands to an empty string instead of being left as the literal pattern,
    #so the for-loops below simply skip directories that do not exist
    shopt -s nullglob
    
    # Check whether $UPGRADE_KAFKA_STREAMS_TEST_VERSION is empty; if so, enter the if
    if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
      #Assign each directory under base_dir/core/build/ whose name starts with dependant-libs-${SCALA_VERSION} to dir in turn
      for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
      do
        #Append the contents of $dir to $CLASSPATH
        CLASSPATH="$CLASSPATH:$dir/*"
      done
    fi
    
    #Assign each file under base_dir/examples/build/libs/ starting with kafka-examples and ending in .jar to file in turn
    for file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
    do
      #If should_include_file returns 0, i.e. this is a runtime jar rather than a test jar, enter the if
      if should_include_file "$file"; then
        #Append $file to $CLASSPATH
        CLASSPATH="$CLASSPATH":"$file"
      fi
    done
    
    
    # Check whether $UPGRADE_KAFKA_STREAMS_TEST_VERSION is empty; if so, enter the if
    if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
      #Set clients_lib_dir to    <this script's directory>/../clients/build/libs
      clients_lib_dir=$(dirname $0)/../clients/build/libs
      #Set streams_lib_dir to    <this script's directory>/../streams/build/libs
      streams_lib_dir=$(dirname $0)/../streams/build/libs
      #Set streams_dependant_clients_lib_dir to    <this script's directory>/../streams/build/dependant-libs-${SCALA_VERSION}
      streams_dependant_clients_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
    else
      #Set clients_lib_dir to    /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
      clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
      #Set streams_lib_dir to    the same value as clients_lib_dir
      streams_lib_dir=$clients_lib_dir
      #Set streams_dependant_clients_lib_dir to    the same value as streams_lib_dir
      streams_dependant_clients_lib_dir=$streams_lib_dir
    fi
    
    #Assign each file under $clients_lib_dir starting with kafka-clients and ending in .jar to file in turn
    for file in "$clients_lib_dir"/kafka-clients*.jar;
    do
      #If should_include_file returns 0 (a runtime jar), enter the if
      if should_include_file "$file"; then
        #Append $file to $CLASSPATH
        CLASSPATH="$CLASSPATH":"$file"
      fi
    done

    #Assign each file under $streams_lib_dir starting with kafka-streams and ending in .jar to file in turn
    for file in "$streams_lib_dir"/kafka-streams*.jar;
    do
      #If should_include_file returns 0 (a runtime jar), enter the if
      if should_include_file "$file"; then
        #Append $file to $CLASSPATH
        CLASSPATH="$CLASSPATH":"$file"
      fi
    done
    
    # Check whether $UPGRADE_KAFKA_STREAMS_TEST_VERSION is empty; if so, enter the if
    if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
      #Assign each file under base_dir/streams/examples/build/libs/ starting with kafka-streams-examples and ending in .jar to file in turn
      for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
      do
        #If should_include_file returns 0 (a runtime jar), enter the if
        if should_include_file "$file"; then
          #Append $file to $CLASSPATH
          CLASSPATH="$CLASSPATH":"$file"
        fi
      done
    else
      #Strip the dots from $UPGRADE_KAFKA_STREAMS_TEST_VERSION, e.g. 2.12.1 becomes 2121
      #(note the escaped dot: an unescaped . in sed matches every character)
      VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
      #SHORT_VERSION_NO_DOTS is VERSION_NO_DOTS minus its last character, e.g. if VERSION_NO_DOTS is 2121 then SHORT_VERSION_NO_DOTS is 212
      SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
      #Assign each file under base_dir/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/
      #starting with kafka-streams-upgrade-system-tests and ending in .jar to file in turn
      for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
      do
        #If should_include_file returns 0 (a runtime jar), enter the if
        if should_include_file "$file"; then
          #Prepend $file to $CLASSPATH
          CLASSPATH="$file":"$CLASSPATH"
        fi
      done
      #If $SHORT_VERSION_NO_DOTS equals 0100,
      if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then
        #prepend /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar to the classpath
        CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
        #prepend /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar to the classpath
        CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
      fi
      #If $SHORT_VERSION_NO_DOTS equals 0101,
      if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then
        #prepend /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar to the classpath
        CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
        #prepend /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar to the classpath
        CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
      fi
    fi
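    # (A quick illustration of the substring syntax used above:
    #  V=2121; echo ${V:0:((${#V} - 1))}   outputs 212)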
    
    #Iterate over the files under $streams_dependant_clients_lib_dir starting with rocksdb and ending in .jar
    for file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
    do
      #Append each file to the classpath
      CLASSPATH="$CLASSPATH":"$file"
    done

    #Iterate over the files under $streams_dependant_clients_lib_dir containing hamcrest and ending in .jar
    for file in "$streams_dependant_clients_lib_dir"/*hamcrest*.jar;
    do
      CLASSPATH="$CLASSPATH":"$file"
    done
    
    #Iterate over the files under $base_dir/tools/build/libs/ starting with kafka-tools and ending in .jar
    for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
    do
      #If should_include_file returns 0 (a runtime jar), enter the if
      if should_include_file "$file"; then
        #Append $file to $CLASSPATH
        CLASSPATH="$CLASSPATH":"$file"
      fi
    done
    
    #Assign each directory under base_dir/tools/build/ whose name starts with dependant-libs-${SCALA_VERSION} to dir in turn
    for dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;
    do
      #Append the contents of $dir to $CLASSPATH
      CLASSPATH="$CLASSPATH:$dir/*"
    done
    
    #依次把 "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension" 逐个赋值给cc_pkg变量
    for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
    do
      #遍历$base_dir/connect/${cc_pkg}/build/libs/文件夹下的connect-${cc_pkg}开头,.jar结尾的文件
      for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
      do
        #判断一下,如果should_include_file返回1,也就是说是重要的jar包,就进入到if逻辑
        if should_include_file "$file"; then
          #添加$file变量的内容到$CLASSPATH中
          CLASSPATH="$CLASSPATH":"$file"
        fi
      done
      #判断$base_dir/connect/${cc_pkg}/build/dependant-libs文件夹是否存在,是的话进入if
      if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
        #把base_dir/connect/${cc_pkg}/build/dependant-libs/下的所有文件都加入到classpath中去
        CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"
      fi
    done
    
    # Iterate over everything under base_dir/libs
    for file in "$base_dir"/libs/*;
    do
      #If should_include_file returns 0 (a runtime jar), enter the if
      if should_include_file "$file"; then
        #Append $file to $CLASSPATH
        CLASSPATH="$CLASSPATH":"$file"
      fi
    done

    #Iterate over the files under base_dir/core/build/libs/ starting with kafka_${SCALA_BINARY_VERSION}
    for file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
    do
      #If should_include_file returns 0 (a runtime jar), enter the if
      if should_include_file "$file"; then
        #Append $file to $CLASSPATH
        CLASSPATH="$CLASSPATH":"$file"
      fi
    done
    #Turn nullglob back off; no more glob-based file matching is needed below
    shopt -u nullglob
    
    
    #If $CLASSPATH is empty, enter the if
    if [ -z "$CLASSPATH" ] ; then
      #Print an error message
      echo "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'"
      #Exit the script without executing anything further
      exit 1
    fi
    
    #If $KAFKA_JMX_OPTS is empty, enter the if
    if [ -z "$KAFKA_JMX_OPTS" ]; then
      #Set KAFKA_JMX_OPTS to -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false
      KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "
    fi
    
    #If $JMX_PORT is set (non-empty), enter the if block
    if [  $JMX_PORT ]; then
      #Append -Dcom.sun.management.jmxremote.port=$JMX_PORT to KAFKA_JMX_OPTS
      KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
    fi
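    # (So remote JMX can be enabled per invocation, e.g.:
    #   JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties )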
    
    #If $LOG_DIR is empty, enter the if block
    if [ "x$LOG_DIR" = "x" ]; then
      #Default LOG_DIR to $base_dir/logs
      LOG_DIR="$base_dir/logs"
    fi
    
    #If $KAFKA_LOG4J_OPTS is empty, enter the if
    if [ -z "$KAFKA_LOG4J_OPTS" ]; then
      #Set LOG4J_DIR to $base_dir/config/tools-log4j.properties
      LOG4J_DIR="$base_dir/config/tools-log4j.properties"
      # If running under CYGWIN, the path in LOG4J_DIR has to be converted
      (( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
      #Set KAFKA_LOG4J_OPTS to -Dlog4j.configuration=file:${LOG4J_DIR}
      KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
    else
      # If the $LOG_DIR directory does not exist yet, create it
      if [ ! -d "$LOG_DIR" ]; then
        mkdir -p "$LOG_DIR"
      fi
    fi
    
    # If running under CYGWIN, the path in LOG_DIR has to be converted
    (( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
    KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
    
    # If $KAFKA_OPTS is empty, enter the if
    if [ -z "$KAFKA_OPTS" ]; then
      #Default it to ""
      KAFKA_OPTS=""
    fi
    
    #If $KAFKA_DEBUG is set (non-empty), enable remote debugging
    if [ "x$KAFKA_DEBUG" != "x" ]; then
    
        # Default the Java debug port to 5005
        DEFAULT_JAVA_DEBUG_PORT="5005"

        #If $JAVA_DEBUG_PORT is empty, enter the if
        if [ -z "$JAVA_DEBUG_PORT" ]; then
            #Set JAVA_DEBUG_PORT to $DEFAULT_JAVA_DEBUG_PORT
            JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
        fi

        # Default JDWP options: -agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT
        DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
        # If $JAVA_DEBUG_OPTS is empty, enter the if
        if [ -z "$JAVA_DEBUG_OPTS" ]; then
            #Set JAVA_DEBUG_OPTS to $DEFAULT_JAVA_DEBUG_OPTS
            JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
        fi

        #Print Enabling Java debug options: $JAVA_DEBUG_OPTS to the screen
        echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
        #Prepend $JAVA_DEBUG_OPTS to $KAFKA_OPTS
        KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
    fi
    
    #Decide which java binary to use
    #If $JAVA_HOME is empty, enter the if
    if [ -z "$JAVA_HOME" ]; then
      #Set JAVA to "java" (resolved via $PATH)
      JAVA="java"
    else
      #Set JAVA to $JAVA_HOME/bin/java
      JAVA="$JAVA_HOME/bin/java"
    fi
    
    #Memory-related settings
    #If $KAFKA_HEAP_OPTS is empty, enter the if
    if [ -z "$KAFKA_HEAP_OPTS" ]; then
      #Default KAFKA_HEAP_OPTS to -Xmx256M
      KAFKA_HEAP_OPTS="-Xmx256M"
    fi
    
    # JVM performance options
    # MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported.
    #If $KAFKA_JVM_PERFORMANCE_OPTS is empty, enter the if
    if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
      #Default it to -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true
      KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
    fi
    
    #$# is the number of arguments remaining after the script name
    #-gt is short for greater than
    #Loop while there is at least one argument left
    while [ $# -gt 0 ]; do
      #Set COMMAND to the first remaining argument
      COMMAND=$1
      # Dispatch on COMMAND
      case $COMMAND in
        #If COMMAND is -name
        -name)
          #Set DAEMON_NAME to the argument right after COMMAND
          DAEMON_NAME=$2
          #Set CONSOLE_OUTPUT_FILE to $LOG_DIR/$DAEMON_NAME.out
          CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
          #Drop the first two arguments; the old third argument becomes $1, the old fourth becomes $2
          shift 2
          ;;
        #If COMMAND is -loggc
        -loggc)
          #If $KAFKA_GC_LOG_OPTS is empty, enter the if
          if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
            #Set GC_LOG_ENABLED to true
            GC_LOG_ENABLED="true"
          fi
          #Drop the first argument; the old second argument becomes $1, the old third becomes $2
          shift
          ;;
        #If COMMAND is -daemon
        -daemon)
          #Set DAEMON_MODE to true
          DAEMON_MODE="true"
          #Drop the first argument; the old second argument becomes $1, the old third becomes $2
          shift
          ;;
        *)
          break
          ;;
      esac
    done
    
    # GC-related settings
    #Set GC_FILE_SUFFIX to -gc.log
    #Set GC_LOG_FILE_NAME to ''
    GC_FILE_SUFFIX='-gc.log'
    GC_LOG_FILE_NAME=''
    #If GC_LOG_ENABLED equals true,
    if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
      #set GC_LOG_FILE_NAME to $DAEMON_NAME followed by $GC_FILE_SUFFIX
      GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
    
      # The first segment of the version number, which is "1" for releases before Java 9;
      # afterwards it becomes '9', '10', ...
      # Some examples of the first line of `java --version`:
      # 8 -> java version "1.8.0_152"
      # 9.0.4 -> java version "9.0.4"
      # 10 -> java version "10" 2018-03-20
      # 10.0.1 -> java version "10.0.1" 2018-04-17
      # We need to match to the end of the line to prevent sed from printing the characters that do not match.

      #This extracts the Java major version; for JDK 8 and earlier, JAVA_MAJOR_VERSION is 1,
      #and from JDK 9 onwards it becomes 9, 10, 11, and so on
      JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
      #If JAVA_MAJOR_VERSION is greater than or equal to 9,
      if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
        #use the unified JVM logging flag: -Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400
        KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
      else
        #otherwise use the pre-Java-9 flags: -Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M
        KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
      fi
      fi
    fi
    
    # Remove a possible colon prefix from the classpath (this happens at lines like
    # `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH was initially blank).
    # The syntax on the right-hand side is native Bash string manipulation; see
    # http://tldp.org/LDP/abs/html/string-manipulation.html, in particular the section titled "Substring Removal".
    #Strip one leading colon, if present, from CLASSPATH
    CLASSPATH=${CLASSPATH#:}
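    # (e.g. a CLASSPATH of ":a.jar:b.jar" becomes "a.jar:b.jar"; ${var#pattern}
    #  removes the shortest match of pattern from the front of $var)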
    
    #If running under CYGWIN, CLASSPATH has to be converted to the Windows path format
    (( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
    
    #Launch mode
    #If DAEMON_MODE is true,
    if [ "x$DAEMON_MODE" = "xtrue" ]; then
      #run the program in the background and redirect its output to "$CONSOLE_OUTPUT_FILE"
      nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
    else
      #run in the foreground, printing logs to the console
      exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
    fi
    

      

     The script has now been analyzed in detail, but the lines that really matter are the last few.

    Taking bin/kafka-run-class.sh -daemon -name kafkaServer -loggc kafka.Kafka config/server.properties

    as the example, after all of kafka-run-class.sh's processing, the command that actually ends up running is:

    nohup java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 
    -Djava.awt.headless=true -Xloggc:/home/hadoop/software/kafka_2.13-2.5.1/bin/../logs/kafkaServer-gc.log 
    -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M 
    -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false  
    -Dkafka.logs.dir=/home/hadoop/software/kafka_2.13-2.5.1/bin/../logs 
    -Dlog4j.configuration=file:./../config/log4j.properties 
    -cp /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/activation-1.1.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/aopalliance-repackaged-2.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/argparse4j-0.7.0.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/audience-annotations-0.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/commons-cli-1.4.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/commons-lang3-3.8.1.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-api-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-basic-auth-extension-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-file-2.5.1.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-json-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-mirror-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-mirror-client-2.5.1.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-runtime-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/connect-transforms-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/hk2-api-2.5.0.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/hk2-locator-2.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/hk2-utils-2.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-annotations-2.10.2.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-core-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-databind-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-dataformat-csv-2.10.2.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-datatype-jdk8-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-jaxrs-base-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-jaxrs-json-provider-2.10.2.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-module-jaxb-annotations-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-module-paranamer-2.10.2.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jackson-module-scala_2.13-2.10.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jakarta.annotation-api-1.3.4.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jakarta.inject-2.5.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/javassist-3.22.0-CR2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/javassist-3.26.0-GA.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/javax.servlet-api-3.1.0.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jaxb-api-2.3.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-client-2.28.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-common-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-container-servlet-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-container-servlet-core-2.28.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-hk2-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-media-jaxb-2.28.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jersey-server-2.28.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-client-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-continuation-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-http-9.4.24.v20191120.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-io-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-security-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-server-9.4.24.v20191120.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-servlet-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-servlets-9.4.24.v20191120.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jetty-util-9.4.24.v20191120.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/jopt-simple-5.0.4.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka_2.13-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka_2.13-2.5.1-sources.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-clients-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-log4j-appender-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-streams-2.5.1.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-streams-examples-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-streams-scala_2.13-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-streams-test-utils-2.5.1.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/kafka-tools-2.5.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/log4j-1.2.17.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/lz4-java-1.7.1.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/maven-artifact-3.6.3.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-buffer-4.1.50.Final.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-codec-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-common-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-handler-4.1.50.Final.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-resolver-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-transport-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-transport-native-epoll-4.1.50.Final.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/netty-transport-native-unix-common-4.1.50.Final.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/osgi-resource-locator-1.0.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/paranamer-2.8.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/plexus-utils-3.2.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/reflections-0.9.12.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/rocksdbjni-5.18.3.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/scala-collection-compat_2.13-2.1.3.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/scala-java8-compat_2.13-0.9.0.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/scala-library-2.13.1.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/scala-logging_2.13-3.9.2.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/scala-reflect-2.13.1.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/slf4j-api-1.7.30.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/slf4j-log4j12-1.7.30.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/snappy-java-1.1.7.3.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/validation-api-2.0.1.Final.jar:
    /home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/zookeeper-3.5.8.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/zookeeper-jute-3.5.8.jar:/home/hadoop/software/kafka_2.13-2.5.1/bin/../libs/zstd-jni-1.4.4-7.jar  
    "kafka.Kafka ../conf/server.properties" > "/home/hadoop/software/kafka_2.13-2.5.1/bin/../logs/kafkaServer.out" 2>&1 < /dev/null &

     So ultimately the class being run is kafka.Kafka, with config/server.properties passed as its argument, and the console output goes to /home/hadoop/software/kafka_2.13-2.5.1/logs/kafkaServer.out.

    In follow-up articles we will continue with the execution flow of the kafka.Kafka class itself; this post only covers the shell script part.
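
    One practical takeaway for debugging: the KAFKA_DEBUG hook analyzed above lets you attach a remote debugger to the broker without editing any script. A minimal sketch, using only variables the script itself defines, run in the foreground so the JDWP banner is visible:

        # Open a JDWP socket on port 5005 (the script's default) and make the JVM
        # wait for the debugger to attach before starting the broker
        KAFKA_DEBUG=true DEBUG_SUSPEND_FLAG=y bin/kafka-server-start.sh config/server.properties

    Then point a "Remote JVM Debug" run configuration in your IDE at localhost:5005 and set breakpoints in kafka.Kafka.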
