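References:
Notes
Principles:
https://www.cnblogs.com/yinzhengjie/p/9780976.html
Deployment:
www.cnblogs.com/yinzhengjie/p/9209319.html
Official site:
For the full list of Kafka parameters, see the official documentation for the matching version.
kafka.apache.org, click "Documentation"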
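Check the Kafka version:
Look at the jar file names under /data1/kafka/libs (for example, kafka_2.11-2.3.1.jar means Scala 2.11, Kafka 2.3.1).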
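As a minimal sketch of reading the version from a jar name, the helper below splits a name of the form kafka_<scala>-<kafka>.jar. The function name kafka_version_from_jar is hypothetical, and the commented loop assumes the libs directory mentioned above.

```shell
#!/usr/bin/env bash
# Print the Scala and Kafka versions encoded in a broker jar name such as
# kafka_2.11-2.3.1.jar (format: kafka_<scala>-<kafka>.jar).
# kafka_version_from_jar is a hypothetical helper written for this note.
kafka_version_from_jar() {
    local name
    name=$(basename "$1" .jar)      # kafka_2.11-2.3.1
    name=${name#kafka_}             # 2.11-2.3.1
    echo "scala=${name%%-*} kafka=${name#*-}"
}

# Possible use on a broker host (directory taken from the note above):
# for jar in /data1/kafka/libs/kafka_*.jar; do kafka_version_from_jar "$jar"; done
kafka_version_from_jar kafka_2.11-2.3.1.jar
```

Companion jars in the same directory (sources, javadoc, tests) also match kafka_*.jar and show up with a suffix after the version.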
# Note: in production, put the Kafka files under a directory with plenty of space, such as /data
wget https://archive.apache.org/dist/kafka/2.3.1/kafka_2.11-2.3.1.tgz
tar -zxf kafka_2.11-2.3.1.tgz -C /hongfeng/software/
vim /etc/profile
export KAFKA_HOME=/hongfeng/software/kafka_2.11-2.3.1
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
vim /data/kafka_2.11-2.3.1/bin/kafka-server-start.sh
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
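# The default Kafka heap is 1G, which is clearly not enough for production.
# "Kafka: The Definitive Guide" suggests 5G and 《Apache Kafka实战》 suggests 6G; the difference is small, so 6G is used here.
# Book values are only a starting point: if frequent Full GCs still show up with a 6G heap, increase it further. This is the setting I use in production.
# Note: if the machine has less than 6G of available memory, this setting may fail outright with an OOM error.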
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
export JMX_PORT="9999"
fi
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
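## As the next line shows, this script calls kafka-run-class.sh. If the heap is configured in this script, do not configure it again in kafka-run-class.sh, otherwise the heap setting from this script will have no effect!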
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
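The guard in the script above applies its heap default only when KAFKA_HEAP_OPTS is empty, so a per-host value can also be supplied from the environment without editing the script. The sketch below reproduces just that guard. resolve_heap_opts is a hypothetical name, and the 4g value is only an illustration.

```shell
#!/usr/bin/env bash
# Mirrors the guard used in kafka-server-start.sh: the default is applied only
# when KAFKA_HEAP_OPTS is empty or unset in the caller's environment.
# resolve_heap_opts is a hypothetical helper, not part of Kafka.
resolve_heap_opts() {
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        echo "-Xmx6g -Xms6g"        # stands in for the default edited into the script
    else
        echo "$KAFKA_HEAP_OPTS"     # the caller's value wins
    fi
}

unset KAFKA_HEAP_OPTS
resolve_heap_opts                                    # falls back to the default
KAFKA_HEAP_OPTS="-Xmx4g -Xms4g" resolve_heap_opts    # uses the override
```

In the edited script, JMX_PORT is exported inside the same if block, so starting the broker with KAFKA_HEAP_OPTS already set would also skip the JMX_PORT line.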
Distribute /etc/profile and the Kafka installation to the other nodes
scp /etc/profile 10.10.77.8:/etc/profile
scp /etc/profile 10.10.24.86:/etc/profile
scp -r kafka_2.11-2.3.1 10.10.77.8:/hongfeng/software/
scp -r kafka_2.11-2.3.1 10.10.24.86:/hongfeng/software/
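With more than a couple of nodes, the same copies can be driven by a loop. This is a rough sketch using the hosts and paths from the commands above. DRY_RUN and the run helper are hypothetical additions so the commands can be previewed before anything is copied. It uses an absolute source path, whereas the commands above are run from inside /hongfeng/software.

```shell
#!/usr/bin/env bash
# Hosts and paths come from the scp commands above; DRY_RUN and run() are
# hypothetical additions for this sketch. DRY_RUN=1 (the default) only prints.
HOSTS="10.10.77.8 10.10.24.86"
SOFTWARE_DIR=/hongfeng/software
KAFKA_DIR=kafka_2.11-2.3.1
DRY_RUN=${DRY_RUN:-1}

run() {
    # Print the command in dry-run mode, otherwise execute it.
    if [ "$DRY_RUN" = "1" ]; then echo "$*"; else "$@"; fi
}

distribute() {
    local host
    for host in $HOSTS; do
        run scp /etc/profile "$host:/etc/profile"
        run scp -r "$SOFTWARE_DIR/$KAFKA_DIR" "$host:$SOFTWARE_DIR/"
    done
}

distribute
```

Setting DRY_RUN=0 before calling distribute performs the actual copies.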
Edit the Kafka configuration file (server.properties)
# Must be edited on every node; the values that differ per node are marked below
vim /data/kafka_2.11-2.3.1/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.
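# Each node must use a different id. Keep comments on their own line: in a .properties file, a "#" after a value is read as part of the value.
broker.id=101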
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Change to this node's IP
listeners=PLAINTEXT://10.10.24.86:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Change to this node's IP
advertised.listeners=PLAINTEXT://10.10.24.86:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# Number of threads used for handling network requests
num.network.threads=30
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=30
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# Default number of partitions per topic. It applies only when a topic is created without an explicit partition count; a count given at creation time overrides it.
num.partitions=20
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
# The log directory can be chosen freely; change it for production
log.dirs=/data/kafka-logs
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
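# Log retention time, 7 days (168 hours) by default; it can also be given in minutes (log.retention.minutes). Older data is handled according to the cleanup policy.
# The time limit and the size limit (log.retention.bytes) apply independently; whichever is reached first triggers cleanup.
log.retention.hours=168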
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
# The interval at which log segments are checked to see if they can be deleted according
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=10.10.37.27:2181,10.10.77.8:2181,10.10.24.86:2181/kafka01
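# In production, if several Kafka clusters share one ZooKeeper ensemble, give each cluster its own directory (chroot) in ZK, in the following form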
#zookeeper.connect=10.52.110.48:2182,10.52.48.92:2182,10.52.60.235:2182/kafka01
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
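# The topic delete command (kafka-topics.sh --delete) does not remove the topic immediately; it only marks the topic for deletion in ZooKeeper. delete.topic.enable must be turned on beforehand, otherwise the deletion is never carried out.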
delete.topic.enable=true
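Since only broker.id, listeners and advertised.listeners differ between nodes, the per-node edits can be scripted. This is a minimal sketch. set_prop and configure_node are hypothetical helpers, broker id 102 is an assumed value for the second node, and the demo runs against a scratch file rather than the live server.properties.

```shell
#!/usr/bin/env bash
# set_prop FILE KEY VALUE: rewrite an existing uncommented "KEY=..." line.
# Hypothetical helper; assumes VALUE contains no "|", "&" or "\" characters.
set_prop() {
    local file=$1 key=$2 value=$3 tmp
    local key_re=${key//./\\.}      # escape dots so they match literally in sed
    tmp=$(mktemp)
    sed "s|^${key_re}=.*|${key}=${value}|" "$file" > "$tmp"
    cat "$tmp" > "$file"            # keep the original file's permissions
    rm -f "$tmp"
}

# configure_node FILE BROKER_ID IP: apply the three per-node settings.
configure_node() {
    local file=$1 id=$2 ip=$3
    set_prop "$file" broker.id "$id"
    set_prop "$file" listeners "PLAINTEXT://$ip:9092"
    set_prop "$file" advertised.listeners "PLAINTEXT://$ip:9092"
}

# Demo on a scratch copy; broker id 102 is an assumed value.
demo=$(mktemp)
printf '%s\n' 'broker.id=101' \
    '#listeners=PLAINTEXT://:9092' \
    'listeners=PLAINTEXT://10.10.24.86:9092' \
    'advertised.listeners=PLAINTEXT://10.10.24.86:9092' > "$demo"
configure_node "$demo" 102 10.10.77.8
cat "$demo"
```

Because the pattern is anchored at the start of the line, commented-out defaults such as #listeners=... are left untouched.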
Start Kafka:
Run on every node:
nohup kafka-server-start.sh /data/kafka_2.11-2.3.1/config/server.properties >> /dev/null &
Check:
Use ansible to run jps on every node and confirm that a Kafka process is present
# Check that a topic can be created
kafka-topics.sh --zookeeper 10.52.5.208:2181,10.52.5.209:2181,10.52.5.210:2181/kafka01 --create --partitions 2 --replication-factor 2 --topic test-hong
kafka-topics.sh --zookeeper 10.52.5.208:2181,10.52.5.209:2181,10.52.5.210:2181/kafka01 --list
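The --zookeeper argument has to match zookeeper.connect in server.properties, including the chroot. The chroot is written once, after the last host, not after every host. A small sketch of assembling that string follows. zk_connect is a hypothetical helper, and the hosts are the ones from zookeeper.connect above.

```shell
#!/usr/bin/env bash
# zk_connect CHROOT HOST:PORT... -> "host1:port,host2:port/chroot"
# Hypothetical helper; the chroot is appended once, after the last host.
zk_connect() {
    local chroot=$1
    shift
    local IFS=,            # "$*" joins the remaining arguments with commas
    echo "$*$chroot"
}

ZK=$(zk_connect /kafka01 10.10.37.27:2181 10.10.77.8:2181 10.10.24.86:2181)
echo "$ZK"

# With a running cluster, a topic could then be inspected with:
# kafka-topics.sh --zookeeper "$ZK" --describe --topic test-hong
```

If the chroot is left off, kafka-topics.sh looks under the ZooKeeper root and will not see the cluster's topics.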
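Adding nodes:
A newly added node holds no data at first, because existing partitions are not moved to it automatically.
First, if leader balancing is enabled in the Kafka configuration (auto.leader.rebalance.enable), partition leadership is rebalanced automatically after a while, but only among replicas that already exist.
Second, to place data on the new node, reassign replicas manually with the replica reassignment command (kafka-reassign-partitions.sh), then adjust the leaders.
Add new nodes one at a time, then take the old nodes down one at a time.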
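For the manual path, kafka-reassign-partitions.sh takes a small JSON file listing the topics to move. The sketch below only generates that file. topics_to_move_json is a hypothetical helper, and in the commented commands the broker list 101,102,103,104 and the file names are assumptions, not values from this cluster.

```shell
#!/usr/bin/env bash
# Emit the topics-to-move JSON used by kafka-reassign-partitions.sh --generate.
# topics_to_move_json is a hypothetical helper; topic names must not need JSON escaping.
topics_to_move_json() {
    local first=1 topic
    printf '{"version":1,"topics":['
    for topic in "$@"; do
        [ "$first" = 1 ] || printf ','
        printf '{"topic":"%s"}' "$topic"
        first=0
    done
    printf ']}\n'
}

topics_to_move_json test-hong

# Possible follow-up on a live cluster (broker ids and file names are assumed):
# ZK=10.10.37.27:2181,10.10.77.8:2181,10.10.24.86:2181/kafka01
# kafka-reassign-partitions.sh --zookeeper "$ZK" --topics-to-move-json-file topics-to-move.json --broker-list "101,102,103,104" --generate
# Save the proposed assignment printed by --generate as reassign.json, then:
# kafka-reassign-partitions.sh --zookeeper "$ZK" --reassignment-json-file reassign.json --execute
# kafka-reassign-partitions.sh --zookeeper "$ZK" --reassignment-json-file reassign.json --verify
```

Moving a few topics per run keeps the extra replication traffic manageable.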
Decommissioning: take nodes down one at a time
kafka-server-stop.sh
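# Do not stop the Kafka service by force-killing the process (kill -9); use kafka-server-stop.sh. In production the shutdown can take a while, so wait for it to finish.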
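One way to wait for the shutdown to finish before moving on to the next node is to poll the broker PID. This is a minimal sketch. wait_for_exit and the 300-second default timeout are hypothetical, and the commented usage assumes pgrep is available and finds the broker by its main class, kafka.Kafka.

```shell
#!/usr/bin/env bash
# wait_for_exit PID [TIMEOUT_SECONDS]: poll once per second until the process
# is gone. Returns 0 when it has exited, 1 on timeout. Hypothetical helper.
wait_for_exit() {
    local pid=$1 timeout=${2:-300} waited=0
    while kill -0 "$pid" 2>/dev/null; do
        [ "$waited" -ge "$timeout" ] && return 1
        sleep 1
        waited=$((waited + 1))
    done
    return 0
}

# Possible use on a broker host:
# pid=$(pgrep -f 'kafka\.Kafka')
# kafka-server-stop.sh
# wait_for_exit "$pid" 600 || echo "broker $pid still running after 600s"
```

If the timeout is reached, check the broker log before taking any further action, rather than killing the process.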
Appendix: managing Kafka startup with supervisor
https://www.cnblogs.com/hongfeng2019/p/11949073.html