Kafka部署实战案例
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
一.搭建zookeeper集群
博主推荐阅读: https://www.cnblogs.com/yinzhengjie2020/p/12501023.html
二.下载kafka软件包
1>.打开kafka官网
官网地址: http://kafka.apache.org/
2>.选择二进制的软件包
下载地址: http://kafka.apache.org/downloads 温馨提示: kafka的服务端是使用scala语言编写,而客户端确是用java语言编写。 kafka的软件包命名规则如下(此处我们以"kafka_2.13-2.5.0.tgz"为例): 2.13是scala语言的版本号; 2.5.0是kafka的版本号;
3>.下载kafka二进制软件包
三.kafka集群搭建
1>.集群规划
kafka201.yinzhengjie.com节点分配角色如下:
zookeeper,kafka
kafka201.yinzhengjie.com节点分配角色如下:
zookeeper,kafka
kafka201.yinzhengjie.com节点分配角色如下:
zookeeper,kafka
2>.解压软件包并创建符号链接
[root@kafka201.yinzhengjie.com ~]# tar zxf kafka_2.13-2.5.0.tgz -C /yinzhengjie/softwares/ [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# ln -sv /yinzhengjie/softwares/kafka_2.13-2.5.0/ /yinzhengjie/softwares/kafka ‘/yinzhengjie/softwares/kafka’ -> ‘/yinzhengjie/softwares/kafka_2.13-2.5.0/’ [root@kafka201.yinzhengjie.com ~]#
3>.配置环境变量并分配到其它节点
[root@kafka201.yinzhengjie.com ~]# vim /etc/profile.d/kafka.sh [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# cat /etc/profile.d/kafka.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com KAFKA_HOME=/yinzhengjie/softwares/kafka PATH=$PATH:$KAFKA_HOME/bin [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# source /etc/profile.d/kafka.sh [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# kafka- #如果按2下"tab"键能有如下提示说明环境变量配置成功 kafka-acls.sh kafka-consumer-groups.sh kafka-leader-election.sh kafka-reassign-partitions.sh kafka-streams-application-reset.sh kafka-broker-api-versions.sh kafka-consumer-perf-test.sh kafka-log-dirs.sh kafka-replica-verification.sh kafka-topics.sh kafka-configs.sh kafka-delegation-tokens.sh kafka-mirror-maker.sh kafka-run-class.sh kafka-verifiable-consumer.sh kafka-console-consumer.sh kafka-delete-records.sh kafka-preferred-replica-election.sh kafka-server-start.sh kafka-verifiable-producer.sh kafka-console-producer.sh kafka-dump-log.sh kafka-producer-perf-test.sh kafka-server-stop.sh [root@kafka201.yinzhengjie.com ~]# kafka-
[root@kafka201.yinzhengjie.com ~]# scp /etc/profile.d/kafka.sh kafka202.yinzhengjie.com:/etc/profile.d/ kafka.sh 100% 169 72.2KB/s 00:00 [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# scp /etc/profile.d/kafka.sh kafka203.yinzhengjie.com:/etc/profile.d/ kafka.sh 100% 169 64.5KB/s 00:00 [root@kafka201.yinzhengjie.com ~]#
4>.分发软件包
温馨提示:
我们可以把软件包传输到其它2个节点重复上面的操作,但为了方便起见,我们可以直接将解压的软件包文件远程拷贝到其他节点哟~
[root@kafka201.yinzhengjie.com ~]# scp -r /yinzhengjie/softwares/kafka kafka202.yinzhengjie.com:/yinzhengjie/softwares/ [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# scp -r /yinzhengjie/softwares/kafka kafka203.yinzhengjie.com:/yinzhengjie/softwares/ [root@kafka201.yinzhengjie.com ~]#
5>.各节点修改kafka的配置文件
[root@kafka201.yinzhengjie.com ~]# vim /yinzhengjie/softwares/kafka/config/server.properties [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# cat /yinzhengjie/softwares/kafka/config/server.properties # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# #每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况 broker.id=201 #这就是说,这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 delete.topic.enable=true #是否允许自动创建topic,若是false,就需要通过命令创建topic auto.create.topics.enable=false ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #Socket服务器侦听的地址。如果没有配置,它将获得从Java.NET.InAddio.GETCANONICALITHAMEMENE()返回的值 #listeners=PLAINTEXT://10.1.3.116:9092 #broker server服务端口 port=9092 #broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置 host.name=kafka201.yinzhengjie.com # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #kafka 0.9.x以后的版本新增了advertised.listeners配置,kafka 0.9.x以后的版本不要使用 advertised.host.name 和 advertised.host.port 已经deprecated.如果配置的话,它使用 "listeners" 的值。否则, 它将使用从java.net.InetAddress.getCanonicalHostName()返回的值。#advertised.listeners=PLAINTEXT://your.host.name:9092 #将侦听器(listener)名称映射到安全协议,默认情况下它们是相同的。有关详细信息,请参阅配置文档。 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL #处理网络请求的最大线程数 num.network.threads=30 #处理磁盘I/O的线程数 num.io.threads=30 #套接字服务器使用的发送缓冲区(SOYSNDBUF) socket.send.buffer.bytes=5242880 #套接字服务器使用的接收缓冲区(SOYRCVBUF) socket.receive.buffer.bytes=5242880 #套接字服务器将接受的请求的最大大小(对OOM的保护) socket.request.max.bytes=104857600 #I/O线程等待队列中的最大的请求数,超过这个数量,network线程就不会再接收一个新的请求。应该是一种自我保护机制。 queued.max.requests=1000 ############################# Log Basics ############################# #日志存放目录,多个目录使用逗号分割,如果你有多块磁盘,建议配置成多个目录,从而达到I/O的效率的提升。 log.dirs=/yinzhengjie/data/kafka #每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖 num.partitions=20 #在启动时恢复日志和关闭时刷盘日志时每个数据目录的线程的数量,默认1 num.recovery.threads.per.data.dir=1 # 默认副本数 default.replication.factor=2 #服务器接受单个消息的最大大小,即消息体的最大大小,单位是字节 message.max.bytes=104857600 # 自动负载均衡,如果设为true,复制控制器会周期性的自动尝试,为所有的broker的每个partition平衡leadership,为更优先(preferred)的replica分配leadership。 # auto.leader.rebalance.enable=false ############################# Log Flush Policy ############################# #在强制fsync一个partition的log文件之前暂存的消息数量。调低这个值会更频繁的sync数据到磁盘,影响性能。通常建议人家使用replication来确保持久性,而不是依靠单机上的fsync,但是这可以带来更多的可 靠性,默认10000。#log.flush.interval.messages=10000 #2次fsync调用之间最大的时间间隔,单位为ms。即使log.flush.interval.messages没有达到,只要这个时间到了也需要调用fsync。默认3000ms. #log.flush.interval.ms=10000 ############################# Log Retention Policy ############################# # 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。 log.retention.hours=168 #日志数据存储的最大字节数。超过这个时间会根据policy处理数据。 #log.retention.bytes=1073741824 #控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制) log.segment.bytes=536870912 # 当达到下面时间,会强制新建一个segment #log.roll.hours = 24*7 # 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes) log.retention.check.interval.ms=600000 #是否开启压缩 #log.cleaner.enable=false #日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖 #log.cleanup.policy=delete # 日志压缩运行的线程数 #log.cleaner.threads=2 # 压缩的日志保留的最长时间 #log.cleaner.delete.retention.ms=3600000 ############################# Zookeeper ############################# #zookeeper集群的地址,可以是多个,多个之间用逗号分割. zookeeper.connect=172.200.4.201:2181,172.200.4.202:2181,172.200.4.203:2181/yinzhengjie-kafka #ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大 zookeeper.session.timeout.ms=180000 #指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息,连接zk的超时时间 zookeeper.connection.timeout.ms=6000 #请求的最大大小为字节,请求的最大字节数。这也是对最大记录尺寸的有效覆盖。注意:server具有自己对消息记录尺寸的覆盖,这些尺寸和这个设置不同。此项设置将会限制producer每次批量发送请求的数目,以 防发出巨量的请求。max.request.size=104857600 #每次fetch请求中,针对每次fetch消息的最大字节数。这些字节将会督导用于每个partition的内存中,因此,此设置将会控制consumer所使用的memory大小。这个fetch请求尺寸必须至少和server允许的最大消息尺 寸相等,否则,producer可能发送的消息尺寸大于consumer所能消耗的尺寸。fetch.message.max.bytes=104857600 #ZooKeeper集群中leader和follower之间的同步时间,换句话说:一个ZK follower能落后leader多久。 #zookeeper.sync.time.ms=2000 ############################# Replica Basics ############################# # leader接收follower的"fetch请求"的超时时间,默认是10秒。 # replica.lag.time.max.ms=30000 # 如果relicas落后太多,将会认为此partition relicas已经失效。而一般情况下,因为网络延迟等原因,总会导致replicas中消息同步滞后。如果消息严重滞后,leader将认为此relicas网络延迟较大或者消息吞吐能力 有限。在broker数量较少,或者网络不足的环境中,建议提高此值.follower落后于leader的最大message数,这个参数是broker全局的。设置太大 了,影响真正“落后”follower的移除;设置的太小了,导致follower的频繁进出。无法给定一个合适的replica.lag.max.messages的值,因此不推荐使用,据说新版本的Kafka移除了这个参数。#replica.lag.max.messages=4000 # follower与leader之间的socket超时时间 #replica.socket.timeout.ms=30000 # follower每次fetch数据的最大尺寸 replica.fetch.max.bytes=104857600 # follower的fetch请求超时重发时间 replica.fetch.wait.max.ms=2000 # fetch的最小数据尺寸 #replica.fetch.min.bytes=1 # 是否允许控制器关闭broker ,默认值为true,它会关闭所有在这个broker上的leader,并转移到其他broker,建议启用,增加集群稳定性。 # controlled.shutdown.enable = false #0.11.0.0版本开始unclean.leader.election.enable参数的默认值由原来的true改为false,可以关闭unclean leader election,也就是不在ISR(IN-Sync Replica)列表中的replica,不会被提升为新的leader part ition。kafka集群的持久化力大于可用性,如果ISR中没有其它的replica,会导致这个partition不能读写。unclean.leader.election=false # follower中开启的fetcher线程数, 同步速度与系统负载均衡 num.replica.fetchers=5 # partition leader与replicas之间通讯时,socket的超时时间 #controller.socket.timeout.ms=30000 # partition leader与replicas数据同步时,消息的队列尺寸. #controller.message.queue.size=10 #指定将使用哪个版本的 inter-broker 协议。 在所有经纪人升级到新版本之后,这通常会受到冲击。升级时要设置 #inter.broker.protocol.version=0.10.1 #指定broker将用于将消息添加到日志文件的消息格式版本。 该值应该是有效的ApiVersion。 一些例子是:0.8.2,0.9.0.0,0.10.0。 通过设置特定的消息格式版本,用户保证磁盘上的所有现有消息都小于或等于 指定的版本。 不正确地设置这个值将导致使用旧版本的用户出错,因为他们将接收到他们不理解的格式的消息。#log.message.format.version=0.10.1 [root@kafka201.yinzhengjie.com ~]#
[root@kafka202.yinzhengjie.com ~]# vim /yinzhengjie/softwares/kafka/config/server.properties [root@kafka202.yinzhengjie.com ~]# [root@kafka202.yinzhengjie.com ~]# cat /yinzhengjie/softwares/kafka/config/server.properties # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# #每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况 broker.id=202 #这就是说,这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 delete.topic.enable=true #是否允许自动创建topic,若是false,就需要通过命令创建topic auto.create.topics.enable=false ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #Socket服务器侦听的地址。如果没有配置,它将获得从Java.NET.InAddio.GETCANONICALITHAMEMENE()返回的值 #listeners=PLAINTEXT://10.1.3.116:9092 #broker server服务端口 port=9092 #broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置 host.name=kafka202.yinzhengjie.com # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #kafka 0.9.x以后的版本新增了advertised.listeners配置,kafka 0.9.x以后的版本不要使用 advertised.host.name 和 advertised.host.port 已经deprecated.如果配置的话,它使用 "listeners" 的值。否则, 它将使用从java.net.InetAddress.getCanonicalHostName()返回的值。#advertised.listeners=PLAINTEXT://your.host.name:9092 #将侦听器(listener)名称映射到安全协议,默认情况下它们是相同的。有关详细信息,请参阅配置文档。 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL #处理网络请求的最大线程数 num.network.threads=30 #处理磁盘I/O的线程数 num.io.threads=30 #套接字服务器使用的发送缓冲区(SOYSNDBUF) socket.send.buffer.bytes=5242880 #套接字服务器使用的接收缓冲区(SOYRCVBUF) socket.receive.buffer.bytes=5242880 #套接字服务器将接受的请求的最大大小(对OOM的保护) socket.request.max.bytes=104857600 #I/O线程等待队列中的最大的请求数,超过这个数量,network线程就不会再接收一个新的请求。应该是一种自我保护机制。 queued.max.requests=1000 ############################# Log Basics ############################# #日志存放目录,多个目录使用逗号分割,如果你有多块磁盘,建议配置成多个目录,从而达到I/O的效率的提升。 log.dirs=/yinzhengjie/data/kafka #每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖 num.partitions=20 #在启动时恢复日志和关闭时刷盘日志时每个数据目录的线程的数量,默认1 num.recovery.threads.per.data.dir=1 # 默认副本数 default.replication.factor=2 #服务器接受单个消息的最大大小,即消息体的最大大小,单位是字节 message.max.bytes=104857600 # 自动负载均衡,如果设为true,复制控制器会周期性的自动尝试,为所有的broker的每个partition平衡leadership,为更优先(preferred)的replica分配leadership。 # auto.leader.rebalance.enable=false ############################# Log Flush Policy ############################# #在强制fsync一个partition的log文件之前暂存的消息数量。调低这个值会更频繁的sync数据到磁盘,影响性能。通常建议人家使用replication来确保持久性,而不是依靠单机上的fsync,但是这可以带来更多的可 靠性,默认10000。#log.flush.interval.messages=10000 #2次fsync调用之间最大的时间间隔,单位为ms。即使log.flush.interval.messages没有达到,只要这个时间到了也需要调用fsync。默认3000ms. #log.flush.interval.ms=10000 ############################# Log Retention Policy ############################# # 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。 log.retention.hours=168 #日志数据存储的最大字节数。超过这个时间会根据policy处理数据。 #log.retention.bytes=1073741824 #控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制) log.segment.bytes=536870912 # 当达到下面时间,会强制新建一个segment #log.roll.hours = 24*7 # 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes) log.retention.check.interval.ms=600000 #是否开启压缩 #log.cleaner.enable=false #日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖 #log.cleanup.policy=delete # 日志压缩运行的线程数 #log.cleaner.threads=2 # 压缩的日志保留的最长时间 #log.cleaner.delete.retention.ms=3600000 ############################# Zookeeper ############################# #zookeeper集群的地址,可以是多个,多个之间用逗号分割. zookeeper.connect=172.200.4.201:2181,172.200.4.202:2181,172.200.4.203:2181/yinzhengjie-kafka #ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大 zookeeper.session.timeout.ms=180000 #指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息,连接zk的超时时间 zookeeper.connection.timeout.ms=6000 #请求的最大大小为字节,请求的最大字节数。这也是对最大记录尺寸的有效覆盖。注意:server具有自己对消息记录尺寸的覆盖,这些尺寸和这个设置不同。此项设置将会限制producer每次批量发送请求的数目,以 防发出巨量的请求。max.request.size=104857600 #每次fetch请求中,针对每次fetch消息的最大字节数。这些字节将会督导用于每个partition的内存中,因此,此设置将会控制consumer所使用的memory大小。这个fetch请求尺寸必须至少和server允许的最大消息尺 寸相等,否则,producer可能发送的消息尺寸大于consumer所能消耗的尺寸。fetch.message.max.bytes=104857600 #ZooKeeper集群中leader和follower之间的同步时间,换句话说:一个ZK follower能落后leader多久。 #zookeeper.sync.time.ms=2000 ############################# Replica Basics ############################# # leader接收follower的"fetch请求"的超时时间,默认是10秒。 # replica.lag.time.max.ms=30000 # 如果relicas落后太多,将会认为此partition relicas已经失效。而一般情况下,因为网络延迟等原因,总会导致replicas中消息同步滞后。如果消息严重滞后,leader将认为此relicas网络延迟较大或者消息吞吐能力 有限。在broker数量较少,或者网络不足的环境中,建议提高此值.follower落后于leader的最大message数,这个参数是broker全局的。设置太大 了,影响真正“落后”follower的移除;设置的太小了,导致follower的频繁进出。无法给定一个合适的replica.lag.max.messages的值,因此不推荐使用,据说新版本的Kafka移除了这个参数。#replica.lag.max.messages=4000 # follower与leader之间的socket超时时间 #replica.socket.timeout.ms=30000 # follower每次fetch数据的最大尺寸 replica.fetch.max.bytes=104857600 # follower的fetch请求超时重发时间 replica.fetch.wait.max.ms=2000 # fetch的最小数据尺寸 #replica.fetch.min.bytes=1 # 是否允许控制器关闭broker ,默认值为true,它会关闭所有在这个broker上的leader,并转移到其他broker,建议启用,增加集群稳定性。 # controlled.shutdown.enable = false #0.11.0.0版本开始unclean.leader.election.enable参数的默认值由原来的true改为false,可以关闭unclean leader election,也就是不在ISR(IN-Sync Replica)列表中的replica,不会被提升为新的leader part ition。kafka集群的持久化力大于可用性,如果ISR中没有其它的replica,会导致这个partition不能读写。unclean.leader.election=false # follower中开启的fetcher线程数, 同步速度与系统负载均衡 num.replica.fetchers=5 # partition leader与replicas之间通讯时,socket的超时时间 #controller.socket.timeout.ms=30000 # partition leader与replicas数据同步时,消息的队列尺寸. #controller.message.queue.size=10 #指定将使用哪个版本的 inter-broker 协议。 在所有经纪人升级到新版本之后,这通常会受到冲击。升级时要设置 #inter.broker.protocol.version=0.10.1 #指定broker将用于将消息添加到日志文件的消息格式版本。 该值应该是有效的ApiVersion。 一些例子是:0.8.2,0.9.0.0,0.10.0。 通过设置特定的消息格式版本,用户保证磁盘上的所有现有消息都小于或等于 指定的版本。 不正确地设置这个值将导致使用旧版本的用户出错,因为他们将接收到他们不理解的格式的消息。#log.message.format.version=0.10.1 [root@kafka202.yinzhengjie.com ~]#
[root@kafka203.yinzhengjie.com ~]# vim /yinzhengjie/softwares/kafka/config/server.properties [root@kafka203.yinzhengjie.com ~]# [root@kafka203.yinzhengjie.com ~]# cat /yinzhengjie/softwares/kafka/config/server.properties # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# #每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况 broker.id=203 #这就是说,这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 delete.topic.enable=true #是否允许自动创建topic,若是false,就需要通过命令创建topic auto.create.topics.enable=false ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #Socket服务器侦听的地址。如果没有配置,它将获得从Java.NET.InAddio.GETCANONICALITHAMEMENE()返回的值 #listeners=PLAINTEXT://10.1.3.116:9092 #broker server服务端口 port=9092 #broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置 host.name=kafka203.yinzhengjie.com # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #kafka 0.9.x以后的版本新增了advertised.listeners配置,kafka 0.9.x以后的版本不要使用 advertised.host.name 和 advertised.host.port 已经deprecated.如果配置的话,它使用 "listeners" 的值。否则, 它将使用从java.net.InetAddress.getCanonicalHostName()返回的值。#advertised.listeners=PLAINTEXT://your.host.name:9092 #将侦听器(listener)名称映射到安全协议,默认情况下它们是相同的。有关详细信息,请参阅配置文档。 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL #处理网络请求的最大线程数 num.network.threads=30 #处理磁盘I/O的线程数 num.io.threads=30 #套接字服务器使用的发送缓冲区(SOYSNDBUF) socket.send.buffer.bytes=5242880 #套接字服务器使用的接收缓冲区(SOYRCVBUF) socket.receive.buffer.bytes=5242880 #套接字服务器将接受的请求的最大大小(对OOM的保护) socket.request.max.bytes=104857600 #I/O线程等待队列中的最大的请求数,超过这个数量,network线程就不会再接收一个新的请求。应该是一种自我保护机制。 queued.max.requests=1000 ############################# Log Basics ############################# #日志存放目录,多个目录使用逗号分割,如果你有多块磁盘,建议配置成多个目录,从而达到I/O的效率的提升。 log.dirs=/yinzhengjie/data/kafka #每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖 num.partitions=20 #在启动时恢复日志和关闭时刷盘日志时每个数据目录的线程的数量,默认1 num.recovery.threads.per.data.dir=1 # 默认副本数 default.replication.factor=2 #服务器接受单个消息的最大大小,即消息体的最大大小,单位是字节 message.max.bytes=104857600 # 自动负载均衡,如果设为true,复制控制器会周期性的自动尝试,为所有的broker的每个partition平衡leadership,为更优先(preferred)的replica分配leadership。 # auto.leader.rebalance.enable=false ############################# Log Flush Policy ############################# #在强制fsync一个partition的log文件之前暂存的消息数量。调低这个值会更频繁的sync数据到磁盘,影响性能。通常建议人家使用replication来确保持久性,而不是依靠单机上的fsync,但是这可以带来更多的可 靠性,默认10000。#log.flush.interval.messages=10000 #2次fsync调用之间最大的时间间隔,单位为ms。即使log.flush.interval.messages没有达到,只要这个时间到了也需要调用fsync。默认3000ms. #log.flush.interval.ms=10000 ############################# Log Retention Policy ############################# # 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。 log.retention.hours=168 #日志数据存储的最大字节数。超过这个时间会根据policy处理数据。 #log.retention.bytes=1073741824 #控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制) log.segment.bytes=536870912 # 当达到下面时间,会强制新建一个segment #log.roll.hours = 24*7 # 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes) log.retention.check.interval.ms=600000 #是否开启压缩 #log.cleaner.enable=false #日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖 #log.cleanup.policy=delete # 日志压缩运行的线程数 #log.cleaner.threads=2 # 压缩的日志保留的最长时间 #log.cleaner.delete.retention.ms=3600000 ############################# Zookeeper ############################# #zookeeper集群的地址,可以是多个,多个之间用逗号分割. zookeeper.connect=172.200.4.201:2181,172.200.4.202:2181,172.200.4.203:2181/yinzhengjie-kafka #ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大 zookeeper.session.timeout.ms=180000 #指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息,连接zk的超时时间 zookeeper.connection.timeout.ms=6000 #请求的最大大小为字节,请求的最大字节数。这也是对最大记录尺寸的有效覆盖。注意:server具有自己对消息记录尺寸的覆盖,这些尺寸和这个设置不同。此项设置将会限制producer每次批量发送请求的数目,以 防发出巨量的请求。max.request.size=104857600 #每次fetch请求中,针对每次fetch消息的最大字节数。这些字节将会督导用于每个partition的内存中,因此,此设置将会控制consumer所使用的memory大小。这个fetch请求尺寸必须至少和server允许的最大消息尺 寸相等,否则,producer可能发送的消息尺寸大于consumer所能消耗的尺寸。fetch.message.max.bytes=104857600 #ZooKeeper集群中leader和follower之间的同步时间,换句话说:一个ZK follower能落后leader多久。 #zookeeper.sync.time.ms=2000 ############################# Replica Basics ############################# # leader接收follower的"fetch请求"的超时时间,默认是10秒。 # replica.lag.time.max.ms=30000 # 如果relicas落后太多,将会认为此partition relicas已经失效。而一般情况下,因为网络延迟等原因,总会导致replicas中消息同步滞后。如果消息严重滞后,leader将认为此relicas网络延迟较大或者消息吞吐能力 有限。在broker数量较少,或者网络不足的环境中,建议提高此值.follower落后于leader的最大message数,这个参数是broker全局的。设置太大 了,影响真正“落后”follower的移除;设置的太小了,导致follower的频繁进出。无法给定一个合适的replica.lag.max.messages的值,因此不推荐使用,据说新版本的Kafka移除了这个参数。#replica.lag.max.messages=4000 # follower与leader之间的socket超时时间 #replica.socket.timeout.ms=30000 # follower每次fetch数据的最大尺寸 replica.fetch.max.bytes=104857600 # follower的fetch请求超时重发时间 replica.fetch.wait.max.ms=2000 # fetch的最小数据尺寸 #replica.fetch.min.bytes=1 # 是否允许控制器关闭broker ,默认值为true,它会关闭所有在这个broker上的leader,并转移到其他broker,建议启用,增加集群稳定性。 # controlled.shutdown.enable = false #0.11.0.0版本开始unclean.leader.election.enable参数的默认值由原来的true改为false,可以关闭unclean leader election,也就是不在ISR(IN-Sync Replica)列表中的replica,不会被提升为新的leader part ition。kafka集群的持久化力大于可用性,如果ISR中没有其它的replica,会导致这个partition不能读写。unclean.leader.election=false # follower中开启的fetcher线程数, 同步速度与系统负载均衡 num.replica.fetchers=5 # partition leader与replicas之间通讯时,socket的超时时间 #controller.socket.timeout.ms=30000 # partition leader与replicas数据同步时,消息的队列尺寸. #controller.message.queue.size=10 #指定将使用哪个版本的 inter-broker 协议。 在所有经纪人升级到新版本之后,这通常会受到冲击。升级时要设置 #inter.broker.protocol.version=0.10.1 #指定broker将用于将消息添加到日志文件的消息格式版本。 该值应该是有效的ApiVersion。 一些例子是:0.8.2,0.9.0.0,0.10.0。 通过设置特定的消息格式版本,用户保证磁盘上的所有现有消息都小于或等于 指定的版本。 不正确地设置这个值将导致使用旧版本的用户出错,因为他们将接收到他们不理解的格式的消息。#log.message.format.version=0.10.1 [root@kafka203.yinzhengjie.com ~]#
6>.编写kafka启动脚本
[root@kafka201.yinzhengjie.com ~]# vim /yinzhengjie/softwares/kafka/bin/kafka-server-start.sh [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# cat /yinzhengjie/softwares/kafka/bin/kafka-server-start.sh #!/bin/bash # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. if [ $# -lt 1 ]; then echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1 fi base_dir=$(dirname $0) if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G" fi EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'} COMMAND=$1 case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;; esac exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" [root@kafka201.yinzhengjie.com ~]#
[root@kafka201.yinzhengjie.com ~]# vim /usr/local/bin/kafka.sh [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# cat /usr/local/bin/kafka.sh #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #判断用户是否传参 if [ $# -ne 1 ];then echo "无效参数,用法为: $0 {start|stop}" exit fi #获取用户输入的命令 cmd=$1 for (( i=201 ; i<=203 ; i++ )) ; do tput setaf 2 echo ========== kafka${i}.yinzhengjie.com $cmd ================ tput setaf 9 case $cmd in start) ssh kafka${i}.yinzhengjie.com "source /etc/profile ; kafka-server-start.sh -daemon /yinzhengjie/softwares/kafka/config/server.properties" echo kafka${i}.yinzhengjie.com "服务已启动" ;; stop) ssh kafka${i}.yinzhengjie.com "source /etc/profile ; kafka-server-stop.sh" echo kafka${i}.yinzhengjie.com "服务已停止" ;; *) echo "无效参数,用法为: $0 {start|stop}" exit ;; esac done [root@kafka201.yinzhengjie.com ~]#
[root@kafka201.yinzhengjie.com ~]# kafka.sh start ========== kafka201.yinzhengjie.com start ================ kafka201.yinzhengjie.com 服务已启动 ========== kafka202.yinzhengjie.com start ================ kafka202.yinzhengjie.com 服务已启动 ========== kafka203.yinzhengjie.com start ================ kafka203.yinzhengjie.com 服务已启动 [root@kafka201.yinzhengjie.com ~]#
[root@kafka201.yinzhengjie.com ~]# kafka.sh stop ========== kafka201.yinzhengjie.com stop ================ kafka201.yinzhengjie.com 服务已停止 ========== kafka202.yinzhengjie.com stop ================ kafka202.yinzhengjie.com 服务已停止 ========== kafka203.yinzhengjie.com stop ================ kafka203.yinzhengjie.com 服务已停止 [root@kafka201.yinzhengjie.com ~]#
7>.使用ansible查看各节点进程是否存在
[root@kafka201.yinzhengjie.com ~]# yum -y install epel-release Loaded plugins: fastestmirror Loading mirror speeds from cached hostfile * base: mirrors.huaweicloud.com * extras: mirrors.bfsu.edu.cn * updates: mirrors.bfsu.edu.cn Resolving Dependencies --> Running transaction check ---> Package epel-release.noarch 0:7-11 will be installed --> Finished Dependency Resolution Dependencies Resolved ============================================================================================================================================================================================= Package Arch Version Repository Size ============================================================================================================================================================================================= Installing: epel-release noarch 7-11 extras 15 k Transaction Summary ============================================================================================================================================================================================= Install 1 Package Total download size: 15 k Installed size: 24 k Downloading packages: epel-release-7-11.noarch.rpm | 15 kB 00:00:00 Running transaction check Running transaction test Transaction test succeeded Running transaction Installing : epel-release-7-11.noarch 1/1 Verifying : epel-release-7-11.noarch 1/1 Installed: epel-release.noarch 0:7-11 Complete! [root@kafka201.yinzhengjie.com ~]#
[root@kafka201.yinzhengjie.com ~]# yum -y install ansible Loaded plugins: fastestmirror Loading mirror speeds from cached hostfile epel/x86_64/metalink | 3.9 kB 00:00:00 * base: mirrors.huaweicloud.com * epel: mirrors.njupt.edu.cn * extras: mirrors.bfsu.edu.cn * updates: mirrors.bfsu.edu.cn epel | 5.4 kB 00:00:00 (1/3): epel/x86_64/group_gz | 95 kB 00:00:00 epel/x86_64/updateinfo FAILED https://hkg.mirror.rackspace.com/epel/7/x86_64/repodata/bb658e425356e5e4571a6353d6e437b2f396801ee35972107b4da19a97831290-updateinfo.xml.bz2: [Errno 14] HTTPS Error 404 - Not Found--:-- ETA Trying other mirror. To address this issue please refer to the below wiki article https://wiki.centos.org/yum-errors If above article doesn't help to resolve this issue please use https://bugs.centos.org/. (2/3): epel/x86_64/updateinfo | 1.0 MB 00:00:00 epel/x86_64/primary_db FAILED 14% [========== ] 3.6 B/s | 1.2 MB 546:28:12 ETA http://sg.fedora.ipserverone.com/epel/7/x86_64/repodata/cede34f70d016762089672bbbc35329cad728c5e9d529acef8f311cf0dd22f2b-primary.sqlite.bz2: [Errno 12] Timeout on http://sg.fedora.ipservero ne.com/epel/7/x86_64/repodata/cede34f70d016762089672bbbc35329cad728c5e9d529acef8f311cf0dd22f2b-primary.sqlite.bz2: (28, 'Operation too slow. Less than 1000 bytes/sec transferred the last 30 seconds')Trying other mirror. (3/3): epel/x86_64/primary_db | 6.8 MB 00:00:00 Resolving Dependencies --> Running transaction check ---> Package ansible.noarch 0:2.9.9-1.el7 will be installed --> Processing Dependency: PyYAML for package: ansible-2.9.9-1.el7.noarch --> Processing Dependency: python-httplib2 for package: ansible-2.9.9-1.el7.noarch --> Processing Dependency: python-jinja2 for package: ansible-2.9.9-1.el7.noarch --> Processing Dependency: python-paramiko for package: ansible-2.9.9-1.el7.noarch --> Processing Dependency: python-setuptools for package: ansible-2.9.9-1.el7.noarch --> Processing Dependency: python-six for package: ansible-2.9.9-1.el7.noarch --> Processing Dependency: python2-cryptography for package: ansible-2.9.9-1.el7.noarch --> Processing Dependency: python2-jmespath for package: ansible-2.9.9-1.el7.noarch --> Processing Dependency: sshpass for package: ansible-2.9.9-1.el7.noarch --> Running transaction check ---> Package PyYAML.x86_64 0:3.10-11.el7 will be installed --> Processing Dependency: libyaml-0.so.2()(64bit) for package: PyYAML-3.10-11.el7.x86_64 ---> Package python-httplib2.noarch 0:0.9.2-1.el7 will be installed ---> Package python-jinja2.noarch 0:2.7.2-4.el7 will be installed --> Processing Dependency: python-babel >= 0.8 for package: python-jinja2-2.7.2-4.el7.noarch --> Processing Dependency: python-markupsafe for package: python-jinja2-2.7.2-4.el7.noarch ---> Package python-paramiko.noarch 0:2.1.1-9.el7 will be installed --> Processing Dependency: python2-pyasn1 for package: python-paramiko-2.1.1-9.el7.noarch ---> Package python-setuptools.noarch 0:0.9.8-7.el7 will be installed --> Processing Dependency: python-backports-ssl_match_hostname for package: python-setuptools-0.9.8-7.el7.noarch ---> Package python-six.noarch 0:1.9.0-2.el7 will be installed ---> Package python2-cryptography.x86_64 0:1.7.2-2.el7 will be installed --> Processing Dependency: python-idna >= 2.0 for package: python2-cryptography-1.7.2-2.el7.x86_64 --> Processing Dependency: python-cffi >= 1.4.1 for package: python2-cryptography-1.7.2-2.el7.x86_64 --> Processing Dependency: python-ipaddress for package: python2-cryptography-1.7.2-2.el7.x86_64 --> Processing Dependency: python-enum34 for package: python2-cryptography-1.7.2-2.el7.x86_64 ---> Package python2-jmespath.noarch 0:0.9.4-2.el7 will be installed ---> Package sshpass.x86_64 0:1.06-2.el7 will be installed --> Running transaction check ---> Package libyaml.x86_64 0:0.1.4-11.el7_0 will be installed ---> Package python-babel.noarch 0:0.9.6-8.el7 will be installed ---> Package python-backports-ssl_match_hostname.noarch 0:3.5.0.1-1.el7 will be installed --> Processing Dependency: python-backports for package: python-backports-ssl_match_hostname-3.5.0.1-1.el7.noarch ---> Package python-cffi.x86_64 0:1.6.0-5.el7 will be installed --> Processing Dependency: python-pycparser for package: python-cffi-1.6.0-5.el7.x86_64 ---> Package python-enum34.noarch 0:1.0.4-1.el7 will be installed ---> Package python-idna.noarch 0:2.4-1.el7 will be installed ---> Package python-ipaddress.noarch 0:1.0.16-2.el7 will be installed ---> Package python-markupsafe.x86_64 0:0.11-10.el7 will be installed ---> Package python2-pyasn1.noarch 0:0.1.9-7.el7 will be installed --> Running transaction check ---> Package python-backports.x86_64 0:1.0-8.el7 will be installed ---> Package python-pycparser.noarch 0:2.14-1.el7 will be installed --> Processing Dependency: python-ply for package: python-pycparser-2.14-1.el7.noarch --> Running transaction check ---> Package python-ply.noarch 0:3.4-11.el7 will be installed --> Finished Dependency Resolution Dependencies Resolved ============================================================================================================================================================================================= Package Arch Version Repository Size ============================================================================================================================================================================================= Installing: ansible noarch 2.9.9-1.el7 epel 17 M Installing for dependencies: PyYAML x86_64 3.10-11.el7 base 153 k libyaml x86_64 0.1.4-11.el7_0 base 55 k python-babel noarch 0.9.6-8.el7 base 1.4 M python-backports x86_64 1.0-8.el7 base 5.8 k python-backports-ssl_match_hostname noarch 3.5.0.1-1.el7 base 13 k python-cffi x86_64 1.6.0-5.el7 base 218 k python-enum34 noarch 1.0.4-1.el7 base 52 k python-httplib2 noarch 0.9.2-1.el7 extras 115 k python-idna noarch 2.4-1.el7 base 94 k python-ipaddress noarch 1.0.16-2.el7 base 34 k python-jinja2 noarch 2.7.2-4.el7 base 519 k python-markupsafe x86_64 0.11-10.el7 base 25 k python-paramiko noarch 2.1.1-9.el7 base 269 k python-ply noarch 3.4-11.el7 base 123 k python-pycparser noarch 2.14-1.el7 base 104 k python-setuptools noarch 0.9.8-7.el7 base 397 k python-six noarch 1.9.0-2.el7 base 29 k python2-cryptography x86_64 1.7.2-2.el7 base 502 k python2-jmespath noarch 0.9.4-2.el7 epel 41 k python2-pyasn1 noarch 0.1.9-7.el7 base 100 k sshpass x86_64 1.06-2.el7 extras 21 k Transaction Summary ============================================================================================================================================================================================= Install 1 Package (+21 Dependent packages) Total download size: 22 M Installed size: 124 M Downloading packages: (1/22): PyYAML-3.10-11.el7.x86_64.rpm | 153 kB 00:00:00 (2/22): libyaml-0.1.4-11.el7_0.x86_64.rpm | 55 kB 00:00:00 (3/22): python-backports-ssl_match_hostname-3.5.0.1-1.el7.noarch.rpm | 13 kB 00:00:00 (4/22): python-babel-0.9.6-8.el7.noarch.rpm | 1.4 MB 00:00:00 (5/22): python-cffi-1.6.0-5.el7.x86_64.rpm | 218 kB 00:00:00 (6/22): python-enum34-1.0.4-1.el7.noarch.rpm | 52 kB 00:00:00 (7/22): python-backports-1.0-8.el7.x86_64.rpm | 5.8 kB 00:00:00 (8/22): python-idna-2.4-1.el7.noarch.rpm | 94 kB 00:00:00 (9/22): python-markupsafe-0.11-10.el7.x86_64.rpm | 25 kB 00:00:00 (10/22): python-paramiko-2.1.1-9.el7.noarch.rpm | 269 kB 00:00:00 (11/22): python-ply-3.4-11.el7.noarch.rpm | 123 kB 00:00:00 (12/22): python-ipaddress-1.0.16-2.el7.noarch.rpm | 34 kB 00:00:00 (13/22): python-pycparser-2.14-1.el7.noarch.rpm | 104 kB 00:00:00 (14/22): python-six-1.9.0-2.el7.noarch.rpm | 29 kB 00:00:00 (15/22): python-httplib2-0.9.2-1.el7.noarch.rpm | 115 kB 00:00:00 (16/22): python2-cryptography-1.7.2-2.el7.x86_64.rpm | 502 kB 00:00:00 (17/22): python-setuptools-0.9.8-7.el7.noarch.rpm | 397 kB 00:00:00 (18/22): python-jinja2-2.7.2-4.el7.noarch.rpm | 519 kB 00:00:01 warning: /var/cache/yum/x86_64/7/epel/packages/ansible-2.9.9-1.el7.noarch.rpm: Header V3 RSA/SHA256 Signature, key ID 352c64e5: NOKEY Public key for ansible-2.9.9-1.el7.noarch.rpm is not installed (19/22): ansible-2.9.9-1.el7.noarch.rpm | 17 MB 00:00:02 (20/22): python2-jmespath-0.9.4-2.el7.noarch.rpm | 41 kB 00:00:00 (21/22): sshpass-1.06-2.el7.x86_64.rpm | 21 kB 00:00:00 (22/22): python2-pyasn1-0.1.9-7.el7.noarch.rpm | 100 kB 00:00:00 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Total 8.3 MB/s | 22 MB 00:00:02 Retrieving key from file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-7 Importing GPG key 0x352C64E5: Userid : "Fedora EPEL (7) <epel@fedoraproject.org>" Fingerprint: 91e9 7d7c 4a5e 96f1 7f3e 888f 6a2f aea2 352c 64e5 Package : epel-release-7-11.noarch (@extras) From : /etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-7 Running transaction check Running transaction test Transaction test succeeded Running transaction Installing : python2-pyasn1-0.1.9-7.el7.noarch 1/22 Installing : python-ipaddress-1.0.16-2.el7.noarch 2/22 Installing : python-six-1.9.0-2.el7.noarch 3/22 Installing : python-httplib2-0.9.2-1.el7.noarch 4/22 Installing : sshpass-1.06-2.el7.x86_64 5/22 Installing : libyaml-0.1.4-11.el7_0.x86_64 6/22 Installing : PyYAML-3.10-11.el7.x86_64 7/22 Installing : python-backports-1.0-8.el7.x86_64 8/22 Installing : python-backports-ssl_match_hostname-3.5.0.1-1.el7.noarch 9/22 Installing : python-setuptools-0.9.8-7.el7.noarch 10/22 Installing : python-babel-0.9.6-8.el7.noarch 11/22 Installing : python2-jmespath-0.9.4-2.el7.noarch 12/22 Installing : python-ply-3.4-11.el7.noarch 13/22 Installing : python-pycparser-2.14-1.el7.noarch 14/22 Installing : python-cffi-1.6.0-5.el7.x86_64 15/22 Installing : python-markupsafe-0.11-10.el7.x86_64 16/22 Installing : python-jinja2-2.7.2-4.el7.noarch 17/22 Installing : python-idna-2.4-1.el7.noarch 18/22 Installing : python-enum34-1.0.4-1.el7.noarch 19/22 Installing : python2-cryptography-1.7.2-2.el7.x86_64 20/22 Installing : python-paramiko-2.1.1-9.el7.noarch 21/22 Installing : ansible-2.9.9-1.el7.noarch 22/22 Verifying : python-backports-ssl_match_hostname-3.5.0.1-1.el7.noarch 1/22 Verifying : python-enum34-1.0.4-1.el7.noarch 2/22 Verifying : python-setuptools-0.9.8-7.el7.noarch 3/22 Verifying : python-jinja2-2.7.2-4.el7.noarch 4/22 Verifying : python-six-1.9.0-2.el7.noarch 5/22 Verifying : python-idna-2.4-1.el7.noarch 6/22 Verifying : python-markupsafe-0.11-10.el7.x86_64 7/22 Verifying : python-ply-3.4-11.el7.noarch 8/22 Verifying : python-paramiko-2.1.1-9.el7.noarch 9/22 Verifying : python2-jmespath-0.9.4-2.el7.noarch 10/22 Verifying : python-babel-0.9.6-8.el7.noarch 11/22 Verifying : python-backports-1.0-8.el7.x86_64 12/22 Verifying : python-cffi-1.6.0-5.el7.x86_64 13/22 Verifying : python-pycparser-2.14-1.el7.noarch 14/22 Verifying : libyaml-0.1.4-11.el7_0.x86_64 15/22 Verifying : ansible-2.9.9-1.el7.noarch 16/22 Verifying : python-ipaddress-1.0.16-2.el7.noarch 17/22 Verifying : sshpass-1.06-2.el7.x86_64 18/22 Verifying : python-httplib2-0.9.2-1.el7.noarch 19/22 Verifying : python2-pyasn1-0.1.9-7.el7.noarch 20/22 Verifying : PyYAML-3.10-11.el7.x86_64 21/22 Verifying : python2-cryptography-1.7.2-2.el7.x86_64 22/22 Installed: ansible.noarch 0:2.9.9-1.el7 Dependency Installed: PyYAML.x86_64 0:3.10-11.el7 libyaml.x86_64 0:0.1.4-11.el7_0 python-babel.noarch 0:0.9.6-8.el7 python-backports.x86_64 0:1.0-8.el7 python-backports-ssl_match_hostname.noarch 0:3.5.0.1-1.el7 python-cffi.x86_64 0:1.6.0-5.el7 python-enum34.noarch 0:1.0.4-1.el7 python-httplib2.noarch 0:0.9.2-1.el7 python-idna.noarch 0:2.4-1.el7 python-ipaddress.noarch 0:1.0.16-2.el7 python-jinja2.noarch 0:2.7.2-4.el7 python-markupsafe.x86_64 0:0.11-10.el7 python-paramiko.noarch 0:2.1.1-9.el7 python-ply.noarch 0:3.4-11.el7 python-pycparser.noarch 0:2.14-1.el7 python-setuptools.noarch 0:0.9.8-7.el7 python-six.noarch 0:1.9.0-2.el7 python2-cryptography.x86_64 0:1.7.2-2.el7 python2-jmespath.noarch 0:0.9.4-2.el7 python2-pyasn1.noarch 0:0.1.9-7.el7 sshpass.x86_64 0:1.06-2.el7 Complete! [root@kafka201.yinzhengjie.com ~]#
[root@kafka201.yinzhengjie.com ~]# vim /etc/ansible/hosts [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# tail -3 /etc/ansible/hosts #add by yinzhengjie for kafka [kafka] kafka[201:203].yinzhengjie.com [root@kafka201.yinzhengjie.com ~]#
[root@kafka201.yinzhengjie.com ~]# ansible kafka -m shell -a 'ln -sv /yinzhengjie/softwares/jdk1.8.0_201/bin/jps /usr/local/bin/jps' [WARNING]: Consider using the file module with state=link rather than running 'ln'. If you need to use command because file is insufficient you can add 'warn: false' to this command task or set 'command_warnings=False' in ansible.cfg to get rid of this message. kafka201.yinzhengjie.com | CHANGED | rc=0 >> ‘/usr/local/bin/jps’ -> ‘/yinzhengjie/softwares/jdk1.8.0_201/bin/jps’ kafka203.yinzhengjie.com | CHANGED | rc=0 >> ‘/usr/local/bin/jps’ -> ‘/yinzhengjie/softwares/jdk1.8.0_201/bin/jps’ kafka202.yinzhengjie.com | CHANGED | rc=0 >> ‘/usr/local/bin/jps’ -> ‘/yinzhengjie/softwares/jdk1.8.0_201/bin/jps’ [root@kafka201.yinzhengjie.com ~]#
[root@kafka201.yinzhengjie.com ~]# ansible kafka -m shell -a 'jps' kafka202.yinzhengjie.com | CHANGED | rc=0 >> 8577 Kafka 7149 QuorumPeerMain 8973 Jps kafka203.yinzhengjie.com | CHANGED | rc=0 >> 8464 Kafka 7050 QuorumPeerMain 8861 Jps kafka201.yinzhengjie.com | CHANGED | rc=0 >> 10531 Jps 7933 QuorumPeerMain 9727 Kafka [root@kafka201.yinzhengjie.com ~]#
8>.Kafka命令行操作案例
博主推荐阅读: https://www.cnblogs.com/yinzhengjie2020/p/13052883.html