使用场景:
某些时候,当几个topic生产者突发产生大量消息时,会造成磁盘空间紧张,这时,除了增加磁盘,另一个方法就是修改配置文件,将日志的保存时间修改小一点,但这两种方案,都必须停机和重启kafka,显然,这在生产集群上,是不能这么处理的。这里,可以通过在线修改单个topic的配置,以覆盖默认配置,临时解决磁盘空间紧张的问题。
优点:在线修改,不需要重启和停机
修改后,新的配置会在 log.retention.check.interval.ms 时间内被检查并应用到整个集群,该值在 kafka/config/server.properties 中配置,默认为 300 秒
注意,修改前日志保存时长,必然会清除掉超过这个时长的旧数据,在生产环境中,这需要和业务方共同评估和确认
下面以修改名为 my_test_topic 的 topic 为例
#1,查看当前topic配置
./kafka-topics.sh --describe --topic my_test_topic --zookeeper test.myzk.com:2181/kafkacluster
#2,调整topic配置
./kafka-topics.sh --topic my_test_topic --zookeeper test.myzk.com:2181/kafkacluster --alter --config retention.ms=43200000
# 时长毫秒 43200000ms=12h
#3,检查修改的配置是否生效
同第一步,查看输出的第一行,类似如下:
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,delete.retention.ms=86400000,retention.ms=43200000,cleanup.policy=delete,compression.type=producer
其他可选的调整参数:
delete.retention.ms=86400000 #对于压缩日志保留的最长时间,也是客户端消费消息的最长时间,与retention.ms的区别在于一个控制未压缩数据,一个控制压缩后的数据
retention.ms=86400000 #如果使用“delete”的retention策略,这项配置就是指删除日志前日志保存的时间
cleanup.policy=delete #默认方式 delete 将会丢弃旧的部分 compact 将会进行日志压缩
compression.type=producer #压缩类型,此配置接受标准压缩编码 gzip, snappy, lz4 ,另外接受 uncompressed 相当于不压缩, producer 意味着压缩类型由producer指定
./zookeeper-shell.sh test.myzk.com:2181/kafkacluster #查看zk中kafka集群信息
另外,需要注意的是:
kafka 0.10+ 之后的版本,有个 __consumer_offsets 的topic也是需要清理的,需要定期注意该topic占用空间情况
生产环境kafka内核优化参数
vm.min_free_kbytes=4194304 即4G 系统16C32G内存
cat /proc/sys/vm/min_free_kbytes
该值表示强制Linux VM最低保留多少空闲内存 单位Kbytes
当可用内存低于该参数时,系统开始回收cache内存,以释放内存,直到可用内存大于该值
目的:让系统更加积极的回收cache内存
vm.zone_reclaim_mode=1
cat /proc/sys/vm/zone_reclaim_mode
管理当一个内存区域zone内部的内存耗尽时,是从其内部进行内存回收还是可以从其他zone进行回收
0 关闭zone_reclaim模式,允许从其他zone或NUMA节点回收内存[默认]
1 打开zone_reclaim模式,这样内存回收只会发生在本地节点内
2 在本地回收内存时,可以将cache中的脏数据写回硬盘,以回收内存
4 可以用swap方式回收内存
目的:限制内存回收不跨zone
清空cache (可选)
echo 1 > /proc/sys/vm/drop_caches
生产集群参数参考
zookeeper
zoo.cfg 配置文件
1
2
3
4
5
6
7
8
9
10
11
|
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/server/zkdata
clientPort=2181
maxClientCnxns=600
autopurge.snapRetainCount=60
autopurge.purgeInterval=24
server.1=192.168.1.100:2888:3888
server.2=yyyy:2888:3888
server.3=zzzz:2888:3888
|
echo "1">/data/server/zkdata/myid
./zkServer.sh status
./zkCleanup.sh /data/server/zookeeper/data -n 100
./zkCleanup.sh 参数1 -n 参数2
参数1,zk data目录,即zoo.cfg文件中dataDir值
参数2,保存最近的多少个快照
kafka
server.properties 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
broker.id=0
listeners=PLAINTEXT://192.168.1.100:9092
port=9092
host.name=192.168.1.100
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=104857600
log.dirs=/data/server/kafkadata
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.100:2181,yyyy:2181,zzzz:2181/kafkacluster
zookeeper.connection.timeout.ms=30000
default.replication.factor=3
delete.topic.enable=true
auto.create.topics.enable=true
|
在kafka启动脚本中,需要添加JMX的支持,方便在kafkamanager中查看到更加丰富的数据
kafka-server-start.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
#!/bin/bash
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
export JMX_PORT="8999"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Djava.rmi.server.hostname=192.168.1.100
-Djava.net.preferIPv4Stack=true"
fi
EXTRA_ARGS="-name kafkaServer -loggc"
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
|
使用supervisor管理zookeeper和kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
[program:zookeeper]
directory=/data/server/zookeeper
command= sh bin/zkServer.sh start-foreground
numprocs=1
user=kafka
autostart=true
autorestart=true
stdout_logfile=/data/server/zookeeper/logs/zookeeper.log
redirect_stderr=true
priority=5
[program:kafka]
directory=/data/server/kafka
command=/bin/bash bin/kafka-server-start.sh config/server.properties
numprocs=1
user=kafka
autostart=true
autorestart=true
stdout_logfile=/data/server/kafka/logs/kafka.log
redirect_stderr=true
stdout_logfile_maxbytes=1GB
priority=6
|
kafka-manager
项目地址:https://github.com/yahoo/kafka-manager
让一般用户免密登录,且只有查看权限:
修改conf/application.conf
1
2
3
4
5
6
|
application.features=[""]
#application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
...
basicAuthentication.enabled=false
#basicAuthentication.enabled=true
#basicAuthentication.enabled=${?KAFKA_MANAGER_AUTH_ENABLED}
|
topic 操作
Delete Topic 删除 topic
Reassign Partitions 平衡集群负载
Add Partitions 增加分区
Update Config Topic 配置信息更新
Manual Partition Assignments 手动为每个分区下的副本分配 broker
Generate Partition Assignments 系统自动为每个分区下的副本分配 broker
一般而言,手动调整、系统自动分配分区和添加分区之后,都需要调用 Reassign Partition
1
2
3
4
5
6
7
8
9
10
11
|
[program:kafka-manager]
directory=/data/server/kafka-manager
command=sh bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000
numprocs=1
user=kafka
autostart=true
autorestart=true
stdout_logfile=/data/server/kafka-manager/logs/kafka-manager.log
redirect_stderr=true
stdout_logfile_maxbytes=1GB
priority=6
|
转载请注明:轻风博客 » Kafka在线修改topic日志保存时长(不停机,不重启)