zoukankan      html  css  js  c++  java
  • CentOS7 安装kafka集群

    1. 环境准备

    JDK1.8

    ZooKeeper集群(参见本人博文

    Scala2.12(如果需要做scala开发的话,安装方法参见本人博文)

    本次安装的kafka和zookeeper集群在同一套物理机器上,

    192.168.1.101

    192.168.1.102

    192.168.1.103

    2. 下载kafka

    到kafka官网下载kafka,目前最新的版本是kafka_2.12-2.1.0.tgz,前面是scala版本号,后面是kafka版本号。

    wget http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz  #下载kafka_2.12-2.1.0.tgz
    mkdir /usr/local/kafka   #kafka安装文件放置目录
    tar xzf kafka_2.12-2.1.0.tgz          #解压kafka二进制文件
    mv kafka_2.12-2.1.0 /usr/local/kafka  #移动解压后的kafka到/usr/local/kafka
    cd /usr/local/kafka/kafka_2.12-2.10/config   #定位到kafka_2.12-2.10/config目录,以便后面进行参数配置

    3. 配置kafka

    看一下配置文件目录下的文件列表

     ls -l /usr/local/kafka/kafka_2.12-2.1.0/config
    total 68
    -rw-r--r-- 1 root root  906 Nov 10 03:50 connect-console-sink.properties
    -rw-r--r-- 1 root root  909 Nov 10 03:50 connect-console-source.properties
    -rw-r--r-- 1 root root 5321 Nov 10 03:50 connect-distributed.properties
    -rw-r--r-- 1 root root  883 Nov 10 03:50 connect-file-sink.properties
    -rw-r--r-- 1 root root  881 Nov 10 03:50 connect-file-source.properties
    -rw-r--r-- 1 root root 1111 Nov 10 03:50 connect-log4j.properties
    -rw-r--r-- 1 root root 2262 Nov 10 03:50 connect-standalone.properties
    -rw-r--r-- 1 root root 1221 Nov 10 03:50 consumer.properties
    -rw-r--r-- 1 root root 4727 Nov 10 03:50 log4j.properties
    -rw-r--r-- 1 root root 1925 Nov 10 03:50 producer.properties
    -rw-r--r-- 1 root root 6851 Nov 10 03:50 server.properties
    -rw-r--r-- 1 root root 1032 Nov 10 03:50 tools-log4j.properties
    -rw-r--r-- 1 root root 1169 Nov 10 03:50 trogdor.conf
    -rw-r--r-- 1 root root 1023 Nov 10 03:50 zookeeper.properties

    192.160.1.101:

    配置server.properties文件

    vim /usr/local/kafka/kafka_2.12-2.1.0/config/server.properties

    修改其中的内容为:

    #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
    broker.id=0  
    
    #当前kafka对外提供服务的端口默认是9092
    port=9092 
    
    #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
    host.name=192.168.1.101 
    
    #这个是borker进行网络处理的线程数
    num.network.threads=3 
    
    #这个是borker进行I/O处理的线程数
    num.io.threads=8 
    
    #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    log.dirs=/usr/local/kafka/kafka_2.12-2.1.0/kafkalogs/ 
    
    #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.send.buffer.bytes=102400 
    
    #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.receive.buffer.bytes=102400 
    
    #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    socket.request.max.bytes=104857600 
    
    #默认的分区数,一个topic默认1个分区数
    num.partitions=1 
    
    #默认消息的最大持久化时间,168小时,7天
    log.retention.hours=168 
    
    #消息保存的最大值5M
    message.max.byte=5242880  
    
    #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
    default.replication.factor=2  
    #取消息的最大直接数
    replica.fetch.max.bytes=5242880  
    
    #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.segment.bytes=1073741824 
    
    #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
    log.retention.check.interval.ms=300000 
    
    #是否启用log压缩,一般不用启用,启用的话可以提高性能
    log.cleaner.enable=false 
    
    #设置zookeeper的连接端口
    zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 

    相比默认设置,改动项为:

    #broker.id=0  每台服务器的broker.id都不能相同
    
    #hostname
    host.name=192.168.1.101
    
    #在log.retention.hours=168 下面新增下面三项
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    
    #设置zookeeper的连接端口
    zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 

    将192.168.1.101上配置好的kafka复制到192.168.1.102和192.168.1.103上

    scp -r /usr/local/kafka/ 192.168.1.102:/usr/local #将目录/usr/local/kafka/复制到192.168.1.102远程机器上去
    scp -r /usr/local/kafka/ 192.168.1.103:/usr/local #将目录/usr/local/kafka/复制到192.168.1.103远程机器上去

    修改102,103上的server.properties,改动配置项为: broker.id,host.name

    4. 配置系统环境变量(三台机器都要做)

    vim  /etc/profile

    在文件末尾添加内容:

    #kafka
    export KAFKA_HOME=/usr/local/kafka/kafka_2.12-2.1.0
    export PATH=$KAFKA_HOME/bin:$PATH

    :wq保存退出

    source  /etc/profile #使环境变量生效

    5. 启动并测试kafka集群

     注意:这个操作三台机器都要做,另外启动之前,请确保zookeeper集群已经启动!

    cd /usr/local/kafka/kafka_2.12-2.1.0  #定位到kafka的根目录
    ./bin/kafka-server-start.sh -daemon  ./config/server.properties #启动kafka服务,-daemon表示在后台运行kafka服务器
    #或者直接使用绝对地址启动
    $KAFKA_HOME/bin/kafka-server-start.sh -daemon  $KAFKA_HOME/config/server.properties

     5.1 检查kafka是否启动

    192.168.1.101

    [root@101 kafka_2.12-2.1.0]# jps
    62403 NameNode
    31013 NodeManager
    22325 Kafka
    7621 PaloFe
    54217 QuorumPeerMain
    4697 BrokerBootstrap
    25836 Jps
    62589 DataNode

    192.168.1.102

    [root@102 kafka_2.12-2.1.0]# jps
    47474 QuorumPeerMain
    15203 NodeManager
    15061 ResourceManager
    7673 Kafka
    59642 PaloFe
    11322 Jps
    1389 BrokerBootstrap
    37517 SecondaryNameNode
    37359 DataNode

    192.168.1.103

    [root@103 kafka_2.12-2.1.0]# jps
    57232 Jps
    1185 RunJar
    52898 RunJar
    52563 PaloFe
    1428 BrokerBootstrap
    62404 NodeManager
    62342 QuorumPeerMain
    20952 ManagerBootStrap
    52440 Kafka
    47901 DataNode

    5.2 创建Topic来验证是否创建成功

    #创建Topic
    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.1.101:2181 --replication-factor 2 --partitions 1 --topic lenmom
    #解释
    --replication-factor 2   #复制两份
    --partitions 1 #创建1个分区
    --topic #主题为shuaige
    
    #在一台服务器(192.168.1.101)上创建一个发布者
    #创建一个broker,发布者
    $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.1.101:9092 --topic lenmom
    
    #在一台服务器(192.168.1.102)上创建一个订阅者
    #kafka 1.2.2以前版本使用     kafka-console-consumer.sh --zookeeper localhost:2181 --topic lenmom --from-beginning
    #kafka 1.2.2及之后版本是使用 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic lenmom --from-beginning
    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic lenmom --from-beginning

    192.168.101:

    [root@101 kafka_2.12-2.1.0]# $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.1.101:2181 --replication-factor 2 --partitions 1 --topic lenmom
    Created topic "lenmom".
    [root@101 kafka_2.12-2.1.0]# $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.1.101:9092 --topic lenmom
    >1
    >2
    >3
    >4
    >5
    >6
    >7
    >8
    >9
    >

    192.168.1.102:

    [root@102 kafka_2.12-2.1.0]# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic lenmom --from-beginning
    1
    2
    3
    4
    5
    6
    7
    8
    9

     5.3 查看topic

    $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181   #显示所有主题
    __consumer_offsets
    lenmom

    5.4 查看topic状态

    $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic lenmom
    Topic:lenmom    PartitionCount:1        ReplicationFactor:2     Configs:
            Topic: lenmom   Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0,2
    #分区为为1  复制因子为2   他的  shuaige的分区为0 
    #Replicas: 0,1   复制的为0,1

    5.5 删除topic

    $KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic lenmom
    Topic lenmom is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.

    上面消息的意思是,topic被标记为待删除,至于实际是否删除取决于broker的配置参数delete.topic.enable是否为true,在kafka1.0版本之后,该参数值默认为true.

    5.6 Producer吞吐量测试

    $KAFKA_HOME/bin/kafka-producer-perf-test.sh --topic lenmom --num-record 500000 --record-size 200 --throughput 3 --producer-props  bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092 acks=-1 
    17 records sent, 3.4 records/sec (0.00 MB/sec), 27.6 ms avg latency, 357.0 max latency.
    15 records sent, 3.0 records/sec (0.00 MB/sec), 5.8 ms avg latency, 7.0 max latency.
    15 records sent, 3.0 records/sec (0.00 MB/sec), 5.3 ms avg latency, 6.0 max latency.
    15 records sent, 3.0 records/sec (0.00 MB/sec), 5.3 ms avg latency, 6.0 max latency.
    15 records sent, 3.0 records/sec (0.00 MB/sec), 5.2 ms avg latency, 9.0 max latency.
    15 records sent, 3.0 records/sec (0.00 MB/sec), 5.3 ms avg latency, 9.0 max latency.
    15 records sent, 3.0 records/sec (0.00 MB/sec), 4.9 ms avg latency, 6.0 max latency.
    15 records sent, 3.0 records/sec (0.00 MB/sec), 4.9 ms avg latency, 5.0 max latency.
    15 records sent, 3.0 records/sec (0.00 MB/sec), 5.0 ms avg latency, 6.0 max latency.
    15 records sent, 3.0 records/sec (0.00 MB/sec), 4.5 ms avg latency, 5.0 max latency.

    至此,kafka集群安装完毕!

    6. 其他说明

    6.1 日志说明

    默认kafka的日志是保存在$KAFKA_HOME/logs目录下的,这里说几个需要注意的日志

    server.log #kafka的运行日志
    state-change.log  #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
    
    controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.

    6.2 zookeeper

    #使用客户端进入zk
    ./zkCli.sh -server 127.0.0.1:2181
    #查看目录情况 执行“ls /”
    [zk: 127.0.0.1:12181(CONNECTED) 0] ls /
    #显示结果:[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
    #上面的显示结果中:只有zookeeper是,zookeeper原生的,其他都是Kafka创建的
    #标注一个重要的
    [zk: 127.0.0.1:12181(CONNECTED) 1] get /brokers/ids/0
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.1.103:9092"],"jmx_port":-1,"host":"192.168.1.103","timestamp":"1548127205035","port":9092,"version":4}
    cZxid = 0x100000063
    ctime = Tue Jan 22 11:20:05 CST 2019
    mZxid = 0x100000063
    mtime = Tue Jan 22 11:20:05 CST 2019
    pZxid = 0x100000063
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x20000d319500002
    dataLength = 196
    numChildren = 0
    [zk: 127.0.0.1:12181(CONNECTED) 2] 
    
    #还有一个是查看partion
    [zk: 127.0.0.1:12181(CONNECTED) 7] get /brokers/topics/lenmom/partitions/0
    null
    cZxid = 0x10000006b
    ctime = Tue Jan 22 11:46:34 CST 2019
    mZxid = 0x10000006b
    mtime = Tue Jan 22 11:46:34 CST 2019
    pZxid = 0x10000006c
    cversion = 1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 0
    numChildren = 1

    kafka通过zookeeper来进行集群选举,元数据都存放在zookeeper中.

    参考文档:

    http://kafka.apache.org/documentation.html

  • 相关阅读:
    [LeetCode]Reverse Linked List II
    [LeetCode]Move Zeroes
    Next Greater Element I
    Keyboard Row
    Number Complement
    SQL语句学习(二)
    SQL语句学习(一)
    jQuery学习(三)
    jQuery学习(二)
    JQuery学习(一)
  • 原文地址:https://www.cnblogs.com/lenmom/p/10302445.html
Copyright © 2011-2022 走看看