zoukankan      html  css  js  c++  java
  • Kafka集群环境搭建

    本文只讲述Kafka集群环境的搭建步骤,后续会对kafka的其他相关知识进行整理.

    1、准备工作

    Linux服务器 3台(本文将在一台linux服务器上建立三个文件夹来模拟三台linux服务器,搭建伪集群)
    JDK1.8 http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
    Zookeeper http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/
    Kafka https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz


        

    2、开始搭建

    安装JDK,参考:http://www.cnblogs.com/sky-lz/p/8351827.html

    配置&安装Zookeeper

    #我的目录统一放在/opt下面
    #首先创建Zookeeper项目目录
    mkdir zookeeper 
    
    #再进入新建好的zookeeper目录下创建模拟的三个服务器的项目目录(server1,server2,server3)
    mkdir server1
    mkdir server2
    mkdir server3
    
    #将下载好的zookeeper-3.4.10.tar.gz解压到三个server中.
    tar -zxvf zookeeper-3.4.10.tar.gz
    
    #在server1,server2,server3中分别创建下面两个文件夹
    mkdir data  #存放快照日志
    mkdir datalog #存放事物日志

     修改配置文件

    进入到解压后的zookeeper-3.4.10的conf目录.

    #进入conf目录
    /opt/zookeeper/server1/zookeeper-3.4.10/conf
    /opt/zookeeper/server2/zookeeper-3.4.10/conf
    /opt/zookeeper/server2/zookeeper-3.4.10/conf

    conf目录中的zoo_sample.cfg  这个文件是官方给我们的zookeeper的样板文件,我们复制一份命名为zoo.cfg保存在和样板文件同目录下(server1,server2,server3操作步骤一样),zoo.cfg是官方指定的文件命名规则。

    修改/opt/zookeeper/server1/zookeeper-3.4.10/conf/zoo.cfg文件如下:(代码中红色字体是需要我们更改的内容.)

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/opt/zookeeper/server1/data
    dataLogDir=/opt/zookeeper/server1/datalog
    clientPort=2181
    server.1=127.0.0.1:2888:3888
    server.2=127.0.0.1:2889:3889
    server.3=127.0.0.1:2890:3890

    server2,server3中的/zookeeper-3.4.10/conf/zoo.cfg文件和server1中的更改内容大致相同,需要注意的是dataDir,dataLogDir属性的值要改为server2,server3相应的data和datalog目录. clientPort=2181端口号要在server2和server3中分别改为 clientPort=2182和 clientPort=2183

    配置文件解释:

    #tickTime:
    这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
    #initLimit:
    这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
    #syncLimit:
    这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
    #dataDir:
    快照日志的存储路径
    #dataLogDir:
    事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
    #clientPort:
    这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

    创建myid文件:

    #在server中的data文件夹下创建myid文件
    
    #server1
    /opt/zookeeper/server1/data/myid
    文件中内容为 1
    
    #server2
    /opt/zookeeper/server2/data/myid
    文件中内容为 2
    
    #server3
    /opt/zookeeper/server3/data/myid
    文件中内容为 3

    启动服务并查看

    1,启动服务

    #进入到Zookeeper的bin目录下
    #server1
    cd /opt/zookeeper/server1/zookeeper-3.4.10/bin
    
    #启动服务
    ./zkServer.sh start
    
    #server2
    cd /opt/zookeeper/server2/zookeeper-3.4.10/bin
    
    #启动服务
    ./zkServer.sh start
    
    #server3
    cd /opt/zookeeper/server3/zookeeper-3.4.10/bin
    
    #启动服务
    ./zkServer.sh start
    
    #启动成功后的信息(server1为例):
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/server1/zookeeper-3.4.10/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED

    2,检查服务状态

    #检查服务器状态
    ./zkServer.sh status
    #下面的状态为启动成功。
    ZooKeeper JMX enabled by default
    Using config: /home/user/zookeeper/server3/zookeeper3.4.10/bin/../conf/zoo.cfg
    Mode: follower      
    #Mode有两种类型:leader和follower,leader代表领导(主节点),follower代表下属(备节点)
    #zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。

     至此,zookeeper集群就成功搭建完成了,接下来我们开始搭建kafka。

    配置&安装Kafka

    #创建目录
    cd /opt/
    mkdir kafka #创建项目目录
    cd kafka
    mkdir kafkalogs #创建kafka消息目录,主要存放kafka消息-对应server1服务器
    mkdir kafkalogs1 #创建kafka消息目录,主要存放kafka消息-对应server2服务器
    mkdir kafkalogs2 #创建kafka消息目录,主要存放kafka消息-对应server3服务器
    #解压kafka_2.11-1.0.0.tgz到kafka目录下

    tar -zxvf kafka_2.11-1.0.0.tgz
    #如果是三台真实的linux服务器,只需要将kafka_2.11-1.0.0.tgz解压到三台服务器的/opt/kafka目录下,再新建kafkalogs即可。

    修改kafka配置文件

    #进入到config目录
    cd /opt/kafka/kafka_2.11-1.0.0/config/

    我们可以看到目录下有zookeeper的一些文件,这是kafka内置的zookeeper集群,我们可以使用它来直接启动,但建议使用独立的zookeeper集群。

    -rw-r--r--. 1 root root  906 Oct 27 08:56 connect-console-sink.properties
    -rw-r--r--. 1 root root  909 Oct 27 08:56 connect-console-source.properties
    -rw-r--r--. 1 root root 5807 Oct 27 08:56 connect-distributed.properties
    -rw-r--r--. 1 root root  883 Oct 27 08:56 connect-file-sink.properties
    -rw-r--r--. 1 root root  881 Oct 27 08:56 connect-file-source.properties
    -rw-r--r--. 1 root root 1111 Oct 27 08:56 connect-log4j.properties
    -rw-r--r--. 1 root root 2730 Oct 27 08:56 connect-standalone.properties
    -rw-r--r--. 1 root root 1221 Oct 27 08:56 consumer.properties
    -rw-r--r--. 1 root root 4727 Oct 27 08:56 log4j.properties
    -rw-r--r--. 1 root root 1919 Oct 27 08:56 producer.properties
    -rw-r--r--. 1 root root  173 Jan  7 05:54 server-1.properties
    -rw-r--r--. 1 root root  173 Jan  7 05:56 server-2.properties
    -rw-r--r--. 1 root root  172 Jan  7 05:55 server.properties
    -rw-r--r--. 1 root root 1032 Oct 27 08:56 tools-log4j.properties
    -rw-r--r--. 1 root root 1023 Oct 27 08:56 zookeeper.properties

    我们主要修改 server.properties 这个文件即可. 用以下代码覆盖server.properties里的内容进行保存.(这些是主要参数,以后需要自定义其他参数再做调整)

    broker.id=0
    listeners=PLAINTEXT://127.0.0.1:9092
    port=9092
    host.name=127.0.0.1
    log.dirs=/opt/kafka/kafkalogs
    zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

    因为我是在一台服务器搭建,可以将server.properties复制两份,分别命名为server1.properties,server2.properties,来模拟三台服务器。如果是在三台不同服务器,则不需要复制多份,只需要在三台服务器分别对server.properties进行配置即可.

    #在config目录下复制两份server.properties,并命名 server-1.propertis, server-2.propertis
    
    cp server.properties server-1.propertis
    
    cp server.properties server-2.propertis

    修改server1.properties如下:

    broker.id=1
    listeners=PLAINTEXT://127.0.0.1:9093
    port=9093
    host.name=127.0.0.1
    log.dirs=/opt/kafka/kafkalogs1
    zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

    修改server2.properties如下:

    broker.id=2
    listeners=PLAINTEXT://127.0.0.1:9094
    port=9094
    host.name=127.0.0.1
    log.dirs=/opt/kafka/kafkalogs2
    zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

    启动Kafka集群并测试

    1、启动服务

    #从后台启动Kafka集群(3台都需要启动)
    
    #进入到kafka的根目录 
    cd /opt/kafka/kafka_2.11-1.0.0
    
    #模拟启动三个配置文件,代表三台服务器.
    ./bin/kafka-server-start.sh -daemon config/server.properties
    ./bin/kafka-server-start.sh -daemon config/server1.properties
    ./bin/kafka-server-start.sh -daemon config/server2.properties
    
    # 启动命令中的 -daemon 表示以守护进程的方式启动.

     2、测试kafka

    创建主题:

    #创建一个test主题,分区数为3,备份数为3
    #在kafka根目录执行下面命令
    bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 3 --topic test

    启动生产者:

    #kafka根目录执行,启动一个生产者
    bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

    不要关闭生产者窗口,再打开一个新窗口,进入到kafka根目录,启动消费者:

    #启动消费者命令
    bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning

    在生产者发送一条消息,查看消费者是否有接收成功。接收成功后kafka集成环境搭建完成。

    附加:

    以下附赠kafka一些基本的命令,(这些命令都在kafka根目录下执行)。

    #启动命令 -daemon表示以守护进程启动
    bin/kafka-server-start.sh -daemon config/server.properties
    
    #停止命令 
    bin/kafka-server-stop.sh -daemon config/server.properties
    
    #查看topic列表
    bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
    
    #创建topic:
    bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 5 --topic test
    
    #查看"test"主题详情
    bin/kafka-topics.sh --describe --zookeeper  127.0.0.1:2181 --topic test
    
    #为主题test增加2个分区数量
    bin/kafka-topics.sh --alter --zookeeper localhost:2182 --topic test --partitions 2
    
    #启动生产者
    bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
    
    #启动消费者
    #  --from-beginning代表启动后将这个主题之前的消息都显示出来。
    bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning

    #查看消费者组列表
    bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

    #查看某个消费者组的偏移量(offset)
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.33.69:9092 --describe --group my-group

     如果遇到如下报错,把127.0.0.1改为192.168.0.188类似的真实ip就可以解决

    [root@localhost bin]# ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
    aaa
    bbb
    [2018-03-09 06:16:28,195] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    [2018-03-09 06:17:28,202] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    aaa
    [2018-03-09 06:19:14,385] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

  • 相关阅读:
    20150603_Andriod 多个窗体数据回调
    onActivityResult传值的使用
    20150602_Andriod 向窗体传递参数
    20150601_Andriod 打开新窗体
    C# 添加.DLL 出错的解决方法
    c# 中crystal report输出PDF文件
    参考_Android中,如何新建一个界面,并且实现从当前界面切换到到刚才新建的(另外一个)界面
    andriod 新建 Activity_ Form (详细设置)
    sql in
    如何取得GridView被隐藏列的值
  • 原文地址:https://www.cnblogs.com/xiaohanlin/p/8636327.html
Copyright © 2011-2022 走看看