  • Setting Up the Kafka Source Environment on Linux

    This article describes how to set up a Kafka source-code environment, targeting IntelliJ IDEA on Linux; other operating systems and IDEs can follow the same steps by analogy.

    1. Install and configure the JDK
    Make sure the JDK version is at least 1.7, preferably 1.8 or above. Use the java -version command to check the current JDK version, for example:

    lenmom@M1701:~/workspace/software/hadoop-2.7.3/bin$ java -version
    java version "1.8.0_191"
    Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
    Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
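
    If the java command is not found, the JDK is not yet on the PATH. A minimal /etc/profile entry looks like the following sketch; the install path is an assumption, so substitute the location of your own JDK:

    export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_191   # hypothetical install path, adjust to your JDK location
    export PATH=$JAVA_HOME/bin:$PATH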


    2. Download, install, and configure Gradle
    The download address is https://gradle.org/releases/. I used version 5.2.1 (the official Kafka 2.1.0 source uses Gradle 4.10.2; 5.2.1 worked for me on Linux but failed on Windows, so if you are setting up on Windows, 4.10.2 is recommended). In general you only need to unpack the downloaded archive and add the $GRADLE_HOME/bin path to the PATH environment variable, where $GRADLE_HOME is the Gradle root directory. For example:

    wget https://downloads.gradle.org/distributions/gradle-5.2.1-bin.zip
    unzip gradle-5.2.1-bin.zip -d /home/lenmom/software/

    Configure the environment variables:

    vim /etc/profile

    Add the following:

    export GRADLE_HOME=/home/lenmom/software/gradle-5.2.1
    export PATH=$GRADLE_HOME/bin:$PATH

    Save and exit, then run source /etc/profile to apply the changes.
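
    You can then verify that Gradle is configured correctly; gradle -v prints a banner reporting the installed version:

    source /etc/profile
    gradle -v    # the version banner should report Gradle 5.2.1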

    3. Download, install, and configure Scala
    The download address is http://www.scala-lang.org/download/all.html. The latest version at the time of writing is 2.12.8, but I use 2.11.8 here. As with Gradle, just unpack the archive and add the $SCALA_HOME/bin path to the PATH environment variable, where $SCALA_HOME is the Scala root directory. Use the scala -version command to verify that Scala is configured correctly, for example:

    lenmom@M1701:~/workspace/software/hadoop-2.7.3/bin$ scala -version
    Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
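
    The corresponding /etc/profile entries mirror the Gradle setup; the install path below is an assumption, so adjust it to wherever you unpacked Scala:

    export SCALA_HOME=/home/lenmom/software/scala-2.11.8   # hypothetical install path
    export PATH=$SCALA_HOME/bin:$PATH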

    4. Build the Kafka source environment
    The Kafka download address is http://kafka.apache.org/downloads; this article uses the 2.1.0 source release (kafka-2.1.0-src). Unpack the archive and run the gradle idea command in the Kafka root directory to build the project; if you use Eclipse instead, run gradle eclipse. The build output looks like this:

    lenmom@M1701:~/workspace/open-source/kafka-2.1.0-src$ gradle idea
    Starting a Gradle Daemon (subsequent builds will be faster)
    
    > Configure project :
    Building project 'core' with Scala version 2.11.8
    Building project 'streams-scala' with Scala version 2.11.8
    
    > Task :idea
    Generated IDEA project at file:///home/lenmom/workspace/open-source/kafka-2.1.0-src/kafka-2.1.0-src.ipr
    
    Deprecated Gradle features were used in this build, making it incompatible with Gradle 6.0.
    Use '--warning-mode all' to show the individual deprecation warnings.
    See https://docs.gradle.org/5.2.1/userguide/command_line_interface.html#sec:command_line_warnings
    
    BUILD SUCCESSFUL in 19m 51s
    28 actionable tasks: 28 executed

    Then import the project into IDEA. That is not quite the end, though: IDEA also needs the Scala plugin. Search for "scala" under Settings -> Plugins and install it; in my setup it was already installed.

    5. Change the Scala version in kafka_source_home/gradle.properties

    vim /home/lenmom/workspace/open-source/kafka-2.1.0-src/gradle.properties

    Change the scalaVersion entry from 2.11.11 to 2.11.8, to match the locally installed Scala.

    Also change the Scala version in /home/lenmom/workspace/open-source/kafka-2.1.0-src/gradle/dependencies.gradle:

    Change def defaultScala211Version = '2.11.11' to def defaultScala211Version = '2.11.8'.
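
    Both edits can also be made non-interactively; a minimal sed sketch, assuming the files contain exactly the scalaVersion=2.11.11 and defaultScala211Version = '2.11.11' forms quoted above:

    cd /home/lenmom/workspace/open-source/kafka-2.1.0-src
    # pin the build's Scala version to the locally installed 2.11.8
    sed -i 's/^scalaVersion=.*/scalaVersion=2.11.8/' gradle.properties
    sed -i "s/defaultScala211Version = '2.11.11'/defaultScala211Version = '2.11.8'/" gradle/dependencies.gradle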

    If you change scalaVersion, rerun gradle idea to rebuild the project. Installing some other Scala version on the operating system often causes no problems at all, for example 2.12.8. In some cases, however, running Kafka raises exceptions that are caused precisely by such a Scala version mismatch, like the error below:

    [2019-02-10 17:09:21,119] FATAL (kafka.Kafka$)
    java.lang.NoSuchMethodError: scala.collection.TraversableOnce.$init$(Lscala/collection/TraversableOnce;)V
    at kafka.message.MessageSet.<init>(MessageSet.scala:72)
    at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:129)
    at kafka.message.MessageSet$.<init>(MessageSet.scala:32)
    at kafka.message.MessageSet$.<clinit>(MessageSet.scala)
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:52)
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:686)
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)

    So, to save yourself unnecessary trouble, check the scalaVersion configured in the gradle.properties file of the Kafka source before installing Scala.

    6. Configure the Kafka source environment

    6.1 Once the scalaVersion is settled, copy the log4j.properties file from the config directory into core/src/main/scala so that Kafka can emit log output at runtime:

    cp /home/lenmom/workspace/open-source/kafka-2.1.0-src/config/log4j.properties  /home/lenmom/workspace/open-source/kafka-2.1.0-src/core/src/main/scala/


    6.2 Configure the server.properties file; usually only the following settings need to be changed:

    # Whether topics may be deleted; set to true so that topics can be deleted,
    # which makes it easy to remove temporary topics after Kafka has been running for a while
    delete.topic.enable=true
    # Disable automatic topic creation
    auto.create.topics.enable=false
    # Directory for Kafka's log files; the default is /tmp/kafka-logs,
    # changed here to a directory inside the source tree
    log.dir=/home/lenmom/workspace/open-source/kafka-2.1.0-src/kafka-logs
    
    # ZooKeeper connection string; this assumes a ZooKeeper service is running locally
    zookeeper.connect=localhost:2181

    6.3 Create the Kafka log directory:

    mkdir /home/lenmom/workspace/open-source/kafka-2.1.0-src/kafka-logs

    6.4 Make sure ZooKeeper is running

    For ZooKeeper installation and startup, see my earlier posts.
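
    A quick sanity check that ZooKeeper is listening; this uses the ruok four-letter command, which is enabled by default on the 3.4.x line:

    echo ruok | nc localhost 2181   # a healthy server replies "imok"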

    6.5 Start Kafka

    Create a run configuration in IDEA with the following parameters:

    Main class:    
    kafka.Kafka
    
    VM options:    
    -Dkafka.logs.dir=/home/lenmom/workspace/open-source/kafka-2.1.0-src/logs
    -Dlog4j.configuration=file:/home/lenmom/workspace/open-source/kafka-2.1.0-src/config/log4j.properties
    
    Program arguments: 
    /home/lenmom/workspace/open-source/kafka-2.1.0-src/config/server.properties
    
    Environment variables:
    JMX_PORT=9999
    
    Use classpath of module:
    Core_main 

     

    Here the Main class is kafka.Kafka, and the program argument points at the startup configuration file, config/server.properties. JMX_PORT is set to make it easy to collect Kafka's own metrics data.
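
    Once the broker is up, any JMX client can attach on that port; for example, with the jconsole tool shipped with the JDK:

    jconsole localhost:9999   # browse the kafka.server / kafka.network MBeans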

    With that, the Kafka service runs successfully (the first start includes a fairly long compilation step). Part of the startup log is shown below:

        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000
     (kafka.server.KafkaConfig)
    [2019-02-10 19:27:20,203] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2019-02-10 19:27:20,205] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2019-02-10 19:27:20,210] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
    [2019-02-10 19:27:20,367] INFO Loading logs. (kafka.log.LogManager)
    [2019-02-10 19:27:20,403] INFO Logs loading complete in 36 ms. (kafka.log.LogManager)
    [2019-02-10 19:27:20,461] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
    [2019-02-10 19:27:20,467] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
    [2019-02-10 19:27:21,958] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
    [2019-02-10 19:27:22,098] INFO [SocketServer brokerId=0] Started 1 acceptor threads (kafka.network.SocketServer)
    [2019-02-10 19:27:22,200] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2019-02-10 19:27:22,204] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2019-02-10 19:27:22,207] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2019-02-10 19:27:22,272] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
    [2019-02-10 19:27:22,533] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
    [2019-02-10 19:27:22,539] INFO Result of znode creation at /brokers/ids/0 is: OK (kafka.zk.KafkaZkClient)
    [2019-02-10 19:27:22,543] INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(M1701,9092,ListenerName(PLAINTEXT),PLAINTEXT)) (kafka.zk.KafkaZkClient)
    [2019-02-10 19:27:22,760] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2019-02-10 19:27:22,785] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2019-02-10 19:27:22,791] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2019-02-10 19:27:22,903] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
    [2019-02-10 19:27:22,907] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 2 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    [2019-02-10 19:27:22,907] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
    [2019-02-10 19:27:22,961] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:6000,blockEndProducerId:6999) by writing to Zk with path version 7 (kafka.coordinator.transaction.ProducerIdManager)
    [2019-02-10 19:27:23,044] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
    [2019-02-10 19:27:23,049] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
    [2019-02-10 19:27:23,050] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
    [2019-02-10 19:27:23,209] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
    [2019-02-10 19:27:23,230] INFO [SocketServer brokerId=0] Started processors for 1 acceptors (kafka.network.SocketServer)
    [2019-02-10 19:27:23,235] WARN Error while loading kafka-version.properties :null (org.apache.kafka.common.utils.AppInfoParser)
    [2019-02-10 19:27:23,238] INFO Kafka version : unknown (org.apache.kafka.common.utils.AppInfoParser)
    [2019-02-10 19:27:23,238] INFO Kafka commitId : unknown (org.apache.kafka.common.utils.AppInfoParser)
    [2019-02-10 19:27:23,244] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
    [2019-02-10 19:27:27,404] WARN Client session timed out, have not heard from server in 4179ms for sessionid 0x1000144f4a50007 (org.apache.zookeeper.ClientCnxn)
    [2019-02-10 19:27:27,406] INFO Client session timed out, have not heard from server in 4179ms for sessionid 0x1000144f4a50007, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
    [2019-02-10 19:27:29,323] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
    [2019-02-10 19:27:29,324] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
    [2019-02-10 19:27:29,328] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1000144f4a50007, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
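
    To confirm the broker works end to end, you can exercise it from another terminal with the scripts under bin/. This is a sketch, assuming the broker and the local ZooKeeper from above; note that in a source checkout these scripts only find their classes after the jars have been built (e.g. with gradle jar), and Kafka 2.1.0's tooling still addresses ZooKeeper directly:

    cd /home/lenmom/workspace/open-source/kafka-2.1.0-src
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    bin/kafka-topics.sh --list --zookeeper localhost:2181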

    If the error message

    Failed to load class "org.slf4j.impl.StaticLoggerBinder"

    appears, the fix is to download

    log4j-1.2.17.jar
    slf4j-api-1.7.25.jar
    slf4j-log4j12-1.7.25.jar

    and add these jars as dependencies under File -> Project Structure -> Project Settings -> Modules -> core -> core_main -> Dependencies.
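
    For reference, the jars can be fetched from Maven Central; the URLs below follow the standard Maven repository layout for the coordinates log4j:log4j:1.2.17, org.slf4j:slf4j-api:1.7.25, and org.slf4j:slf4j-log4j12:1.7.25:

    wget https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar
    wget https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar
    wget https://repo1.maven.org/maven2/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar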

    Following the same steps, I also managed to set up the debugging environment on Windows.

    Alternatively, build from the command line, skipping the tests and the checkstyle validation:

    gradle build -x test -x checkstyleMain -x checkstyleTest -x checkstyleScoverage -x spotbugsMain 
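
    A note on the Gradle wrapper: per the Kafka README, running gradle once in the source root bootstraps the wrapper, after which ./gradlew can be used so that everyone builds with the same pinned Gradle version; a sketch:

    cd /home/lenmom/workspace/open-source/kafka-2.1.0-src
    gradle            # run once to bootstrap the wrapper
    ./gradlew build -x test -x checkstyleMain -x checkstyleTest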


