Kafka架构概述
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
kafka是一款基于发布与订阅的消息系统。它一般被称为"分布式提交日志"或者"分布式流平台"。
一.消息队列(Message Queue)概述
在正式讨论Apache kafka之前,先来了解消息队列的概念,并认识这个系统的重要性。
数据(消息)的发送者(发布者)不会直接把消息发送给接收者,这是发布于订阅消息系统的一个特点。发布者以某种方式对消息进行分类,接收者(订阅者)订阅他们,以便接收特定数据的消息。发布与订阅一般会有一个broker,也就是发布消息的中心点。
消息队列(Message Queue)的应用场景:
(1)异步处理;
(2)消峰;
(3)解耦;
消息队列的两种模式:
(1)点对点模式
即一对一,消费者主动来去数据,消费者收到消息后,broker的消息自动清除。
消息生产者生产消息发送到队列(Queue)中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
(2)发布/订阅模式
即一对多,消费者消费数据之后不会清楚消息。
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
二.为什么选择kafka
1>.多个生产者
Kafka可以无缝地支持多个生产者,不管客户段在使用单个topic还是多个topic。所以它很适合用来从多个前端系统收集数据并以统一的格式对外提供数据。
例如,一个包含了多个微服务的网站,可以为页面视图创建一个单独的主题(topic),所有服务都以相同的消息格式向该主题写入数据。消费者应用程序会获得统一的页面视图,而无需协调来自不同生产者的数据流。
2>.多个消费者
除了支持多个生产者外,kafka也支持多个消费者从一个单独的消息流上读取数据,而且消费者之间互不影响。这与其它消息队列系统(点对点)不同,其它队列的消息一旦被一个客户端读取,其它客户端就无法在读取它。
另外,多个消费者可以组成一个群组,他们共享一个消息流,并保证整个群组对每个给定的消息只处理一次。
3>.基于磁盘的数据存储
Kafka不仅支持多个消费者,还允许消费者非实时地读取消息,这要归功于kafka的数据保留特性。消息被提交到磁盘,根据设置的保留规则进行保存。
每个topic可以设置单独的保留规则,以便满足不同消费者的需求,各个topic可以保留不同数量的消息。
消费者可能会因为处理速度慢或突发的流量高峰导致无法及时读取消息,而持久化数据可以保证数据不会丢失。
消费者可以在进行应用重新维护时离开一小段时间,而无需担心消息丢失或堵塞在生产者端。
消费者可以被关闭,但消息会继续保留在kafka里。消费者可以从上次中断的地方继续处理消息。
4>.伸缩性
为了能够轻松处理大量数据,kafka从一开始就被设计成了一个具有灵活伸缩性的系统。
用户在开发阶段可以先使用单个broker,再扩展到包含3各broker的小型开发集群,然后随着数据量不断增长,部署到生产环境的集群可能包含上百个broker。对在线集群进行扩展丝毫不影响整体系统的可用性。
也就是说,一个包含多个broker集群,即使有个别broker失效,仍然可以继续为客户提供服务。要提高集群的容错能力,需要配置较高的复制系数。
5>.高性能
上面提到的所有特性让kafka成为了一个高性能的发布与订阅消息系统。
通过横向扩展生产者,消费者和broker,kafka可以轻松处理巨大的消息流。
在处理大量数据的同时,它还能保证压秒级的消息延迟。
6>.kafka的使用场景
Kafka为数据生态系统带来了循环系统,它在基础设施的各个组件之间传递消息,为所有客户端提供一致的接口。
当与提供消息模式的系统集成时,生产者和消费者之间不再有紧密的耦合,也不需要在他们之间建立任何类型的直连。我们可以根据业务需要添加活移除组件,因为生产者不在关心谁在使用数据,也不关心有多少各消费者。
Kafka的使用场景如下:
(1)活动跟踪
(2)传递消息
(3)度量指标和日志记录
(4)提交日志
(5)流处理
三.kafka的工作流程及文件存储机制
1>.Kafka相关术语
Kafka架构特色
(1)为了方便扩展,并提高吞吐量,一个topic分为多个partition;
(2)配合分区的设计,提出消费者组的概念,组内每个消费者并行消费;
(3)为提高可用性,为每个partition增加若干个副本;
Kafka相关术语:
Producer:
消息生产者,就是向kafka broker发消息的客户端; Consumer:
消息消费者,向kafka broker取消息的客户端; Consumer Group(CG):
消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 Broker:
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。 Topic:
可以理解为一个队列,生产者和消费者面向的都是一个topic; Partition:
为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列; Replica:
副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。 leader:
每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。 follower:
每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。
2>.kafka工作流程概述
Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端。
每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。 温馨提示: 下图引用自:"https://www.cnblogs.com/kevingrace/p/9443270.html"
3>.文件存储机制
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应"*.index"和"*.log"这两个文件,如下图所示。
index和log文件以当前segment的第一条消息的offset命名。"*.index"文件存储大量的索引信息,"*.log"文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。
温馨提示:
如果基于offset找到message呢?其大致定位数据流程如下:
(1)基于offset先定位在哪个segment中,这个可以基于"*.index"的文件名来判断;
(2)其次在"*.index"中找到offset对应的物理文件的偏移量;
(3)根据偏移量读取"*.log"指定的数据进行消费;
四.kafka生产者
1>.分区策略
分区的原因: (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了; (2)可以提高并发,因为可以基于Partition为单位读写了;
分区的原则(我们需要将producer发送的数据封装成一个ProducerRecord对象): (1)指明partition(一般是一个Integer类型)的情况下,直接将指明的值直接作为partiton值; (2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值; (3)partition和key均没有指定的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法。
2>.数据可靠性保证
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。 副本数据同步策略: (1)半数以上完成同步(防止脑裂,即spilt brain),就发送ack 优点: 延迟低,自动过滤掉性能较低的机器 确定: 选举新的leader时,容忍n台节点的故障,需要2n+1个副本 (2)全部完成同步,才发送ack 优点: 选举新的leader时,容忍n台节点的故障,需要n+1个副本 缺点: 延迟高,需要等待性能最低的机器响应 kafka官方选择了第二种方案,原因如下: (1)同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余; (2)虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小; ISR: 综上所述,采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢? (1)Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合; (2)当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定; (3)Leader发生故障之后,就会从ISR中选举新的leader; 温馨提示: (1)一个partition可以配置N个replica,那么这是否就意味着对副本因子为N的topic,可容忍最多N-1个服务器失效,不会丢失任何已提交到broker的记录呢? 答案是:“否”!(网上有资料说是可以保证数据不丢失,如果在min.insync.replicas使用默认值为1且unclean.leader.election.enable的值为true时,可能会导致数据丢失,而Producer的acks设置的为1时,也就是当leader的parition写入成功就认为数据是写入成功的,我们知道follower是有可能和leader节点不一致的!因此当leader节点挂掉的话,此时的数据就丢失啦!) (2)副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性。默认情况下Kafka的replica数量为1,即每个partition都有一个唯一的leader,为了确保消息的可靠性,通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比如3。 (3)所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(早期kafka版本包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 从kafka 0.10.x版本开始只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower踢出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。相反的,当这些replicas重新“追上”了leader的进度时,那么Kafka会将他们加回到ISR中。这一切都是自动维护的,不需要用户进行人为干预,因而在保证了消息交付语义的同时,还简化了用户的操作成本。 ACK应答机制: 对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的acks参数配置。 0: producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据; 1: producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据; -1(all): producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。 故障处理细节: (1)follower故障 follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。 (2)leader故障 leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。 需要注意的是,这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。 HW和LEO概述: HW: 如下图所示,High watermark(高水位线)以下简称HW,表示消息被commit的部分,leader(和followers,看情况)确认写入本地log并返回ack后这些数据才被认为是committed。 kafka的参数min.insync.replicas可以控制多少个副本ack后才算commit,默认为1,即只需要leader写成功该段log就committed,此时ISR可能最小只有1,如果设为n,则表示leader和n-1个followers都确认写入才算committed),也就是可以被消费,所以在HW位置以下的消息都可以被消费。 在min.insync.replicas大于1时,又设置了unclean.leader.election.enable=true,则会有一个非ISR内的follower成为新的leader,此时HW会缩小,因此consumer的lag值可能出现负数!这些配置在Apache Kafka 0.10都是默认值,因此很可能出现) LEO: Log end offset(日志结束位置)以下简称LEO,表示消息的最后位置。 当LEO>=HW时,一般会有没提交(uncommitted)的部分,除了ISR中最慢的follower,事实上HW=min(ISR中各副本的LEO)。uncommitted部分的消息,需要等待如何向producer进行ack,对应的请求会停留在一块称为purgatory(炼狱)的区域中。具体怎么ack,取决于producer的acks设置: 如果设置为0,则producer发出消息后就不等待ack,是at most once的一致性; 如果设为1(默认值),则只要leader的log成功写入请求就返回了,但如果在ack发送给producer前leader就挂了(或丢包了),producer仍然收不到ack,会对请求进行重发,此时数据就有可能重,是at least once的一致性。但在leader挂掉时,leader有但未被followers同步的数据又无法同步,会丢一部分数据; 如果设置为all或-1,则需要所有ISR中的followers确认写入后请求才返回(注意,如果允许ISR最少只有1个成员,则和acks=1又没有实质区别了!),也是at least once的一致性,且基本不存在丢数据的可能; 温馨提示: 对于producer来说,保证数据不丢失需要满足以下条件:min.insync.replicas至少设为2(保证可用性)、unclean.leader.election.enable设为false(保证数据不丢失)、producer的acks设为all(保证所有broker成功写入数据,但会增大延迟)。但这种设置会对kafka的吞吐率、故障恢复时间造成巨大影响,究竟要不要保证这么强的一致性,就需要你评估了。
3>.Exactly Once语义
对于某些比较重要的消息,我们需要保证exactly once语义,即保证每条消息被发送且仅被发送一次。
在0.11版本之后,Kafka引入了幂等性机制(idempotent,本质上是"生产者id + 消息id"组成,当有重复的数据发送到broker时就不会再次写入),配合acks = -1时的at least once语义,实现了producer到broker的exactly once语义。
idempotent + at least once = exactly once
使用时,只需将enable.idempotence属性设置为true,kafka自动将acks属性设为-1。
温馨提示:
At Least Once:
最少一次,可能会重复数据
At Most Onec:
表示最多一次,可能会丢失数据
Exactly Onec:
表示有且仅有一次,实现起来比较费劲,kafka在0.11版本之后引入幂等性机制配合acks=-1时才实现的。
五.kafka消费者
1>.消费方式
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
pull模式则可以根据consumer的消费能力以适当的速率消费消息。pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
2>.分区分配策略
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
Kafka有两种分配策略,一是roundrobin,一是range。
温馨提示:
roundrobin:
同一个消费者组的消费者分区数是尽量均匀的。
例如一个topic有50个partition,此时一个消费者组有5个消费者,那么基于轮询分配策略每个消费者组会分配10个parition。
range:
默认的分配策略,可能存在数据不均衡的情况,即会导致个别消费者会比其它消费者多partition数量。
例如一个topic有50个parition,可能前2个分配了20个parition,第3个分配了10个partition,而剩下的2个partition可能连一个partition数量都分配不到的情况。
3>.offset的维护
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。
六.kafka高效读取数据
1>.顺序写磁盘
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。
官网有数据表明,同样的磁盘,顺序写能到到600M/s,而随机写只有100k/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
2>.零拷贝技术
数据不用从内核空间(kernel space)拷贝到用户空间(user space),再由用户空间拷贝到内核空间,而是数据直接在内核空间进行处理。
七.zookeeper在kafka中的作用
Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
Controller的管理工作都是依赖于Zookeeper的。所有的broker启动后均会在zookeeper集群进行注册,先抢占创建临时znode的broker称为controller,当controller节点下线时该临时znode也会自动删除,此时其它的broker会watch到该事件,并再一次创建临时znode,产生新的controller节点。
以下为partition的leader选举过程:
(1)broker集群启动时会在zookeeper的"/brokers/ids"下进行注册,即创建相应的临时znode;
(2)kafka在zookeeper对应的chroot目录(我集群搭建时将kafka的数据存储在"/yinzhengjie-kafka")写入leader角色(通过命令"get /yinzhengjie-kafka/brokers/topics/yinzhengjie-kafka/partitions/0/state"查看)
(3)当某个parition所在的broker节点挂掉后,zookeeper会通知kafkaController;
(4)kafkaController选取新的parition的leader(需要注意的是,zookeeper保存的元数据信息其实所有的broker均有一份哟~);
(5)选举出新的leader后会更新zookeeper中的leader及ISR
[root@kafka201.yinzhengjie.com ~]# zkCli.sh -server kafka201.yinzhengjie.com Connecting to kafka201.yinzhengjie.com 2020-06-16 14:30:17,744 [myid:] - INFO [main:Environment@98] - Client environment:zookeeper.version=3.6.1--104dcb3e3fb464b30c5186d229e00af9f332524b, built on 04/21/2020 15:01 GMT 2020-06-16 14:30:17,747 [myid:] - INFO [main:Environment@98] - Client environment:host.name=kafka201.yinzhengjie.com 2020-06-16 14:30:17,748 [myid:] - INFO [main:Environment@98] - Client environment:java.version=1.8.0_201 2020-06-16 14:30:17,749 [myid:] - INFO [main:Environment@98] - Client environment:java.vendor=Oracle Corporation 2020-06-16 14:30:17,749 [myid:] - INFO [main:Environment@98] - Client environment:java.home=/yinzhengjie/softwares/jdk1.8.0_201/jre 2020-06-16 14:30:17,749 [myid:] - INFO [main:Environment@98] - Client environment:java.class.path=/yinzhengjie/softwares/zookeeper/bin/../zookeeper-server/target/classes:/yinzhengjie/softw ares/zookeeper/bin/../build/classes:/yinzhengjie/softwares/zookeeper/bin/../zookeeper-server/target/lib/*.jar:/yinzhengjie/softwares/zookeeper/bin/../build/lib/*.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/zookeeper-prometheus-metrics-3.6.1.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/zookeeper-jute-3.6.1.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/zookeeper-3.6.1.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/snappy-java-1.1.7.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/slf4j-log4j12-1.7.25.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/slf4j-api-1.7.25.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/simpleclient_servlet-0.6.0.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/simpleclient_hotspot-0.6.0.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/simpleclient_common-0.6.0.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/simpleclient-0.6.0.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/netty-transport-native-unix-common-4.1.48.Final.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/netty-transport-native-epoll-4.1.48.Final.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/netty-transport-4.1.48.Final.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/netty-resolver-4.1.48.Final.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/netty-handler-4.1.48.Final.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/netty-common-4.1.48.Final.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/netty-codec-4.1.48.Final.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/netty-buffer-4.1.48.Final.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/metrics-core-3.2.5.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/log4j-1.2.17.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/json-simple-1.1.1.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/jline-2.11.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/jetty-util-9.4.24.v20191120.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/jetty-servlet-9.4.24.v20191120.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/jetty-server-9.4.24.v20191120.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/jetty-security-9.4.24.v20191120.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/jetty-io-9.4.24.v20191120.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/jetty-http-9.4.24.v20191120.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/javax.servlet-api-3.1.0.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/jackson-databind-2.10.3.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/jackson-core-2.10.3.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/jackson-annotations-2.10.3.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/commons-lang-2.6.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/commons-cli-1.2.jar:/yinzhengjie/softwares/zookeeper/bin/../lib/audience-annotations-0.5.0.jar:/yinzhengjie/softwares/zookeeper/bin/../zookeeper-*.jar:/yinzhengjie/softwares/zookeeper/bin/../zookeeper-server/src/main/resources/lib/*.jar:/yinzhengjie/softwares/zookeeper/bin/../conf:2020-06-16 14:30:17,750 [myid:] - INFO [main:Environment@98] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2020-06-16 14:30:17,750 [myid:] - INFO [main:Environment@98] - Client environment:java.io.tmpdir=/tmp 2020-06-16 14:30:17,750 [myid:] - INFO [main:Environment@98] - Client environment:java.compiler=<NA> 2020-06-16 14:30:17,750 [myid:] - INFO [main:Environment@98] - Client environment:os.name=Linux 2020-06-16 14:30:17,750 [myid:] - INFO [main:Environment@98] - Client environment:os.arch=amd64 2020-06-16 14:30:17,750 [myid:] - INFO [main:Environment@98] - Client environment:os.version=3.10.0-957.el7.x86_64 2020-06-16 14:30:17,750 [myid:] - INFO [main:Environment@98] - Client environment:user.name=root 2020-06-16 14:30:17,750 [myid:] - INFO [main:Environment@98] - Client environment:user.home=/root 2020-06-16 14:30:17,750 [myid:] - INFO [main:Environment@98] - Client environment:user.dir=/root 2020-06-16 14:30:17,750 [myid:] - INFO [main:Environment@98] - Client environment:os.memory.free=480MB 2020-06-16 14:30:17,752 [myid:] - INFO [main:Environment@98] - Client environment:os.memory.max=491MB 2020-06-16 14:30:17,752 [myid:] - INFO [main:Environment@98] - Client environment:os.memory.total=491MB 2020-06-16 14:30:17,759 [myid:] - INFO [main:ZooKeeper@1005] - Initiating client connection, connectString=kafka201.yinzhengjie.com sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKee perMain$MyWatcher@379619aa2020-06-16 14:30:17,762 [myid:] - INFO [main:X509Util@77] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation 2020-06-16 14:30:17,768 [myid:] - INFO [main:ClientCnxnSocket@239] - jute.maxbuffer value is 1048575 Bytes 2020-06-16 14:30:17,775 [myid:] - INFO [main:ClientCnxn@1703] - zookeeper.request.timeout value is 0. feature enabled=false Welcome to ZooKeeper! 2020-06-16 14:30:17,780 [myid:kafka201.yinzhengjie.com:2181] - INFO [main-SendThread(kafka201.yinzhengjie.com:2181):ClientCnxn$SendThread@1154] - Opening socket connection to server kafka2 01.yinzhengjie.com/172.200.4.201:2181.2020-06-16 14:30:17,780 [myid:kafka201.yinzhengjie.com:2181] - INFO [main-SendThread(kafka201.yinzhengjie.com:2181):ClientCnxn$SendThread@1156] - SASL config status: Will not attempt to au thenticate using SASL (unknown error)JLine support is enabled 2020-06-16 14:30:17,851 [myid:kafka201.yinzhengjie.com:2181] - INFO [main-SendThread(kafka201.yinzhengjie.com:2181):ClientCnxn$SendThread@986] - Socket connection established, initiating s ession, client: /172.200.4.201:62453, server: kafka201.yinzhengjie.com/172.200.4.201:21812020-06-16 14:30:17,872 [myid:kafka201.yinzhengjie.com:2181] - INFO [main-SendThread(kafka201.yinzhengjie.com:2181):ClientCnxn$SendThread@1420] - Session establishment complete on server k afka201.yinzhengjie.com/172.200.4.201:2181, session id = 0xc900033eddbb0000, negotiated timeout = 30000 WATCHER:: WatchedEvent state:SyncConnected type:None path:null [zk: kafka201.yinzhengjie.com(CONNECTED) 0] ls /yinzhengjie-kafka [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification] [zk: kafka201.yinzhengjie.com(CONNECTED) 1] [zk: kafka201.yinzhengjie.com(CONNECTED) 1] ls /yinzhengjie-kafka [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification] [zk: kafka201.yinzhengjie.com(CONNECTED) 2] [zk: kafka201.yinzhengjie.com(CONNECTED) 2] [zk: kafka201.yinzhengjie.com(CONNECTED) 2] ls /yinzhengjie-kafka/brokers [ids, seqid, topics] [zk: kafka201.yinzhengjie.com(CONNECTED) 3] [zk: kafka201.yinzhengjie.com(CONNECTED) 3] ls /yinzhengjie-kafka/brokers/topics [__consumer_offsets, yinzhengjie-kafka] [zk: kafka201.yinzhengjie.com(CONNECTED) 4] [zk: kafka201.yinzhengjie.com(CONNECTED) 4] [zk: kafka201.yinzhengjie.com(CONNECTED) 4] ls /yinzhengjie-kafka/brokers/topics [__consumer_offsets, yinzhengjie-kafka] [zk: kafka201.yinzhengjie.com(CONNECTED) 5] [zk: kafka201.yinzhengjie.com(CONNECTED) 5] [zk: kafka201.yinzhengjie.com(CONNECTED) 5] ls /yinzhengjie-kafka/brokers/topics/yinzhengjie-kafka [partitions] [zk: kafka201.yinzhengjie.com(CONNECTED) 6] [zk: kafka201.yinzhengjie.com(CONNECTED) 6] ls /yinzhengjie-kafka/brokers/topics/yinzhengjie-kafka/partitions [0, 1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 2, 3, 4, 5, 6, 7, 8, 9] [zk: kafka201.yinzhengjie.com(CONNECTED) 7] ls /yinzhengjie-kafka/brokers/topics/yinzhengjie-kafka/partitions/0 [state] [zk: kafka201.yinzhengjie.com(CONNECTED) 8] [zk: kafka201.yinzhengjie.com(CONNECTED) 8] ls /yinzhengjie-kafka/brokers/topics/yinzhengjie-kafka/partitions/0/state [] [zk: kafka201.yinzhengjie.com(CONNECTED) 9] [zk: kafka201.yinzhengjie.com(CONNECTED) 9] get /yinzhengjie-kafka/brokers/topics/yinzhengjie-kafka/partitions/0/state {"controller_epoch":8,"leader":202,"version":1,"leader_epoch":0,"isr":[202,201]} [zk: kafka201.yinzhengjie.com(CONNECTED) 10] [zk: kafka201.yinzhengjie.com(CONNECTED) 10] [zk: kafka201.yinzhengjie.com(CONNECTED) 10]
八.博主推荐阅读
Kafka部署实战案例: https://www.cnblogs.com/yinzhengjie2020/p/13046902.html Kafka命令行操作案例: https://www.cnblogs.com/yinzhengjie2020/p/13052883.html Kafka的API实战案例: https://www.cnblogs.com/yinzhengjie2020/p/13057627.html