zoukankan      html  css  js  c++  java
  • 从0开始搭建多节点kafka集群

    环境及版本:mac系统,kafka 0.11,java8,zk3.4.10,100%可复现

    安装Java安装ZooKeeper安装kafka验证测试topic创建测试消息读写

    安装Java

      不论是ZooKeeper还是kakfa都需要提前安装好Java,这里选择的是java8版本。下载地址:https://www.java.com/zh_CN/download/mac_download.jsp


      按提示步骤下载即可,下载完成后,我们需要配置一下java的环境变量。
      首先我们在终端中输入:/usr/libexec/java_home -V,用来查看我们安装的jdk地址

      选择要进行配置的java版本,复制路径。输入open -e .bash_profile,打开配置文件,按照以下格式输入,并将之前复制的路径,粘贴到JAVA_HOME处。保存文件

     

    1JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home
    2PATH=$JAVA_HOME/bin:$PATH
    3export 
    4export

      输入source .bash_profile进行保存,接下来我们就可以验证以下配置是否成功。
      输入echo $JAVA_HOME查看JAVA_HOME配置,输入java -version查看java版本。

    安装ZooKeeper

      ZooKeeper是安装Kafka集群必要的组件,我们选择要安装的Zk版本是3.4.10。
      下载地址:
      https://archive.apache.org/dist/zookeeper/
      zookeeper的默认配置文件为zookeeper/conf/zoo_sample.cfg,需要将其修改为zoo.cfg,其中存放着zookeeper的配置信息,我们可以根据需要进行调整。最后我们可以在终端中进入zk的安装路径,输入bin/zkServer.sh start启动zk。你将看到如下输出

    1ZooKeeper JMX enabled by default
    2Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
    3Starting zookeeper ... STARTED

      还可以输入:bin/zkServer.sh status来查看当前zk的状态,来验证zk是否启动成功。

    1ZooKeeper JMX enabled by default
    2Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
    3Mode: standalone

      从输出可以看出,我们已经成功启动了一个单点zk,我们可以输入bin/zkServer.sh stop来关闭zk。

    1ZooKeeper JMX enabled by default
    2Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
    3Stopping zookeeper ... STOPPED

    安装kafka

      接下来就是我们这篇文章的重点,安装kafka,我们要安装的是一个多点环境的kafka,因为kafka伪集群(多个kakfa部署在同一个服务器上)只能进行业务开发或者是验证功能。但是不能在生产环境直接运用,否则就会失去kafka的分布式特性,比如负载均衡、故障转移等。
      多点kafka集群,是由一套多节点ZooKeeper集群和一套多节点kafka集群组成。所以我们想要搭建多点的kafka集群,首先要搭建多点的Zookeeper集群。ZooKeeper想要正常服务,要满足一个条件,“半数以上服务器存活”,即一个2n+1个服务器搭建的集群,最多可以容忍n台服务器宕机而依然保持服务状态。(我们最好选取奇数个zk服务器搭建集群,否则在进行leader选举的时候,会出现脑裂现象),本文将搭建3个节点的zk集群,主机名分别是:zk1、zk2、zk3。
      下面我们给出一个经典的多节点环境配置文件zoo.cfg,并对其参数进行解释。

    1tickTime=2000
    2dataDir=/Users/lijl/kafka/dataDir
    3clientPort=2181
    4initLimit=5
    5syncLimit=2
    6server.1=zk1:2888:3888
    7server.2=zk2:2888:3888
    8server.3=zk3:2888:3888

      tickTime:Zookeeper最小时间单位,用于丈量心跳时间和超时时间,通常设置成默认值:2秒即可
      dataDir:ZooKeeper会在内存中保存系统快照,并定期写入该路径的指定文件夹中,生产环境下应该转移该文件夹的磁盘占用情况。
      clientPort:Zookeeper监听客户端链接的端口,一般设置成默认的2181
      initLimit:指定follower节点初始时连接leader的最大tick次数,假设是5,那么follower必须在5 * tickTime(2s) = 10s内连接上leader,否则超时。
      SyncLimit:设定了follower节点和leader节点进行同步的最大时间,算法和initLimit类似。
      server.X=host:port:port:X必须是一个全局唯一的数字,并且要与myid文件中的数字对应,X一般取1~255,这行的后面配置了2个端口,第一个端口用于follower节点连接leader节点,第二个端口用于leader选举。myid文件位于zoo.cfg中DataDir配置的目录下,内容仅有一个数字ID。
      接下来,我们进入conf目录,创建3个配置文件zoo1.cfg、zoo2.cfg、zoo3.cfg内容如下(注意:这里server.x配置中的zk是我们zk节点所在的主机名):

     1tickTime=2000
    2dataDir=/Users/lijl/kafka/dataDir/zk1
    3clientPort=2181
    4initLimit=5
    5syncLimit=2
    6server.1=zk:2888:3888
    7server.2=zk:2889:3889
    8server.3=zk:2890:3890
    9
    10tickTime=2000
    11dataDir=/Users/lijl/kafka/dataDir/zk2
    12clientPort=2182
    13initLimit=5
    14syncLimit=2
    15server.1=zk:2888:3888
    16server.2=zk:2889:3889
    17server.3=zk:2890:3890
    18
    19tickTime=2000
    20dataDir=/Users/lijl/kafka/dataDir/zk3
    21clientPort=2183
    22initLimit=5
    23syncLimit=2
    24server.1=zk:2888:3888
    25server.2=zk:2889:3889
    26server.3=zk:2890:3890

      这里我们分别取了2181、2182、2183,其实取一个也是可以的,要确保端口没有冲突。文本只是在一台电脑上进行搭建,所以选了不同的端口,防止冲突,主机名也是相同的。
      创建好配置文件,接下来就要创建myid文件了,该文件必须在我们配置的dataDir中,在终端中输入以下内容:
      echo "1" > /Users/lijl/kafka/dataDir/zk1/myid
      echo "2" > /Users/lijl/kafka/dataDir/zk2/myid
      echo "3" > /Users/lijl/kafka/dataDir/zk3/myid
      进行快速的创建文件以及数据的写入,接下来我们来运行一下这3个zk实例。
      bin/zkServer.sh start conf/zoo1.cfg
      bin/zkServer.sh start conf/zoo2.cfg
      bin/zkServer.sh start conf/zoo3.cfg
      并分别查看他们的状态:bin/zkServer.sh status conf/zoo1.cfg(zoo2、zoo3类似)

     1ZooKeeper JMX enabled by default
    2Using config: conf/zoo1.cfg
    3Mode: follower
    4
    5ZooKeeper JMX enabled by default
    6Using config: conf/zoo2.cfg
    7Mode: leader
    8
    9conf/zoo3.cfg
    10ZooKeeper JMX enabled by default
    11Using config: conf/zoo3.cfg
    12Mode: follower

      至此,我们已经搭建了一个3个节点的zookeepr集群。接下来我们就在这个多节点的zk集群的基础上,搭建多节点kafka集群。
      首先我们下载kafka,选取的版本是0.11.3,下载地址:
      http://kafka.apache.org/downloads


      和搭建zk集群类似,我们也需要创建多份配置文件。(conf下创建3份配置文件,注意:这里listeners配置中的kafka1/2/3 代表我们kafka集群的主机名,zookeeper.connect中配置的zk,是我们zk节点所在的主机名)

     

     1broker.id=0
    2delete.topic.enable=true
    3listeners=PLAINTEXT://kafka1:9002
    4log.dirs=/Users/lijl/kafka/dataDir/kafka1
    5zookeeper.connect=zk:2181,zk:2182,zk1:2183
    6unclean.leader.election.enable=false
    7zookeeper.connection.timeout.ms=6000
    8
    9broker.id=1
    10delete.topic.enable=true
    11listeners=PLAINTEXT://kafka2:9003
    12log.dirs=/Users/lijl/kafka/dataDir/kafka2
    13zookeeper.connect=zk:2181,zk:2182,zk1:2183
    14unclean.leader.election.enable=false
    15zookeeper.connection.timeout.ms=6000
    16
    17broker.id=2
    18delete.topic.enable=true
    19listeners=PLAINTEXT://kafka3:9004
    20log.dirs=/Users/lijl/kafka/dataDir/kafka3
    21zookeeper.connect=zk:2181,zk:2182,zk1:2183
    22unclean.leader.election.enable=false
    23zookeeper.connection.timeout.ms=6000

      我们简单的介绍一下这些配置的含义:
      broder.id:broder的唯一标识
      delete.topic.enable:是否可以删除主题
      listener:该参数用于客户端和broker进行连接,用来绑定私网ip,如果想绑定公网ip,应该设置的是:advertised.listeners这个参数。
      log.dirs:kafka消息持久化的路径
      zookeeper.connect:zk节点,这里注意,应该配置全部的zk节点
      unclean.leader.election.enable:消息同步落后太多的副本可不可以进行leader选举。
      zookeeper.connection.timeout.ms:zk连接超时时间。
      接着我们只需要执行命令启动kafka就可以了。
      bin/kafka-server-start.sh -daemon config/server1.properties
      bin/kafka-server-start.sh -daemon config/server2.properties
      bin/kafka-server-start.sh -daemon config/server3.properties
      我们可以查看位于kafka logs目录下的server.log,确认kafka broker已经启动成功。


      也可以通过jps |grep Kafka来确认broker进程是否启动成功(Kafka大写)

      如果没有正确启动,首先应该着重检查kafka配置是否正确,比如listeners少加了一个s,那么在启动kafka的时候,就会启动默认的主机和端口配置,会报端口占用,而不是帮你检查配置文件是否正确

     

    验证

      此时我们的kakfa多点集群环境已经搭建完成,接下来我们来对这个集群做一些简单的验证。

    测试topic创建

      首先我们创建一个测试用的topic,名为test-topic,为了充分利用搭建的3台服务器,我们选择创建3个分区,每一个分区分配3个副本。
      bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 --create --topic test-topic --partitions 3 --replication-factor 3
      这里的zk指的是zookeeper节点主机名,执行成功后,会输出:Created topic "test-topic"
      接下来通过一些命令来做一下验证,查看所有的topic列表
      bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 -list,执行后显示我们刚创建的test-topic
      bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 --describe --topic test-topic,执行后将显示test-topic的详细信息,包括分区数,副本数,ISR,分区Leader信息等。

    1Topic:test-topic    PartitionCount:3    ReplicationFactor:3 Configs:
    2    Topic: test-topic   Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    3    Topic: test-topic   Partition: 1    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    4    Topic: test-topic   Partition: 2    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

    测试消息读写

      最后我们对消息的发送和消费进行测试,使用kafka提供的kafka-console-producer和kafka-console-consumer脚本进行数据的读写。具体的命令如下:
      bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test-topic --from-beginning
      bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test-topic --from-beginning
      我们可以同时开启两个终端进行操作,命令中的kafka1/2/3指的是kafka所在的节点主机名。我们可以观察出来,producer脚本产生的消息,consumer脚本马上就能感知到。


      最后,期待您的订阅和点赞,专栏每周都会更新,希望可以和您一起进步,同时也期待您的批评与指正!

    imageimage
  • 相关阅读:
    不务正业系列-浅谈《过气堡垒》,一个RTS玩家的视角
    [LeetCode] 54. Spiral Matrix
    [LeetCode] 40. Combination Sum II
    138. Copy List with Random Pointer
    310. Minimum Height Trees
    4. Median of Two Sorted Arrays
    153. Find Minimum in Rotated Sorted Array
    33. Search in Rotated Sorted Array
    35. Search Insert Position
    278. First Bad Version
  • 原文地址:https://www.cnblogs.com/nedulee/p/12421970.html
Copyright © 2011-2022 走看看