1、安装+配置(集群)
192.168.0.10、192.168.0.11、192.168.0.12(每台服务器kafka+zookeeper)
# kafka依赖java环境,需要提前安装好jdk。下面使用自己安装的zookeeper(也可以使用kafka自带的zk)
cd /data/server tar -zxv -f kafka_2.10-0.9.0.0.tgz ln -s kafka_2.10-0.9.0.0 kafka
server.propeties
broker.id=0 # broker唯一标识,依次累加 delete.topic.enable=true listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://10.3.33.162:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/logs/kafka-logs # 存放日志和消息的目录 num.partitions=1 # 主题默认的分区数 num.recovery.threads.per.data.dir=1 log.retention.hours=168 # 日志的过期时间,超过会被删除 log.segment.bytes=1073741824 # 日志文件的最大体积,超过会新建 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.0.10:2181,192.168.0.11:2181,192.168.0.12:2181 # zk的连接配置 zookeeper.connection.timeout.ms=6000
consumer.properties
zookeeper.connect=192.168.0.10:2181,192.168.0.11:2181,192.168.0.12:2181 zookeeper.connection.timeout.ms=6000
producer.properties
bootstrap.servers=192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092 compression.type=none
启动(需要先启动zk)
/data/server/kafka/bin/kafka-server-start.sh -daemon /data/server/kafka/config/server.properties
停止
/data/server/kafka/bin/kafka-server-stop.sh
ps: 如果使用kafka自带的zk,zk的配置文件为zookeeper.properties,启动脚本为./bin/zookeeper-server-start.sh
2、查看主题以及主题信息
查看所有的主题
./kafka-topics.sh --zookeeper zk_host:port --list
查看主题描述
./kafka-topics.sh --zookeeper zk_host:port --describe --topic my_topic_name
3、查看消费者
consumers that use the Java consumer API
./kafka-consumer-groups.sh --new-consumer --bootstrap-server kf_host:port --list
consumers that use ZooKeeper
./kafka-consumer-groups.sh --zookeeper zk_host:port --list
4、查看消费者信息和offset lag
consumers that use the Java consumer API
./kafka-consumer-groups.sh --new-consumer --bootstrap-server kf_host:port --group test_group --describe
consumers that use ZooKeeper
./kafka-consumer-groups.sh --zookeeper zk_host:port --group test-group --describe
5、重新分配主题分区个数
每个主题默认一个分区
num.partitions=1
example: 主题my_topic_name分区扩展到3个
kafka-topics.sh --zookeeper zk_host:port --alter --topic my_topic_name --partitions 3