集群搭建:
[root@localhost kafka_2.11-0.10.2.1]# cat config/server.properties | grep -v ^$ | grep -v ^#
broker.id=0
listeners=PLAINTEXT://node1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
listeners=PLAINTEXT://node1:9092, 配置物理机器的hostname, 一定要是hostname, 每个节点单独配置自己的
脚本启动不管用的, 把环境变量配置在 ~/.bashrc下, 因为ssh分为登陆和非登陆, 读取配置文件的顺序不同
也可以使用如下配置, 公司一个大牛做的
broker.id=2 delete.topic.enable=true port=9092 advertised.host.name=10.183.222.194 num.network.threads=6 num.io.threads=6 message.max.bytes=10485760 log.index.size.max.bytes=104857600 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 replica.fetch.max.bytes=104857600 log.dirs=/data/hadoop/data1/kafka/log,/data/hadoop/data2/kafka/log,/data/hadoop/data3/kafka/log,/data/hadoop/data4/kafka/log,/data/hadoop/data5/kafka/log,/data/hadoop/data6/kafka/log num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=48 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 replica.socket.receive.buffer.bytes=1048576 num.replica.fetchers=6 replica.lag.max.messages=100000 zookeeper.connect=10.183.222.192:2181,10.183.222.193:2181,10.183.222.194:2181/rdp_test_kafka zookeeper.connection.timeout.ms=15000 auto.create.topics.enable=true auto.leader.rebalance.enable=true compression.type=gzip
配置zookeeper的路径, 以便在同一个zookeeper下进行区分, 方便管理
如果配置 了路径, 那么创建topic之类的 时候, 需要指定路径
/opt/install/kafka_2.13-2.4.1/bin/kafka-topics.sh --zookeeper 10.144.91.9:2181,10.144.91.10:2181,10.144.91.11:2181/cdn_kafka --create --topic test1 --partitions 3 --replication-factor 2
启动脚本:
#!/bin/bash brokers="node1 node2 node3" kafka_home="/usr/local/kafka_2.11-0.10.2.1" for i in $brokers do echo "Starting kafka on ${i} ... " ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &" if [[ $? -ne 0 ]]; then echo "Start kafka on ${i} is OK !" fi done echo kafka kafka are started !
停止脚本:
#!/bin/bash brokers="node1 node2 node3" kafka_home="/usr/local/kafka_2.11-0.10.2.1" for i in $brokers do echo "Stopping kafka on ${i} ..." ssh ${i} "source /etc/profile;bash ${kafka_home}/bin/kafka-server-stop.sh" if [[ $? -ne 0 ]]; then echo "Stopping ${kafka_home} on ${i} is down" fi done echo all kafka are stopped ! exit 0
脚本启动不管用的, 把环境变量配置在 ~/.bashrc下, 因为ssh分为登陆和非登陆, 读取配置文件的顺序不同
整合启动脚本
#!/bin/bash #set -x brokers="node1 node2 node3 node4" kafka_home="/usr/local/kafka_2.11-0.10.2.1" start() { for i in $brokers do echo "Starting kafka on ${i} ... " ssh root@$i "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &" if [[ $? -ne 0 ]]; then echo "Start kafka on ${i} is OK !" fi done echo kafka kafka are started ! } stop() { for i in $brokers do echo "Stopping kafka on ${i} ..." ssh root@$i "source /etc/profile;bash ${kafka_home}/bin/kafka-server-stop.sh" if [[ $? -ne 0 ]]; then echo "Stopping ${kafka_home} on ${i} is down" fi done echo all kafka are stopped ! } case "$1" in start) start ;; stop) stop ;; *) echo "Usage: start|stop" ;; esac