部署配置
1.准备部署包(自行下载)
vim conf/zoo.cfg
dataDir=/data/vfan/zk/data/
dataLogDir=/data/vfan/zk/logs/
startLogDir=/data/vfan/zk/logs/
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=10.61.194.34:2801:3801
server.2=10.61.199.15:2802:3802
server.3=10.61.202.16:2803:3803
# server.A=B:C:D 其中A是一个数字,代表这是第几号服务器;B是服务器的IP地址;C表示服务器与群集中的“领导者”交换信息的端口;当领导者失效后,D表示用来执行选举时服务器相互通信的端口
snapCount=20
autopurge.snapRetainCount =3
autopurge.purgeInterval =1
zk集群配置如上,如果是单台,从server.1开始往下都注释即可
启动zk
bin/zkServer.sh start
## ps 检查进程
2.配置kafka
vim kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://10.153.204.28:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/vfan/kfk/logs/
# 当topic不存在系统自动创建时的分区数
num.partitions=3
# 当topic不存在系统自动创建时的副本数
default.replication.factor=3
# offset topic的replicas数量
offsets.topic.replication.factor=3
# 每个数据目录的线程数,用于启动时的日志恢复和关闭时的刷新
num.recovery.threads.per.data.dir=1
# 事务主题的复制因子
transaction.state.log.replication.factor=3
# 覆盖事务主题的min.insync.replicas配置
transaction.state.log.min.isr=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.61.194.34:2181,10.61.199.15:2181,10.61.202.16
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
集群模式时,修改每个配置文件的 broker.id listeners 即可,zookeeper.connect若为单机就写一个
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
## ss -tnlp|grep 9092 检查端口
常用命令总结
topic相关
## 查看所有topic
./kafka-topics.sh --zookeeper localhost:2181 --list
## 查看所有topic详情(副本、分区、ISR等)
./kafka-topics.sh --zookeeper localhost:2181 --describe
## 查看某个topic详情
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
## 创建topic,3副本 3分区
./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3
## 调整分区数量
./kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 3
## 删除topic,需要将参数设置为delete.topic.enable=true,如果还是删不了则删除kafka中的所有分区log,及通过zk客户端删除
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
## 查看topic各个分区的消息数量
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic test
模拟kafka生产消费
## 生产
./kafka-console-producer.sh --broker-list 10.153.204.28:9092 --topic test
## 消费,--from-beginning参数表示从头开始
./kafka-console-consumer.sh --bootstrap-server 10.153.204.28:9092 --topic test --from-beginning
此处需要注意,生产者和测试者指定的broker必须和配置文件中zookeeper.connect和listeners中的地址一至,如写localhost生产者会类似如下信息:
WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
消费者会报错类似错误:
WARN [Consumer clientId=consumer-1, groupId=console-consumer-8350] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
消费者相关
## 显示所有消费者
./kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
## 获取某消费者消费某个topic的offset
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer
## 调整消费者对某个topic的offset,发生阻塞等情况时可使用
.kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupName --reset-offsets --to-offset 1000 --topic topicName --execute
调整默认分区副本数
## 配置文件中指定默认分区 副本数
num.partitions=3 ;当topic不存在系统自动创建时的分区数
default.replication.factor=3 ;当topic不存在系统自动创建时的副本数
offsets.topic.replication.factor=3 ;表示kafka的内部topic consumer_offsets副本数,默认为1
调整topic分区副本数
目前 guoqing 的topic副本和分区都为1
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic guoqing
Topic:guoqing PartitionCount:1 ReplicationFactor:1 Configs:
Topic: guoqing Partition: 0 Leader: 1 Replicas: 1 Isr: 1
将分区数调整为3
## 扩容
./kafka-topics.sh --alter --zookeeper localhost:2181 --topic guoqing --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
## 检查
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic guoqing
Topic:guoqing PartitionCount:3 ReplicationFactor:1 Configs:
Topic: guoqing Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: guoqing Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: guoqing Partition: 2 Leader: 3 Replicas: 3 Isr: 3
注意:分区数只能增加,不能减少
将副本数调整为3,首先准备json文件,格式如下:
vim guoqing.json
{
"version": 1,
"partitions": [
{
"topic": "guoqing",
"partition": 0,
"replicas": [
1,
2,
3
]
},
{
"topic": "guoqing",
"partition": 1,
"replicas": [
2,
1,
3
]
},
{
"topic": "guoqing",
"partition": 2,
"replicas": [
3,
2,
1
]
}
]
}
执行调整命令
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/guoqing.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"guoqing","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"guoqing","partition":2,"replicas":[3],"log_dirs":["any"]},{"topic":"guoqing","partition":1,"replicas":[2],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
检查调整进度
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/guoqing.json --verify
Status of partition reassignment:
Reassignment of partition guoqing-0 completed successfully
Reassignment of partition guoqing-1 completed successfully
Reassignment of partition guoqing-2 completed successfully
检查调整后的状态
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic guoqing
Topic:guoqing PartitionCount:3 ReplicationFactor:3 Configs:
Topic: guoqing Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: guoqing Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: guoqing Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1