本文主要讲解kafka日常运维的命令,包括topic管理、性能测试脚本。
kafka版本0.10.0,安装步骤见大数据平台搭建-kafka集群的搭建
常用脚本
如下所有的命令均基于KAFKA_HOME=/wls/oracle/kafka
,服务器列表如下:
10.20.112.59
10.20.112.64
10.20.112.65
10.20.116.129
10.20.116.175
创建topic
/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --create --topic TEST --replication-factor 2 --partitions 3
其中replication-factor
后数字表示副本个数,partitions
的数字表示分区个数。
查看topic
/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --list
更改topic配置
配置topic
级别参数时,相同(参数)属性topic
级别会覆盖全局的,否则默认为全局配置属性值,即/wls/oracle/kafka/config/server.properties
中topic
属性配置。
topic
创建完成后,随着项目的进展,可能存在对特定topic
配置的更改,涉及到的常用更改项如下。
单个消息比较大,需要调整broker能接收消息的最大字节数
/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --alter --topic TEST --config max.message.bytes=128000
或者使用/wls/oracle/kafka/bin/kafka-configs.sh脚本也行
/wls/oracle/kafka/bin/kafka-configs.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --entity-type topics --entity-name TEST --alter --add-config max.message.bytes=128000
kafka扩容或者读取性能遇到瓶颈时,可能会考虑增加分区数
/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --alter --topic TEST --partitions 6
如果涉及到多个配置的更改,则依次用 --conf key=value
并列配置即可
/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --alter --topic TEST --config max.message.bytes=128000 --config flush.messages=1
其他可选配置
Property | Default | Server Default Property | note |
cleanup.policy | delete | log.cleanup.policy | 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖 |
delete.retention.ms | 86400000 (24 hours) | log.cleaner.delete.retention.ms | 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖 |
flush.messages | None | log.flush.interval.messages | log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失 |
flush.ms | None | log.flush.interval.ms | 仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发. |
index.interval.bytes | 4096 | log.index.interval.bytes | 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数 |
message.max.bytes | 1,000,000 | message.max.bytes | 表示消息的最大大小,单位是字节 |
min.cleanable.dirty.ratio | 0.5 | log.cleaner.min.cleanable.ratio | 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖 |
retention.bytes | None | log.retention.bytes | topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖 |
retention.ms | None | log.retention.minutes | 数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据 log.retention.bytes和log.retention.minutes达到要求,都会执行删除,会被topic创建时的指定参数覆盖 |
segment.bytes | 1 GB | log.segment.bytes | topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖 |
segment.index.bytes | 10 MB | log.index.size.max.bytes | 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖 |
log.roll.hours | 7 days | log.roll.hours | 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖 |
查看topic
查看topic
分区和副本分布情况
/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --describe --topic TEST
查看topic配置
/wls/oracle/kafka/bin/kafka-configs.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --entity-type topics --entity-name TEST --describe
删除topic
/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --delete --topic TEST
需要注意的是执行这段命令后控制台输出
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
也就是说执行删除命令,不是真正删除,而是标记删除,实际只是在zookeeper
上添加/admin/delete_topics/test
节点,需要确认提前打开delete.topic.enable
开关。
如果遇到标记删除失败,可以考虑手工删除,步骤如下:
1.登录zookeeer
客户端,删除zookeeper
上TEST
的topic
节点
/wls/oracle/zookeeper/bin/zkCli.sh
rmr /kafka/brokers/topics/TEST
其中/kafka
为我们自己kafka在zookeeper的根目录,不同集群可能不太一致。
2.删除kafka
数据文件
rm -rf /wls/oracle/bigdata/kafka/kafka-logs-*/TEST*
其中/wls/oracle/bigdata/kafka/kafka-logs-*
为server.properties
中配置的log.dirs
目录,具体可参考
大数据平台搭建-kafka集群的搭建
发送消息
/wls/oracle/kafka/bin/kafka-console-producer.sh --broker-list 10.20.112.59:9092,10.20.112.64:9092,10.20.112.65:9092,10.20.116.129:9092,10.20.116.175:9092 --topic TEST
接收消息
/wls/oracle/kafka/bin/kafka-console-consumer.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --topic TEST --from-beginning
其中--from-beginning
表示从头开始消费kafka
队列TEST
中的消息,如果没有该选项,则消费最新的消息。
查看topic消费者offset
/wls/oracle/kafka/bin/kafka-consumer-offset-checker.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --topic TEST --group mygroup
性能测试
kafka
官方提供了优化参数的性能测试脚本
生产者
/wls/oracle/kafka/bin/kafka-producer-perf-test.sh --num-records 100000000 --record-size 10 --topic TEST --producer-props bootstrap.servers=10.20.112.59:9092,10.20.112.64:9092,10.20.112.65:9092,10.20.116.129:9092,10.20.116.175:9092 acks=all
num-records
表示发送的总消息量,
record-size
表示消息的大小,
producer-props
表示生产者的配置,可以并列写多项
消费者
/wls/oracle/kafka/bin/kafka-consumer-perf-test.sh --messages 100000000 --topic TEST --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --threads 3 --num-fetch-threads 3 --compression-codec 0 --group mygroup --message-size
messages
表示消费的消息总量
threads
表示处理消息的线程数
num-fetch-threads
表示拉取消息的线程数
compression-codec
表示压缩方式,0-不压缩,1-GZIP
,2-Snappy
,3-LZ4
此外还可以使用consumer.config
指定其他配置,具体参考http://kafka.apache.org/0100/documentation.html
以物理机(非本kafka集群)测试运行结果如下:
start.time,end.time,data.consumed.in.MB,MB.sec,data.consumed.in.nMsg,nMsg.sec
2017-06-14 17:55:42:312,2017-06-14 17:59:26:754,216230.5107,963.4138,30000000,133664.8221
本文详细讲述了kafka日常运维的命令,包括topic管理、性能测试,一一记录,以免忘记。
本文参考:
http://kafka.apache.org/0100/documentation.html
关于作者
爱编程、爱钻研、爱分享、爱生活
关注分布式、高并发、数据挖掘
如需捐赠,请扫码