zoukankan      html  css  js  c++  java
  • Kafka常用命令

    Kafka常用命令

      日常进程kafka运维操作时,除了借助kafka-manager图形工具查看kafka信息时,我们还可以通过kafka提供的命令工具来进行kafka的操作与查看。

    1、查看所有topic

    ./kafka-topics.sh --zookeeper=10.111.30.3:2181,10.111.30.10:2181,10.111.30.5:2181  --list

    2、删除某个topic

    ./kafka-topics.sh --zookeeper=10.111.30.3:2181,10.111.30.10:2181,10.111.30.5:2181 --topic test  --delete

    3、命令行消费某个topic的消息

    ./kafka-console-consumer.sh --bootstrap-server=10.111.30.3:9092 --topic=loc --from-beginning  #从头开始消费

    4、kafka重置分组已经消费的偏移量offest

    ./kafka-consumer-groups.sh --bootstrap-server=10.111.30.10:9092,10.111.30.5:9092,10.111.30.3:9092 --execute --reset-offsets --topic=loc_topic1 --group=testPlatform-local-pha  --to-earliest

    5、命令行创建主题,指定该主题的定时清理时间

    ./kafka-topics.sh --create --zookeeper localhost:2181 --topic test  --partitions 1 --replication-factor 1 --config retention.ms=172800000

    6、查看topic属性

    ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic access

    7、查看分组所属topic的消费情况

    ./kafka-consumer-groups.sh --bootstrap-server=10.2.48.31:9092,10.2.32.26:9092,10.2.16.25:9092  --group=testPlatform --describe

    8、topic增加分区

    ./kafka-topics.sh --alter --zookeeper 192.168.119.131:2181 --topic yqtopic1 --partitions 12   #分区数增加到12个,会引起rebalance操作

    9、创建主题(必须指定分区和副本,否则会报错)

    ./kafka-topics.sh --zookeeper=10.111.30.3:2181,10.111.30.10:2181,10.111.30.5:2181 --create --topic TEST --partitions 1 -replication-factor 1

    10、指定topic创建消费者分组

    ./kafka-console-consumer.sh  --bootstrap-server=10.111.30.3:9092,10.111.30.10:9092,10.111.30.5:9092 --topic TEST --consumer-property group.id=test1

    11、将某个consumer group 的某个topic或所有topic重置到某个时间点

    相关格式:

    --to-current                            Reset offsets to current offset.       
    --to-datetime <String: datetime>        Reset offsets to offset from datetime. 
                                              Format: 'YYYY-MM-DDTHH:mm:SS.sss'      2020-07-01T12:00:00.000
    --to-earliest                           Reset offsets to earliest offset.      
    --to-latest                             Reset offsets to latest offset.        
    --to-offset <Long: offset>              Reset offsets to a specific offset.    
    --topic <String: topic>                 The topic whose consumer group  

    12、时间点重置

    ./kafka-consumer-groups.sh --bootstrap-server=192.168.2.4:9092,192.168.2.3:9092,192.168.2.2:9092 --group=gateway-calc-trip --topic=loc --execute --reset-offsets --to-datetime 2020-07-01T12:00:00.000   
    #此处时间为UTC时间,需修改为提前8个小时,修改完成可命令行查看重置后的offset,然后指定这个offset及分区开始消费,查看当前消息时间,验证重置时间点是否正确

    13、offset重置

    ./kafka-consumer-groups.sh  --bootstrap-server=10.111.30.3:9092,10.111.30.10:9092,10.111.30.5:9092 --topic=locationUp --group=jt809-down-xg --execute --reset-offsets --to-offset 359905139

    14、offset重置(所有topic)

    ./kafka-consumer-groups.sh --bootstrap-server 10.111.30.4:9092,10.111.30.8:9092,10.111.30.9:9092 --group testPlatform2 --reset-offsets --all-topics --to-latest  --execute

    15、kafka指定offset与partition导出消息

    ./kafka-console-consumer.sh --bootstrap-server 10.111.30.4:9092 --topic topic-upstreammsg-statistic  --offset 825000 --partition 0 >> log.log

    16、修改主题级别的参数

    bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

    17、查看topic消息总数

    $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic
    test-topic:0:0
    test-topic:1:0
    $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic
    
    test-topic:0:5500000
    test-topic:1:5500000
    
    -1 和 -2 分别表示获取某分区最新的位移和最早的位移,这两个位移值的差值就是这个分区当前的消息数,在这个例子中,差值是 500 万条

    18、测试生产者性能脚本kafka-producer-perf-test.sh

    $ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
    2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
    4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
    10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.
    1. 向指定主题发送了 1 千万条消息,每条消息大小是 1KB。该命令允许你在 producer-props 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等。
    2. 该命令的输出值得好好说一下。它会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时。一般情况下,消息延时不是一个简单的数字,而是一组分布
    3. 在上面的输出中,99th 值是 604ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。

    19、测试消费者性能脚本kafka-consumer-perf-test.sh

    $ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
    start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
    2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012

      虽然输出格式有所差别,但该脚本也会打印出消费者的吞吐量数据。比如本例中的 1723MB/s。有点令人遗憾的是,它没有计算不同分位数下的分布情况。因此,在实际使用过程中,这个脚本的使用率要比生产者性能测试脚本的使用率低。

  • 相关阅读:
    python argparse sys.argv
    python __all__
    一些方便系统诊断的bash函数
    yarn集群客户端gateway常用限制
    xcrun: error: invalid active developer path (/Library/Developer/CommandLineTools), missing xcrun
    Hadoop fs -put bandwidth 暴力版
    PYTHON SOCKET编程简介
    java实现点选汉字验证码(转)
    springboot配置log4j
    vue文字跑马灯效果
  • 原文地址:https://www.cnblogs.com/wushaoyu/p/11486551.html
Copyright © 2011-2022 走看看