zoukankan      html  css  js  c++  java
  • kafka核心技术与实战

    消息引擎系统

    定义

    消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递。

    消息格式

    纯二进制的字节序列

    传输协议

    点对点模型(电话)、发布订阅模型(报纸)

    好处

    削峰填谷、发送方和接收方松耦合减少不必要的交互

    KAFKA术语

    • 消息(record) — kafka处理的对象
    • 主题(topic) — 发布订阅的对象,承载消息的逻辑容器
    • 生产者(producer) — 向主题发布消息的客户端应用程序
    • 消费者(consumer) — 订阅主题消息的客户端应用程序 
    • 服务器端(broker) — 接收和处理客户端请求,对消息进行持久化
    • 分区(partition) — 一个有序不变的消息序列。一topic多分区,消息只在某一个分区 
    • 备份(replication) — 相同的数据拷贝在多台机器上 
    • 副本(replica) — 相同的数据拷贝,冗余实现高可用,副本是分区维度的副本,分区内配置若干副本,领导者副本对外提供服务,追随者副本不与外界交互
    • 消息位移(offset) — 分区中每条消息的位置信息
    • 消费者位移(consumer offset) — 消费者消费进度
    • 消费者组(consumer group) — 多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐,每个分区都只会被组内一个消费者实例消费
    • 重平衡(rebalance) — 消费者组内新增实例or某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程
    • 生产者->领导者副本<-消费者
                          I
                   追随者副本(异步请求领导者副本获取消息)

    KAFKA版本号

    • 0.7 上古版本,只提供最基础的消息队列功能,没有副本机制
    • 0.8 引入副本机制,老版本客户端api(指定zookeeper地址而不是broker地址)
    • 0.8.2.0 引入新版producer api(不稳定)
    • 0.8.2.2 老版消费者api较为稳定
    • 0.9 新版producer api较为稳定,引入新版consumer api(不稳定)
    • 0.10 引入kafka streams,对消息引擎方面没有太大功能提升
    • 0.10.2.2 新版consumer api较为稳定
    • 0.11 引入提供幂等性producer api和事务api(不稳定),重构消息格式
    • 0.11.0.3 消息引擎功能较为完善
    • 1.0 and 2.0 kafka streams各种改进
    • 公司kafka版本0.10.1.2

    分区

    作用

    1. 提供负载均衡能力,实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,还可以通过添加新的节点机器来增加整体系统的吞吐量 
    1. 实现业务级别的消息顺序问题。将一组特定有序的消息发到同一分区,保证顺序处理。

    分区策略

    轮询策略

    保证消息最大限度地被平均分配到所有分区上,默认情况它是最合理的分区策略。 

    随机策略

    老版本生产者使用的分区策略,实际表现略逊于轮询策略

    消息键保序策略

    同一个key的所有消息都进入到相同的分区。消息指定key,默认实现该策略,未指定key默认实现轮询策略

    可靠性保障

    至少一次(at least once)

    Broker的应答没有成功发送回producer时,producer不重试

    至多一次(at most once)

    Broker的应答没有成功发送回producer时,producer进行重试

    精确一次(exactly once)

    消费者组

    • Consumer Group下可以有一个或多个Consumer实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。 
    • Group ID是一个字符串,在一个Kafka集群中,它标识唯一的一个Consumer Group。 
    • Consumer Group下所有实例订阅的主题的单个分区,只能分配给组内的某个Consumer实例消费。这个分区当然也可以被其他的Group消费。 
    • 消费者位移,老版zookeeper,由于不适合频繁写操作,新版保存在broker的内部主题(位移主题)中。key<Group ID,主题名,分区号> ,value有三种格式(位移值、保存消费者组信息,注册时使用、删除消费者组过期位移甚至是删除消费者组)

    重平衡

    定义

    重平衡(rebalance)规定了一个Consumer Group下的所有Consumer如何达成一致,来分配订阅 Topic的每个分区

    触发原因

    1. 组成员数发生变更。比如有新的Consumer实例加入组或者离开组,或是有Consumer实例崩溃被“踢 出”组。
    1. 订阅主题数发生变更。Consumer Group可以使用正则表达式的方式订阅主题。在运行过程中,新创建了一个满足条件的主题,那么该Group就会发生Rebalance。 
    1. 订阅主题的分区数发生变更。当分区数增加时,就会触发订阅该主题的所有Group开启Rebalance。 

    消费者端流程

    加入组(JoinGroup)

    1. 每个成员将自己订阅的主题上报
    1. 协调者从中选择一个成员担任消费者组的领导者(第一个JoinGroup的消费者)
    1. 协调者把消费者组订阅信息封装进JoinGroup请求的响应体中发给领导者
    1. 领导者做出分配方案

    等待领导者消费者分配方案(SyncGroup)

    1. 领导者向协调者发送SyncGroup请求,将分配方案发送给协调者(其他消费者也会发送该请求,请求体内没有实际内容)
    1. 协调者以SyncGroup响应的方式将分配方案分发给所有消费者成员

    协调者端流程

    新成员入组(Stable状态时,有新成员加入)

    当协调者收到新的JoinGroup请求后,它会通过心跳请求响应的方式通知组内所有成员,强制它们开启新一轮的重平衡

    组成员主动离组(主动调用close)

    当协调者收到LeaveGroup请求,依旧时通过心跳请求响应的方式通知组内所有成员

    组成员崩溃离组

    协调者在session.timeout.ms内未收到成员心跳请求,判定该成员离组,开启重平衡

    重平衡时协调者对组内成员提交位移的处理

    消费者成员会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启rebalance

    影响

    在Rebalance过程中,所有Consumer实例都会停止消费,等待Rebalance完成。几百个consumer时要几个小时。不在原有方案基础上改动,而是全部重新打散分配。

    如何规避

    后两种操作通常是运维的主动操作,不可避免。消费者组下的消费者数量变化时rebalance最常见的原因。
    增加消费者实例一般是计划内的,可能是出于增加tps或者提高伸缩性。因此主要分析如何规避消费者实例被动减少导致的rebalance。
    • session.timeout.ms,接收心跳最大时长,默认10s
    • heartbeat.interval.ms,发送心跳的频率,默认3s
    • max.poll.interval.ms,消费消息最大时长,默认5分钟
    • max.poll.records,下游一次消费的消息数,默认500条
    1. 第一类非必要Rebalance是因为未能及时发送心跳,导致Consumer被“踢出”Group而引发的。比较合理的配置为能保证Consumer实例在被判定为“dead”之前,能够发送至少3轮的心跳请求。
    1. 第二类非必要Rebalance是Consumer消费时间过长导致的。设置max.poll.interval.ms要大于可能处理的最大时长,减小max.poll.records数量。
    1. consumer消费时有错误导致重启触发rebalance

    消费者位移提交

    max.poll.records一次poll最大返回消息数 默认500
    自动提交enable.auto.commit
    手动同步提交commitSync
    手动异步提交commitAsync

    消费者TCP管理

    何时连接

    1. FindCoordinator请求,确定协调者和获取集群元数据。
    1. 连接协调者,令其执行组成员管理操作。
    1. 连接分区领导者副本对应broker,执行实际的消息获取。

    何时关闭

    第三种链接建立后,第一种链接会废弃,后续定期请求元数据也该为使用第三种链接。
    主动关闭:KafkaConsumer.close() or kill
    kafka自动关闭:connection.max.idle.ms默认9分钟

    副本机制

    • In-sync Replicas(ISR副本集合)落后leader副本小于replica.lag.time.max.ms的副本集合,kafka认为该集合中的副本是与leader同步的。
    • Unclean领导者选举(是否允许非同步副本被选举为leader)

    broker处理请求流程

    num.network.threads设置网络线程池的线程数,其默认值是 3,表示每台Broker启动时会创建3个网络线程,专门处理客户端发送的请求
    Acceptor线程采用轮询的方式将入站请求公平地发到所有网络线程中
    网络线程拿到请求后,它不是自己处理,而是将请求放入共享请求队列。IO 线程池负责从该队列中取出请求,执行真正的处理。IO线程处理完请求后,会将响应发送到网络线程池的响应队列中,然后由对应的网络线程负责返还给客户端。
    请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。Acceptor只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送Response给客户端。
    Purgatory用来缓存延时请求。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了acks=all的PRODUCE请求,一旦设置了acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的IO线程就必须等待其他Broker的写入结果。当请求不能立刻处理时,它就会暂存在Purgatory中。稍后一旦满足了完成条件,IO线程会继续处理该请求,并将Response放入对应网络线程的响应队列中。

    控制器组件(Controller)

    控制器组件(Controller)是Kafka的核心组件,它负责管理和协调整个Kafka集群

    控制器是如何被选出来的

    Broker在启动时,会尝试去ZooKeeper中创建/controller节点。第一个创建该节点的Broker会被指定为控制器。

    控制器是做什么的

    主题管理(创建、删除、增加分区)

    kafka-topics

    分区重分配

    kafka-reassign-partitions

    Preferred领导者选举

    Kafka为了避免部分Broker负载过重而提供的一种换Leader的方案

    集群成员管理(自动检测新增Broker、Broker主动关闭、Broker宕机)

    当有新Broker启动后,它会在/brokers下创建专属的znode节点,当Broker宕机或主动关闭后,这个znode节点会被自动删除。ZooKeeper会通过Watch机制将消息通知推送给控制器,控制器感知到这个变化后进行对应操作。

    数据服务

    控制器上保存了最全的集群元数据信息,其他所有Broker会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

    控制器保存了什么数据

    这些数据在ZooKeeper中也保存了一份。控制器初始化时会从ZooKeeper上读取对应的元数据并填充到自己的缓存中。

    控制器故障转移(Failover)

    故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器。
    1. Broker 0宕机
    1. ZooKeeper通过Watch机制感知并删除/controller临时节点
    1. 所有存活的Broker竞选新的控制器身份
    1. Broker 3赢得了选举,在ZooKeeper重建了/controller节点
    1. Broker 3从ZooKeeper中读取集群元数据信息,并初始化到自己的缓存中

    副本同步机制

    什么是高水位

    水位

    • 在时刻T,任意创建时间(Event Time)为T’,且T’≤T的所有事件都已经到达或被观测到,那么T就被定义为水位。 -- 经典定义
    • 水位是一个单调增加且表征最早未完成工作(oldest work not yet completed)的时间戳。                -- Streaming System
    蓝色部分代表已完成的工作,红色部分代表正在进行中的工作,两者的边界就是水位线。

    KAFKA水位

    Kafka的水位不是时间戳,更与时间无关,它是用消息位移来表征的。另外,Kafka源码使用的表述是高水位(High Watermark),简写是HW。
    • 在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息,即图中位移小于8的所有消息。
    • 位移值等于高水位的消息也属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的。图中还有一个日志末端位移(Log End Offset)的概念,简写是LEO。它表示副本写入下一条消息的位移值。介于高水位和LEO之间的消息就属于未提交消息。同一个副本对象,其高水位值不会大于LEO值。
    • 高水位和LEO是副本对象的两个重要属性。Kafka所有副本都有对应的高水位和LEO值,而分区的高水位就是其Leader副本的高水位。

    高水位的作用

    1. 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
    1. 帮助Kafka完成副本同步。

    高水位更新机制

    每个副本都保存了一组高水位值和LEO值,而leader副本比较特殊,在Leader副本所在的Broker上,还保存了其他Follower副本的LEO值。
    在Broker 0上保存远程副本的作用是帮助Leader副本确定其高水位,也就是分区高水位。

    Leader副本更新机制

    处理生产者请求的逻辑如下:
    1. 写入消息到本地磁盘并更新LEO值。
    1. 获取Leader副本所在Broker保存的所有与Leader同步的远程副本LEO值{LEO-1,LEO-2,……,LEO-n}。
    1. 更新高水位为min(LEO, LEO-1,LEO-2,……,LEO-n)。
    处理 Follower 副本拉取消息的逻辑如下:
    1. 读取磁盘(或页缓存)中的消息数据。
    1. 使用Follower副本发送请求中的LEO值更新远程副本LEO值。
    1. 更新分区高水位值(具体步骤与处理生产者请求的步骤相同)。

    Follower副本更新机制

    从 Leader 拉取消息的处理逻辑如下:
    1. 写入消息到本地磁盘并更新LEO值。
    1. 更新高水位为min(LeaderHW, currentLEO)。
    在这里,我稍微解释一下,什么叫与 Leader 副本保持同步。判断的条件有两个。
    1. 该远程Follower副本在ISR中。
    1. 该远程Follower副本LEO值落后于Leader副本LEO值的时间,不超过replica.lag.time.max.ms(默认10s)。

    副本同步机制解析

    以一个单分区且有两个副本的主题为例:
    • 在初始状态时,所有值都是0
    • 当生产者给主题分区发送一条消息后,状态变更为:
    此时,Leader副本成功将消息写入了本地磁盘,故LEO值被更新为1。
    • Follower再次尝试从Leader拉取消息,状态进一步变更为: 
    这时,Follower副本LEO也成功地更新为1。Leader和Follower副本的LEO都是1,但各自的高水位依然是0,还没有被更新。它们需要在下一轮的拉取中被更新,如下图所示:
    • 在新一轮的拉取请求中,Follower副本请求拉取位移值为1的消息。
    • Leader副本收到请求后,更新远程副本LEO为1,然后更新Leader高水位为1。最后将高水位值1发送给Follower副本。
    • Follower副本接收到以后,也将自己的高水位值更新成1。
    至此,一次完整的消息同步周期就结束了。

    Leader Epoch

    上述流程存在的问题

    Follower副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower副本,就需要多轮拉取请求。也就是说,Leader副本高水位更新和Follower副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。

    数据丢失

    • 开始时,副本A和副本B都处于正常状态,A是Leader副本。某个使用了min.insync.replicas=1(Leader副本成功写入即算发送成功)的生产者程序向A发送了两条消息,A全部写入成功,此时Kafka会通知生产者两条消息全部发送成功。
    • 现在假设Leader和Follower都写入了这两条消息,而且Leader副本的高水位也已经更新了,但Follower副本高水位还未更新。
    • 倘若此时副本B所在的Broker宕机,当它重启回来后,副本B会执行日志截断操作,将LEO值调整为之前的高水位值,也就是1。也就是位移值为1的那条消息被副本B从磁盘中删除,此时副本B的磁盘中只保存了位移值为0的那条消息。
    • 执行完截断操作后,副本B开始从A拉取消息,执行正常的消息同步。如果在此时,副本A所在的Broker宕机了,那么Kafka就别无选择,只能让副本B成为新的Leader。
    • 当A回来后,需要执行相同的日志截断操作,即将高水位调整为与B相同的值,也就是1。
    • 这样操作之后,位移值为1的那条消息就从这两个副本中被永远地抹掉了。这就是这张图要展示的数据丢失场景。

    数据不一致

    • 这种情况的初始状态与上面有一些不同:A依然是Leader,写入了2条消息,min.insync.replicas依旧为1,但B只写入了1条消息且不在ISR中。分区HW更新到2,但B的HW还是1。
    • 这次我们让A和B所在机器同时挂掉,然后假设B先重启回来,假设允许Unclean领导者选举,B因此成为leader,分区HW=1。假设此时producer发送了第3条消息(绿色框表示)给B,于是B的log中offset=1的消息变成了绿色框表示的消息,同时分区HW更新到2(A还没有回来,就B一个副本,故可以直接更新HW而不用理会A)。之后A重启回来,发现此时分区HW=2,与自己的HW值相同,故不做任何调整。此后A和B将以这种状态继续正常工作。
    • 这种场景下,A和B在offset=1的消息是不同的记录,从而导致数据不一致。
    基于此,社区在0.11版本引入了Leader Epoch概念,来规避因高水位更新错配导致的各种不一致问题。

    介绍

    Leader Epoch,我们大致可以认为是Leader版本。它由两部分数据组成。
    1. Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的Leader被认为是过期Leader,不能再行使Leader权力。
    1. 起始位移(Start Offset)。Leader副本在该Epoch值上写入的首条消息的位移。
    • 举例来说明一下Leader Epoch。假设现在有两个Leader Epoch<0, 0>和<1, 120>,那么,第一个Leader Epoch表示版本号是0,这个版本的Leader从位移0开始保存消息,一共保存了120条消息。之后Leader发生了变更,版本号增加到1,新版本的起始位移是120。
    • Kafka Broker会在内存中为每个分区都缓存Leader Epoch数据,同时定期持久化到checkpoint文件中。Leader副本写入消息到磁盘时,Broker会尝试更新这部分缓存。当发生Leader变更时,新的Leader副本会查询这部分缓存,取出对应的Leader Epoch的起始位移,以避免数据丢失和不一致的情况。
    • OffsetsForLeaderEpochRequest请求:副本重启后,会读取checkpoint文件获取当前的Epoch版本,并向Leader副本请求下一版本的起始Offset。若当前Leader副本的Epoch版本与该副本相同,则返回当前LEO值。若相差版本较多,恢复的副本会一步步把自己和Leader之间缺少的版本信息和数据补齐。

    问题解决

    数据丢失

    • 场景和之前大致类似,只不过副本B重启后,从checkpoint文件中获取Leader Epoch,得知自己的Epoch是0,需要向A发送OffsetsForLeaderEpochRequest请求由于A的版本也是0,返回当前A的LEO值,该值为2。当获知到Leader LEO=2后,B发现该LEO值与自己的LEO值相同,因此B无需执行日志截断操作。这是对高水位机制的一个明显改进,即副本是否执行日志截断不再依赖于高水位进行判断。
    • 现在,副本A宕机,B成为Leader。A重启后,执行与B相同的逻辑判断,发现也不用执行日志截断,至此位移值为1的那条消息在两个副本中均得到保留。后面当生产者向B写入新消息时,副本B所在的Broker缓存中,会生成新的Leader Epoch条目:[Epoch=1, Offset=2]。之后,副本B会使用这个条目帮助判断后续是否执行日志截断操作。

    数据不一致

  • 相关阅读:
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    Security and Cryptography in Python
    《EffectiveJava中文第二版》 高清PDF下载
    《MoreEffectiveC++中文版》 pdf 下载
    《啊哈c语言》 高清 PDF 下载
  • 原文地址:https://www.cnblogs.com/d-e-v-i-l/p/12143532.html
Copyright © 2011-2022 走看看