zoukankan      html  css  js  c++  java
  • kafka集群安装及基本命令行使用

    集群安装

    因为官方文档整理的不好,所以按照《Kafka入门与实践》这本书学习,学习的版本是0.10.1.1,学习完后再关注高版本的变化及新增的特性即可。

    环境介绍

    本次安装kafka集群利用的是自带的zooKeeper,其实最好是把kafka和zooKeeper部署在不同的节点上,这样更高可用。

    三个节点:

    kafka1:192.168.56.100

    kafka2:192.168.56.101

    kafka2:192.168.56.102

    zooKeeper所需的三个端口:

    2181:对客户端提供服务

    2888:集群内节点通信使用

    3888:选举leader使用

    kafka所需的端口:9092:对客户端提供服务

    选用kafka版本:kafka_2.12-2.2.0.tgz,kafka版本是2.2.0,scala版本是2.12。上传到kafka1节点,解压到/home/koushengrui/app目录,即KAFKA_HOME是/home/koushengrui/app/kafka_2.12-2.2.0。配置文件在config子目录中,命令在bin子目录中,日志在logs子目录中。

    zooKeeper的相关配置,修改zookeeper.properties:

    dataDir=/data/zookeeper
    clientPort=2181
    maxClientCnxns=0
    tickTime=2000
    initLimit=20
    syncLimit=10
    server.1=192.168.56.100:2888:3888
    server.2=192.168.56.101:2888:3888
    server.3=192.168.56.102:2888:3888

    kafka的相关配置,修改server.properties:

    broker.id=1
    listeners=PLAINTEXT://192.168.56.100:9092
    log.dirs=/data/kafka-logs
    log.retention.check.interval.ms=300000
    log.retention.hours=168
    log.segment.bytes=1073741824
    num.io.threads=4
    num.partitions=3 default.replication.factor=2 num.network.threads=3 num.recovery.threads.per.data.dir=1 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 socket.send.buffer.bytes=102400 zookeeper.connect=192.168.56.100:2181,192.168.56.101:2181,192.168.56.102:2181 zookeeper.connection.timeout.ms=10000 fetch.message.max.bytes=52428800 replica.fetch.max.bytes=52428800
    delete.topic.enable=true

    其中,num.partitions=3表示默认分区个数是3,即每个topic都有3个分区。default.replication.factor=2表示默认副本因子是2,即每个分区有2个副本。log.dirs表示的是kafka的数据目录,换句话说是topic数据目录,里面存放了segment日志段。kafka服务本身的日志默认会放在${KAFKA_HOME}/logs,即放在与config目录同级的logs目录中,如果我们想改变这个目录,则需要修改bin目录中的kafka-run-class.sh脚本,搜索LOG_DIR,在这一行的上面指定LOG_DIR的值,如LOG_DIR=/root/logs。

    创建zooKeeper的数据目录/data/zookeeper,并在其中创建myid文件,内容为1。

    复制kafka_2.12-2.2.0目录到kafka2、kafka3节点,并对这两个节点的server.properties做稍微修改,修改broker.id和listeners的值。此外,还要这两个节点上都创建/data/zookeeper目录和/data/zookeeper/myid文件,内容分别是2和3。

    在三个节点上分别执行下面命令以启动zooKeeper集群。注意,在启动zookeeper时必须显式指定zookeeper的配置文件,否则会启动不成功。

    cd $KAFKA_HOME/bin;nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &

    可以在各节点通过jps命令查看zooKeeper是否启动成功。启动成功的话,可以看到QuorumPeerMain进程。

    执行cd $KAFKA_HOME/bin;zookeeper-shell.sh 192.168.56.100:2181后可以执行zookeeper客户端命令,如ls /。可以看到此时有个/zookeeper节点,zookeeper节点下面有/zookeeper/quota子节点。

    在三个节点上分别执行下面命令以启动kafka集群:注意,在启动kafka时必须显式指定kafka的配置文件,否则会启动不成功

    cd $KAFKA_HOME/bin;nohup ./kafka-server-start.sh ../config/server.properties &

    这个时候再用jps命令,如果kafka启动成功的话,可以看到kafka进程。至于哪个节点是kafkaController,可以通过各节点的logs目录中的controller.log看出来。再连接上zookeeper,可以看到此时新增了cluster,controller_epoch,controller,brokers,admin,isr_change_notification,consumers,log_dir_event_notification,latest_producer_id_block,config节点。

    get /controller也可以看到leader controller是哪台broker

    其实每个节点都包含着很多信息,以后再研究。

    基本命令行使用:(命令在$KAFKA_HOME/bin目录中)

    1、查看kafka集群有哪些topic:

    kafka-topics.sh --list --zookeeper 192.168.56.100:2181

    2、查看kafka集群所有topic分区及其副本情况:

    kafka-topics.sh --describe --zookeeper 192.168.56.100:2181

    当然,如果添加--topic test,就可以只看test这个topic的分区及其副本情况。

    3、创建一个新的topic:

    kafka-topics.sh --create --topic test --partitions 3 --replication-factor 3 --zookeeper 192.168.56.100:2181

    4、修改某topic的分区数。分区数只能新增,不能减少。

    5、控制台往某topic中生产消息:

    kafka-console-producer.sh --topic test --broker-list 192.168.56.100:9092

    6、查看某topic各分区消息偏移量:

    kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -2 --broker-list 192.168.56.100:9092    // 查看最小偏移量

    kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list 192.168.56.100:9092    // 查看最大偏移量

    7、控制台消费某topic:

    kafka-console-consumer.sh --topic test --bootstrap-server 192.168.56.100:9092 --from-beginning

    8、查看所有的消费组:

    kafka-consumer-groups.sh --list --bootstrap-server 192.168.56.100:9092

    9、查看某消费组的消费情况:

    kafka-consumer-groups.sh --describe --group console-consumer-44081 --bootstrap-server 192.168.56.100:9092

    其中console-consumer-44081是消费组名。

    koushengrui@koushengrui ~ % kafka-consumer-groups --describe --group my-test-consumer-group2 --bootstrap-server 127.0.0.1:9092

    GROUP                   TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID

    my-test-consumer-group2 test            0          28              30              2               consumer-1-326f898c-c3e0-4798-b28a-cf08d31fc008 /127.0.0.1      consumer-1

    my-test-consumer-group2 test            1          27              30              3               consumer-1-326f898c-c3e0-4798-b28a-cf08d31fc008 /127.0.0.1      consumer-1

    my-test-consumer-group2 test            2          27              30              3               consumer-1-326f898c-c3e0-4798-b28a-cf08d31fc008 /127.0.0.1      consumer-1

    如上,可看出每一个分区的消息偏移量及消费偏移量及对应的消费者id。

  • 相关阅读:
    Jabref安装及使用教程
    均方误差与均方根误差
    费马大定理
    JabRef设置
    申请内存时底层发生了什么?
    什么是Pipeline?
    基于三维点云场景的语义及实例分割
    华为 MateView 系列显示器新品
    C#-WinForm跨线程修改UI界面
    C# 代码生成二维码方法及代码示例(QRCoder)
  • 原文地址:https://www.cnblogs.com/koushr/p/5873429.html
Copyright © 2011-2022 走看看