(一)Kafka
Kafka是一个基于zookeeper的高吞吐低延迟的分布式的发布、订阅消息系统【消息队列】。它能实时处理大量的消息数据以满足各种需求。Kafka是中间件的一种。
在java高并发中有 生产者-消费者-仓库模式。为解决生产能力和消费能力不匹配的问题。其中,仓库是为了降低生产者和消费者之间的耦合,而设计的消息中间件。而这里的Kafka 也是中间件的一种。本质解决的问题是一样的。只是Kafka 在解决这个问题中能力很突出。
(一)消息队列
Kafka是消息队列的一种。功能有4点:解耦【消除生产者和消费者的耦合】、异步【生产者和消费者不需要相互等待】、广播【生产者只需要通知将消息发送一次】、消峰【处理任务的高峰,不重要的相关任务可以使用最大承载量异步处理,只需要核心的操作在高峰处理。从而降低高峰期的处理】
1. 传统消息队列的使用场景
同步处理:工作处理链路长了,以前是同步处理。所以处理时间会很长。
异步处理:将核心任务同步处理。关联任务使用消息队列处理。核心业务流程为数据单元的生产者,将数据消息发送到消息队列中,关联任务处理为订阅消息队列的消费者。异步的处理关联操作。
2. 消息队列的两种模式
点对点模式和发布/订阅模式。
(1) 点对点模式
特点:一对一【一个消息对应一个消费者,只能被消费一次】,消费者主动拉取,消息收到后消息清除。
消息生产者生产消息发送到Queue中,消息消费者从Queue中获取消息并消费。消息被消费后,Queue不再存储。所以,消费者不可能消费到已经被消费的数据。
Queue支持多个消费者,但是对一个消息而言,只有一个消费者可以消费。
(2) 发布、订阅模式
特点:一对多【一个消息可以被多个消费者消费。消费者消费数据后不会被清除】
消息生产者发布消息到topic上。同时有多个消费者【订阅过该主题】消费该消息。和点对点的形式不同,发送到topic的消息会被所有订阅者消费。
(二)架构深入
1. 基础架构
2. 架构详解
- 为了方便扩展,并提高吞吐量、一个topic分为多个partition。
- 配合分区的设计,提出了消费者组的概念。组内每个消费者并行消费。
- 提高可用性,为每个partition增加若干副本。类似NameNode HA
- Producer:生产者。就是向Kafka发消息的客户端。
- Consumer:消费者。就是向Kafka取消息的客户端。
- Consumer Group:GC,消费者组,是有多个消费者组成的。消费者组内每个消费者负责消费不同分区的数据。一个分区的数据对于同一个消费者组而言,只能由一个消费者消费。消费者组之间互不影响。所有的消费者读属于某个消费者组。即消费者组是逻辑上的一个订阅者。
- Broker:一台Kafka的机器就是也broker,一个集群就是由多个Broker组成。一个broker可以容纳多个topic。
- Topic:可以理解为一个队列。生产者和消费者都是面向一个topic的
- Partition:为了实现扩展性,一个topic可以分布在多个broker上。一个topic可以分为多个partition。每个partition都是一个有序的队列。
- Replica:副本,为了保证集群中某个节点发生故障时,该节点上的partition数据不会缺失,Kafka仍然能继续工作。Kafka提供了副本机制。一个topic的每个分区都有若干个分区。一个leader和若干个follower。
- Leader:每个分区多个副本的”主”,生产者发送数据的对象和消费者消费数据的对象都是leader。
- Follower:每个翻去多个副本中的”从”,实时的从leader中同步数据。Leader发生故障时,某个follower会成为新的leader。
(三)数据存储机制
Topic是逻辑概念。Partition是物理概念、每个partition都对应一个文件。该log文件存储producer生产的逻辑数据。Producer生产出数据会不断追加到log的末端。且每行数据都有自己的offset【offset是分区中的偏移量】。消费者组中的消费者都会实时记录自己消费到哪个offset,以便出错恢复,接着上次消费的顺序继续消费。
(1) Topic的存储结构
一个topic有多个分区,每个分区在不同的broker上。每个分区中使用log存储数据,
生产者不断添加数据到log数据的尾部,为了防止log文件过大而造成定位效率低,所以Kafka采用分片和索引的机制。把每个log分为多个segment。每个segment对应两个文件,分别是.log文件和.index文件。这些文件位于一个文件夹下。文件夹的命名规则为:topic名称+分区序号
(2) Log和index的文件详解
Index中存储的是索引消息。Log文件中存储的是数据。索引文件中的元素指向对应数据文件中message的偏移量。
在索引中的查找使用二分法查找。因为所以文件结构统一,所以查询速度比较快。
(四)生产者
1. 分区策略
① 分区原因
1) 方便在集群中扩展【通过增加broker,可以增加分区数,从而扩展Kafka的容量】。
2) 可以提高并发。【因为是以partition为单位进行读写】
② 分区的原则
Producer将需要发送的数据封装成producer Record。
1) 指明partition的情况下,可以直接将指明的partition作为partition的值。
2) 没有指明partition但有Key的情况下,partition=key的hash值%topic的partition。
3) 既没有指明partition又没有key值的情况下。第一次调用随机产生一个整数【以后每次调用在这个数上加一】。然后将这个数%topic的partition。算法称之为round-robin算法。
2. 数据可靠性保证
为了保证producer发送数据,能可靠的发送到指定的topic上。Topic的每个分区收到producer发送的数据后,都会给producer发送ack【acknowledgement确认收到】如果收到ack,就会进行下一轮发送。否则,重新发送。
① 副本数据同步策略
方案 |
优点 |
缺点 |
半数以上完成同步,就发送ack |
延迟低 |
选举新的leader时,容忍n台节点的故障,需要2n+1个副本 |
全部完成同步,才发送ack |
选举新的leader时,容忍n台节点的故障,需要n+1个副本 |
延迟高 |
Kafka选择了第二种方案,原则如下:
- 同样容忍N台机子的节点故障。第一个方案需要2n+1个副本,第二个方案只需要n个副本。因为Kafka分区文件存储大量的数据。第一种方案会造成大量的数据冗余。
- 第二种方案有网络延迟,但是对Kafka的影响还是比较小的。
② ISR
Leader维护一个动态的ISR集合。存储的是和leader保持同步的follower集合。当ISR中的follower完成数据同步之后,leader会向生产者发送ACK。如果follower长时间未完成同步,则暂时会被踢出ISR,该事件阈值有参数replica.lag.time.max.ms设置。Leader发生故障后,会从ISR队列中选出新的leader。
个人理解ISR。实时和leader数据保持同步的follower集合。如果leader发生故障,也是从ISR中选出leader,避免了选中的leader结果数据还不是最完整的。
(五)消费者
一个主题从生产和消费的角度来看,对应一个生产者和多个订阅者。生产者将数据发布push到Kafka的一个主题上,根据主题的分区,具体会push到一个确定的分区。而每个订阅了当前主题的订阅者。在生产者将数据发布到主题上,订阅者中的一个消费者会去消费数据。只能是订阅者中的一个消费者得到数据。而此处的订阅者相当于消费者组CG。
1. 消费方式
(1) 两种消费方式比较
Consumer采用pull模式从broker中读取数据。【设想采用push模式,由broker推给消费者,这中方式的缺点在于,不能适应不同的消费速度的消费者。因为推的速度是有broker决定的】。Pull模式是按照consumer的消费速度消费数据。
(2) pull方式的缺点及解决思路
Pull模式的缺点是,如果没有数据,消费者陷入循环,会一直返回空数据。【解决的思路:Kafka在读数据的时候传入一个参数 timeout,如果没有数据可消费,Consumer会等待一段时间后再返回】
2. 分区分配策略
一个主题有多个分区,一个消费者组有多个consumer。所以涉及到partition的分配的问题。即哪个partition由哪个consumer消费。分配策略有两种,是round robin 和 range
(1) Round robin
(2) Range
3. offset维护
由于consumer消费的过程中可能存在断电宕机的情况。在恢复之后需要从故障前的位置继续消费。所以,consumer需要实时记录自己消费到哪个offset。以便故障恢复后继续消费。
Kafka在0.9版本之前,offset是保存在zookeeper中的。从0.9版本之后,Kafka将offset保存在内置的一个topic中。该topic为_consumer_offsets
(六)Kafka高效读写
Kafka中生产者写数据速度很快。主要因为以下几个原因。
1. 生产者写文件高效---顺写磁盘
Producer生产数据写入到log文件中,写的过程一直在末尾追加。为顺写。【官网介绍:同样的写文件,顺写速度能到600M/s,随机写智能到100K/s】顺写省略了大量的磁头寻址的时间。
2. 消费者读数据高效---零复制技术
(1) CPU的两种运行模式
- Kernel 内核模式
内核模式下,代码具有对硬件所有的控制权。可以执行所有的CPU指令。可以访问任何地址的内存。内核模式为操作系统的最底层。最可信的数据服务。内核模式下任何异常都是灾难性的。
- 用户模式
用户模式下,代码没有直接对硬件的控制权。也不能直接访问内存。一般通过系统接口system API来访问硬件和内存。这种模式下。程序奔溃是可恢复的。
(2) 消费者消费消息数据时,采用零拷贝的技术,直接从Kafka的broker节点上kernel模式中直接拷贝数据。所以,效率很高。
(七)Zookeeper在Kafka中的作用
Kafka集群中有一个Broker被选中为controller,负责管理集群Broker的上下线。所有topic的分区副本 的分配和leader的选举。
Controller的管理工作都依赖zookeeper的。
以下是partition的leader的选举过程。
(八)命令行操作
Topic 命令
1. 创建topic
bin/kafka-topics.sh --zookeeper hadoop102:2181
--create --replication-factor 3 --partitions 1 --topic first
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
2. 查询topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
3. 删除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181
--delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除。
4. Topic修改分区数
kafka]$bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
5. 查看topic
bin/kafka-topics.sh --zookeeper hadoop102:2181
--describe --topic first
生产者发送消息
6. producer发送消息
bin/kafka-console-producer.sh
--broker-list hadoop102:9092 --topic first
>hello world
>atguigu atguigu
7. consumer消费消息
bin/kafka-console-consumer.sh
--bootstrap-server hadoop102:9092 --from-beginning --topic first
(九)拦截器Interceptor
- 拦截器原理
Kafka的拦截器实在Kafka的1.0版本之后引入的。目的是为了客户端client定制化控制逻辑。
对producer而言,使得用户在消息发送之前或者producer回调逻辑【消息的发送状态、所在分区和偏移量等信息】之前有机会对消息做一些定制化需求,比如:修改消息等。同时producer允许用户指定多个interceptor按照一定的顺序作用同一条信息,从而形成拦截链。
Interceptor实现接口org.apache.kafka.clients.producer.ProducerInterceptor。其中实现的方法有4个,分别是configure(configs)、onSend(ProducerRecord)、onAcknowledgement(RecordMetadata, Exception)和close。
(1) configure(configs)
获取生产者的配置信息和初始化数据时使用。其中configs为Map,可以通过get方法获得需要的配置信息。
(2) onSend(ProducerRecord)
该方法在消息序列化之前使用。并传递要发送的消息记录。用户可以在该方法中对消息记录进行任意的修改。包括消息的Key和value以及要发送的topic和分区等。
ProducerRecord 封装了topic、key、value。其中topic指定消息的分区。Key会计算出消息的分区。知道这条消息应该保存在主题topic的哪个分区上。
(3) onAcknowledgement(RecordMetadata, Exception)
该方法是发送消息之后异步得到的结果确认。
该方法在发送给服务器的消息已经被确认或者记录发送失败的情况下调用【生产者回调逻辑触发之前调用】,可以通过metadata中获取消息的主题、分区以及偏移量。或者在Exception获取消息异常的信息。
(4) Close
拦截器关闭的时候调用。
(十)Flume对接Kafka
(十一)面试常见的问题
1. Kafka中的ISR和AR分别代表什么?
ISR:和leader保持数据同步的follower的列表。
AR:代表分区的所有副本。
2. Kafka中的HW和LEO分别代表什么?
HW:一个分区中所有副本的最小offset。
LEO:没有副本的最后条消息的Offset。
3. Kafka怎么体现消息的顺序性?
Kafka中每个分区的数据都有offset代表在分区中的顺序,所以,Kafka只能在分区中有序,整体不能保证有序。
4. Kafka中有拦截器、序列化器和分区器,他们的作用及顺序?
拦截器是producer发送消息数据之前或者在获得确认结果之后调用拦截去做的一些定制化的处理。
序列化器是对消息进行序列化。
分区器是将消息分区,确定将消息保存在topic的哪个分区中。
他们的作用顺序是:拦截器》序列化器》分区器
5. “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
正确【根据消费者的分区分配策略得知、不论是round robin还是range模式,如果分区数小于消费者组中的消费者,就会有consumer分不到topic的分区】
6. 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
Offset+1
7. Kafka有内部的topic吗?如果有是什么?有什么所用?
有,_consumer_offsets,保存消费者的offset
8. Kafka的分区分配是什么概念?
Kafka中的一个topic有多个分区,一个消费者组有多个消费者。所以需要将topic的分区分配给消费者组内的消费者。称之为分区分配。其中,分区分配的策略有两种,round robin和range。
9. Kafka的日志(数据存储)文件的结构?
分区数据中,log数据根据设置大小阈值分为不同的segment,每个segment分为.log和.index文件。其中.log文件中保存真是的消息数据。.index文件中保存消息的offset和消息的偏移量。
10. 如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?
因为index文件中的数据是统一结构的数据。所以很容易使用二分法获取到指定的offset在index中的数据。从数据单元中能得到在log文件中起始的偏移量和消息大小,可以去log文件中得到数据。
11. Kafka Controller的作用?
负责broker的上下线和topic分区副本分配和leader选举。【controller也是zk在broker中选举产生的】
12. Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
partition leader(ISR),controller(先到先得)
13. 失效副本是指什么?有那些应对措施?
失效副本【长时间不能同步leader的数据】,暂时踢出ISR,等追上leader之后,再加入ISR。
14. Kafka那些设计rang性能比较高?
分区、顺序写和零拷贝
分区:消费者可以并行的消费消息数据。
顺序写:生产者写入log数据的时候,采用顺序写,直接追加到log文件的最后,相比随机写提高很多效率。
零拷贝:消费者消费消息的时候,log数据直接加载到kernel中,然后消费者从kernel内存中得到数据。而不通过用户内存得到。提高读效率。