整体架构
生产过程由两个线程协调运行,分别为主线程和sender线程(发送线程)。
主线程中,由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用,缓存消息到消息加载器(RecordAccumulator,也称为消息收集器)中,Sender线程负责从消息加载器(RecordAccumulator)中获取消息并将其发送到Kafka中。
消息加载器
消息加载器(RecordAccumulator)主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
消息加载器(RecordAccumulator)缓存的大小可以通过生产者参数buffer.memory配置,默认值为33444432b,即32mb。
如果生产者发送消息的速度大于发送到服务器的速度,也就是RecordAccumulator缓存不够,此时kafkaproducer的send方法调用要被被阻塞,要么抛出异常,这个取决于参数max.block.ms参数,此参数的默认值为60000ms,60s。
主线程发送过来的消息会被追加到RecordAccumulator的某个双端队列中,在RecordAccumulator内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque
注意!ProducerBath不是ProducerRecord,ProducerBatch中可以包含一个或多个ProducerRecord。通俗的说ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProdicerBatch中。较小的ProducerRecord拼凑成一个较大的PeoducerBatch,可以减少网络请求的次数提升整体的吞吐量。
消息在网络上以字节的形式传输,在发送之前需要创建一块内存区域来保存对应消息,在kafka生产端配置中,使用java.io.ByteBuffer来实现消息内存的创建和释放。频繁的创建和释放是消耗资源的,在RecordAccumulator内部还有一个BufferPool,主要用来实现ByteBuffer的复用,实现缓存的高效利用。而BufferPool只对特定大小的ByteBuffer进行管理,其他大小的ByteBuffer不会缓存进BufferPool中,我们可以通过调整batch.size参数,以便多缓存消息。
ProducerBatch大小和batch.size参数也有密切联系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区相对应的双端队列(没有则新建),查看Producer中是否还可以写入这个ProducerRrcord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size的大小,如果不超过,就以batch.size的大小创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,就以评估大小来创建ProducerBatch,这段内存区域不会被复用,
Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque
在转换成<Node,List
请求在从sender发往Kafka之前还会保存到InFlightRequest中,InFlightRequest保存对象的具体形式为Map<Nodeid,deque
相关参数:
batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据
生产者客户端可靠性保证
- 1、(可选,在效率和可靠性之间进行取舍,配置参数acks)为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
- 2、设想以下情景:leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack。这个问题怎么解决呢?
Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。 - 3、ack 应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡。
acks 参数配置:
0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
1:producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;
-1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复。 - 4、 Exactly Once 语义
将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义。 At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的,At Most Once
可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。
在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。即:
At Least Once + 幂等性 = Exactly Once
要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。 - 5、 Kafka 事务
Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID 获得原来的 PID。
> 为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。