zoukankan      html  css  js  c++  java
  • Kafka——常用命令

    一、启动Kafka

    后台常驻方式,带上参数 -daemon,如:

    bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

    nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &

    指定 JMX port 端口启动,指定 jmx,可以方便监控 Kafka 集群

    JMX_PORT=9991 bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

    二、Topic管理 

    1. 创建Topic

    # bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic ops_coffee --partitions 3 --replication-factor 2
    Created topic "ops_coffee".

    --partitions: 指定分区数,如果不指定默认会使用配置文件中 num.partitions配置的数量

    2. 增加分区数

    # bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic ops_coffee --partitions 5
    Adding partitions succeeded!

    注意:partition的数量只能增加不能减少

    3. 查看Topic列表

    kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

    4. 查看Topic信息

    # bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic ops_coffee
    Topic:ops_coffee    PartitionCount:3    ReplicationFactor:2 Configs:
        Topic: ops_coffee   Partition: 0    Leader: 1   Replicas: 1,2   Isr: 1,2
        Topic: ops_coffee   Partition: 1    Leader: 2   Replicas: 2,3   Isr: 2,3
        Topic: ops_coffee   Partition: 2    Leader: 3   Replicas: 3,1   Isr: 3,1

    5. 删除Topic

    kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic test

    6. 查看Topic消费进度

    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.16.10.91:9092,172.16.10.92:9092,172.16.10.93:9092 --topic test --time -1

    time参数说明:

    • -1:表示查询test各个分区当前最大的消息位移
    • -2:表示查询test各个分区的最小位移

    三、Consumer管理

    1. 查看Group列表

    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.88.108:9092 --list

    2. 查看Group消费情况

    bin/kafka-consumer-groups.sh --bootstrap-server 172.16.10.91:9092 --group logstash --describe

    • CURRENT-OFFSET:当前消费偏移量
    • LOG-END-OFFSET:末尾偏移量
    • LAG:为未消费的记录,如果有很多,说明消费延迟很严重

    3. 删除Group 中 Topic

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete

    4. 删除Group

    bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test-1 --delete

    5. 重置offset

    1、要求修改的group不能active,查看是否active

    [root@izj6c46svwddzpu0evy0vbz kafka_2.11-2.0.1]# bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --describe
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    Consumer group 'test_4' has no active members.
    
    TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    consumer-send   0          5697            5697            0               -               -               -
    producer-syn    0          4125            4125            0               -               -               -

    2、重置命令

    bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --reset-offsets -to-offset 100 --topic consumer-send --execute

    3、导出offset

    bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --reset-offsets -to-offset 100 --topic consumer-send --export > 1.txt

    四、生产消费者

    1. 创建生成者

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    2. 创建消费者

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic middleware --from-beginning

    五、动态配置

    1. 平衡Leader

    bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092

    六、压力测试

    1. 生产者

    kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput 1000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

    参数说明:

    • record-size:是一条信息有多大,单位是字节
    • num-records:是总共发送多少条信息
    • throughput :是每秒多少条信息

    生产压力测试过程如下:

    5003 records sent, 1000.2 records/sec (0.10 MB/sec), 0.8 ms avg latency, 22.0 max latency.
    5002 records sent, 1000.2 records/sec (0.10 MB/sec), 0.6 ms avg latency, 7.0 max latency.
    5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.7 ms avg latency, 31.0 max latency.
    5002 records sent, 1000.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 15.0 max latency.
    5003 records sent, 1000.6 records/sec (0.10 MB/sec), 0.8 ms avg latency, 15.0 max latency.
    5002 records sent, 1000.4 records/sec (0.10 MB/sec), 0.8 ms avg latency, 14.0 max latency.
    5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.6 ms avg latency, 15.0 max latency.
    5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.8 ms avg latency, 18.0 max latency.
    5003 records sent, 1000.4 records/sec (0.10 MB/sec), 0.8 ms avg latency, 13.0 max latency.
    5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.9 ms avg latency, 31.0 max latency.
    100000 records sent, 999.970001 records/sec (0.10 MB/sec), 0.94 ms avg latency, 165.00 ms max latency, 1 ms 50th, 2 ms 95th, 7 ms 99th, 42 ms 99.9th.

    测试结果说明:

    本例中一共写入10万条消息,平均是999.970001条消息/秒,每秒向Kafka写入了0.10MB的数据,每次写入的平均延迟为0.94毫秒,最大的延迟为165毫秒

    2. 消费者

    kafka-consumer-perf-test.sh --zookeeper hadoop111:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1

    参数说明:

    • --zookeeper :指定zookeeper的链接信息,集群中任意一台kafka服务器的地址和端口号
    • --topic :指定topic的名称
    • --fetch-size :指定每次拉取的数据的大小
    • --messages :总共要消费的消息个数

    消费压力测试过程如下:

    start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
    2020-05-15 17:41:57:339, 2020-05-15 17:41:59:243, 9.5367, 5.0088, 100000, 52521.0084

    测试结果说明:

    开始测试时间,测试结束数据,最大吞吐率9.5367MB/s,平均每秒消费5.0088MB/s,最大每秒消费100000条,平均每秒消费52521.0084条。

  • 相关阅读:
    《Mysql
    《Redis
    《pt-query-digest 剖析 Mysql 慢日志》
    《Redis
    《Redis
    《Redis
    《Redis
    《Redis
    python中__new__()与__init__()的区别
    Python常见综合面试题
  • 原文地址:https://www.cnblogs.com/caoweixiong/p/12690685.html
Copyright © 2011-2022 走看看