zoukankan      html  css  js  c++  java
  • kafka知识体系-日常运维命令

    本文主要讲解kafka日常运维的命令,包括topic管理、性能测试脚本。

    kafka版本0.10.0,安装步骤见大数据平台搭建-kafka集群的搭建

    常用脚本

    如下所有的命令均基于KAFKA_HOME=/wls/oracle/kafka ,服务器列表如下:

    10.20.112.59
    10.20.112.64
    10.20.112.65
    10.20.116.129
    10.20.116.175
    

    创建topic

    /wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --create --topic TEST --replication-factor 2 --partitions 3  
    

    其中replication-factor后数字表示副本个数,partitions的数字表示分区个数。

    查看topic

    /wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --list
    

    更改topic配置

    配置topic级别参数时,相同(参数)属性topic级别会覆盖全局的,否则默认为全局配置属性值,即/wls/oracle/kafka/config/server.propertiestopic属性配置。
    topic创建完成后,随着项目的进展,可能存在对特定topic配置的更改,涉及到的常用更改项如下。

    单个消息比较大,需要调整broker能接收消息的最大字节数

    /wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --alter --topic TEST --config max.message.bytes=128000
    
    或者使用/wls/oracle/kafka/bin/kafka-configs.sh脚本也行
    /wls/oracle/kafka/bin/kafka-configs.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --entity-type topics --entity-name TEST --alter --add-config max.message.bytes=128000
    

    kafka扩容或者读取性能遇到瓶颈时,可能会考虑增加分区数

    /wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --alter --topic TEST --partitions 6  
    

    如果涉及到多个配置的更改,则依次用 --conf key=value并列配置即可

    /wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --alter --topic TEST --config max.message.bytes=128000  --config flush.messages=1
    

    其他可选配置

    Property Default Server Default Property note
    cleanup.policy delete log.cleanup.policy 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
    delete.retention.ms 86400000 (24 hours) log.cleaner.delete.retention.ms 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
    flush.messages None log.flush.interval.messages log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失
    flush.ms None log.flush.interval.ms 仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.
    index.interval.bytes 4096 log.index.interval.bytes 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
    message.max.bytes 1,000,000 message.max.bytes 表示消息的最大大小,单位是字节
    min.cleanable.dirty.ratio 0.5 log.cleaner.min.cleanable.ratio 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
    retention.bytes None log.retention.bytes topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
    retention.ms None log.retention.minutes 数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据 log.retention.bytes和log.retention.minutes达到要求,都会执行删除,会被topic创建时的指定参数覆盖
    segment.bytes 1 GB log.segment.bytes topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
    segment.index.bytes 10 MB log.index.size.max.bytes 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
    log.roll.hours 7 days log.roll.hours 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖

    查看topic

    查看topic分区和副本分布情况

    /wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --describe --topic TEST
    

    查看topic配置

    /wls/oracle/kafka/bin/kafka-configs.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --entity-type topics --entity-name TEST --describe
    

    删除topic

    /wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --delete --topic TEST
    

    需要注意的是执行这段命令后控制台输出

    Topic test is marked for deletion. 
    Note: This will have no impact if delete.topic.enable is not set to true. 
    

    也就是说执行删除命令,不是真正删除,而是标记删除,实际只是在zookeeper上添加/admin/delete_topics/test节点,需要确认提前打开delete.topic.enable开关。
    如果遇到标记删除失败,可以考虑手工删除,步骤如下:
    1.登录zookeeer客户端,删除zookeeperTESTtopic节点

    /wls/oracle/zookeeper/bin/zkCli.sh
    rmr /kafka/brokers/topics/TEST
    

    其中/kafka为我们自己kafka在zookeeper的根目录,不同集群可能不太一致。

    2.删除kafka数据文件

    rm -rf /wls/oracle/bigdata/kafka/kafka-logs-*/TEST*
    

    其中/wls/oracle/bigdata/kafka/kafka-logs-*server.properties中配置的log.dirs目录,具体可参考
    大数据平台搭建-kafka集群的搭建

    发送消息

    /wls/oracle/kafka/bin/kafka-console-producer.sh --broker-list 10.20.112.59:9092,10.20.112.64:9092,10.20.112.65:9092,10.20.116.129:9092,10.20.116.175:9092 --topic TEST
    

    接收消息

    /wls/oracle/kafka/bin/kafka-console-consumer.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka  --topic TEST --from-beginning
    

    其中--from-beginning表示从头开始消费kafka队列TEST中的消息,如果没有该选项,则消费最新的消息。

    查看topic消费者offset

    /wls/oracle/kafka/bin/kafka-consumer-offset-checker.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka  --topic TEST --group mygroup
    

    性能测试

    kafka官方提供了优化参数的性能测试脚本

    生产者

    /wls/oracle/kafka/bin/kafka-producer-perf-test.sh --num-records 100000000 --record-size 10 --topic TEST  --producer-props bootstrap.servers=10.20.112.59:9092,10.20.112.64:9092,10.20.112.65:9092,10.20.116.129:9092,10.20.116.175:9092 acks=all
    

    num-records表示发送的总消息量,
    record-size表示消息的大小,
    producer-props表示生产者的配置,可以并列写多项

    消费者

    /wls/oracle/kafka/bin/kafka-consumer-perf-test.sh --messages 100000000 --topic TEST --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka  --threads 3 --num-fetch-threads 3 --compression-codec 0 --group mygroup --message-size
    

    messages表示消费的消息总量
    threads表示处理消息的线程数
    num-fetch-threads表示拉取消息的线程数
    compression-codec表示压缩方式,0-不压缩,1-GZIP,2-Snappy,3-LZ4
    此外还可以使用consumer.config指定其他配置,具体参考http://kafka.apache.org/0100/documentation.html

    以物理机(非本kafka集群)测试运行结果如下:

    start.time,end.time,data.consumed.in.MB,MB.sec,data.consumed.in.nMsg,nMsg.sec
    2017-06-14 17:55:42:312,2017-06-14 17:59:26:754,216230.5107,963.4138,30000000,133664.8221
    

    本文详细讲述了kafka日常运维的命令,包括topic管理、性能测试,一一记录,以免忘记。

    本文参考:
    http://kafka.apache.org/0100/documentation.html


    关于作者
    爱编程、爱钻研、爱分享、爱生活
    关注分布式、高并发、数据挖掘
    如需捐赠,请扫码

  • 相关阅读:
    机器分配
    搭建免费私有音乐云
    ngnix相关
    idea常用插件
    notepad++ 实用插件
    liunx 新建自启服务
    scala 语法特性小计
    spring boot 静态资源 访问 配置
    SVN-Unable to create pristine install stream
    idea 编译 错误 Error:java: Compilation failed: internal java compiler error 解决方案
  • 原文地址:https://www.cnblogs.com/aidodoo/p/9264961.html
Copyright © 2011-2022 走看看