zoukankan      html  css  js  c++  java
  • 【原创】kafka server源代码分析(二)

    十四、AbstractFetcherManager.scala
    该scala定义了两个case类和一个抽象类。两个case类很简单:
    1. BrokerAndFectherId:封装了一个broker和一个fetcher的数据结构
    2. BrokerAndInitialOffset:封装了broker和初始位移的一个数据结构
    该scala中最核心的还是那个抽象类:AbstractFetcherManager。它维护了一个获取线程的map,主要保存broker id + fetcher id对应的线程信息。其定义的方法如下:
    1. getFetcherId:给定topic和分区号计算fetcher id,主要就是根据计算哈希值与由属性num.consumer.fetchers或num.replica.fetchers指定的线程数(默认是1,也就是默认使用一个线程获取数据)求模
    2. createFetcherThread:抽象方法,需要子类实现。就是创建一个Fetcher线程
    3. addFetcherForPartitions:为给定的多个分区增加fetcher线程并更新到对应的fetcher映射集合中
    4. removeFetcherForPartitions:把给定的分区集合对应的fetch移除掉
    5. shutdownIdelFetcherThreads:关闭那些空闲的fetcher线程。具体做法就是如果某个fecther对应的分区数是0,直接关闭该fetcher,并把其从fetcher线程map中移除
    6. closeAllFetchers:关闭所有fetcher线程,同时清空fetcher线程map
    十五、FetchDataInfo.scala
    封装了日志位移元数据信息和消息集合的一个数据结构
    十六、RequestPurgatory.scala
        单看名字看不出这个类是做什么的。Purgatory:炼狱?暂时受苦的地方?目前还看不出这个是什么意思。不过该scala定义了两个类(包括一个抽象类)都是很多类会继承的。首先来说说DelayedRequest类。DelayedRequest定义了一个延时处理请求,延迟的时间单位是ms,在构造函数中需要传入这个值。构造函数还要求传入一组key集合来触发一些action,比如检查该请求是否已经满足某些条件等。这些key可以是一对(topic, 分区)。
        该scala定义的第二个类是一个抽象类:RequestPurgatory——它是一个帮助类用于处理带超时的异步请求。上面说到的DelayedRequest是一个延时请求并且定义了一组key能够触发某种操作。在具体实现的时候可以添加自定义的逻辑来定义对于一个给定的请求而已满足条件表示什么含义。比如说,可能是等待用户指定的acks数或者是等待积累了指定数目的字节数。DelayedRequest的key通常都是一个(topic, 分区)对。
        该类定义了一个嵌套类:Watchers——一个DelayedRequest的链表来监听每个请求是否已满足条件(satisfied)。该类定义的方法有:
    1. watched:按返回监听列表的个数
    2. addIfNotSatisfied:如果给定的请求未satisfied,则将其加入到监听列表中
    3. purgeSatisfield:遍历整个列表删除satisfied的请求
    4. collectSatisfiedRequests:遍历整个列表尝试为每个被监听的请求检查是否满足条件,主要使用checkSatisfied方法
    还有一个嵌套类:ExpiredRequestReaper,是一个Runnable对象,会将那些已经很长时间没能满足条件的请求置于过期状态。具体做法就是在内部维护了一个延时队列DelayQueue,提供了一些方法可以来操作此线程:
    1. enqueue:将请求T入队
    2. delayed:获取当前delayed的请求数
    3. pollExpired:获取下一条已过期的请求
    4. purgeSatisfied:从延时队列和监控列表中删除所有已满足条件的请求
        回到RequestPurgatory类, 该类维护了一个请求列表,每个key都对应了一个Watchers。另外该类还定义了一个过期请求处理器和一个专门的线程来执行这个过期请求处理器中指定的工作,并在构造函数时启动该线程。该类还提供了一些方法:
    1. isSatisfiedByMe:判断这个请求是否是由调用者线程满足的条件——即是否由调用者线程设置satisfied为true
    2. checkAndMaybeWatch:尝试添加请求以监听所有列表中的key。如果该请求本身已经satisfied了且是由调用者线程做出的则返回true
    3. update:更新watchers并返回刚刚satisfied请求的列表
    4. watched:返回watch列表的个数
    5. delayed:返回在过期请求处理器队列中的请求数
    6. watchersFor:根据给定的watch key返回对应的watch列表
    7. checkSatisfied:检查这个请求是否已经satisfied
    8. expire:处理过期请求
    9. shutdown:关闭过期请求处理器
    十七、DelayedFetch.scala
    延迟获取请求,出现以下情况时该请求才不会被阻塞:
    • 这个请求要获取的目标分区的leader已不再是这个broker了——这种情况下应该返回其他分区的可用数据
    • broker中无请求中有些目标分区的记录——这种情况下应该返回其他分区的可用数据
    • 位移信息没有保存在日志的最后一个日志段中——这种情况下应该返回日志段中所有的数据
    • 从所有目标分区中读取的字节数已超过最小字节数——这种情况下应该返回可用的数据
    该类定义的方法有:
    1. isSatisfied:判断目标请求是否没有被阻塞。从代码中来看貌似永远返回true,除非上述前三个条件都没满足。
    2. respond:创建新的FetchResponse返回
    十八、DelayedProduce.scala
    一个延迟produce请求,继承了DelayedRequest。该请求不会被阻塞,如果对于它要生产消息的每个分区:
    • 该broker都不是leader,那么就不会阻塞——应该返回一个错误
    • 如果这个broker是leader:
      • 如果有一个本地错误(比如在写本地log时),那么该请求也不会被阻塞——而是返回一个错误
      • 否则,至少应该匹配requiredAcks个副本
    该类还定义了一个case类:DelayedProduceResponseStatus封装了ProduceResponse的状态,包括offset和错误码。该类维护了一个volatile的状态标识位acksPending
    在该类的构造函数中,会根据错误码来更新acks pending值。该类也定义了两个方法:respond和isSatisfied:
    1. respond:如果response中无错误的话,将位移信息加入到位移管理器的缓存中,然后构建一个response返回
    2. isSatisfied:检查每个分区是否仍然有未响应的应答。如果没有则置为unblocked状态
    十九、FetchRequestPurgatory.scala
    保存延迟Fetch请求的临时场所。该类定义了一个嵌套类DelayedFetchRequestMetrics和recordDelayedFetchExpired都是关于更新统计信息的。提供的其他方法还有:
    1. checkSatisfied:检查一个给定的DelayedFetch请求是否是非阻塞状态
    2. respond:把FetchRequest请求对应的响应发回到Socket server
    3. expire:当一个请求FetchRequest过期时,把它原样发回去
    二十、KafkaRequestHandler.scala
    一个线程来响应Kafka请求。该scala定义的类包括:
    1. BrokerTopicMetrics:主要定义了一些统计度量元,比如每秒消息数、每秒入站请求字节数、每秒出站请求字节数、每秒被拒绝请求字节数等
    2. BrokerTopicStats:获取统计度量元的类
    3. KafkaRequestHandlePool:一个线程池,线程个数由属性num.io.threads指定——该属性指定了服务器使用多少个IO线程来执行请求。该线程池会创建出这些线程并依次启动它们。
    4. KafkaRequestHandler:处理请求的线程类。核心的run方法逻辑如下:
    • 不断地循环直至获取到一个kafka的请求
    • 如果是关闭命令的请求则直接退出,否则交给KafkaApis的handle方法用来处理请求
    二十一、ReplicaFetcherThread.scala
    副本获取器线程,主要定义了以下几个方法:
    1. handlePartitionsWithErrors:处理有错误(leader已经发生编程)的分区,当前什么都不做因为controller会应对这些变更
    2. handleOffsetOutOfRange:处理一个位移越界的分区返回新的获取位移值(fetch offset),具体逻辑如下:
    • 获取给定topic分区在该broker上的副本
    • 获取该分区leader的结束位移值,如果leader的结束位移值比该副本的结束位移还小的话,先判断一下是否启用了unclean leader选举。若没有启用,直接报错;否则就将follower副本的位移直接截取成leader的结束位移
    • 若follower位移比leader的还小,直接截取所有位移并设置leader的初始位移处开始读取leader
    如果启用了unclean leader选举,那么就有可能出现这样的情景:一个follower宕机了,而同时leader还在不停地写入消息。当这个follower重启回来的时候它需要完整地追上leader的进度。就在这个过程中,ISR中所有的副本都宕掉了。那么此时这个follower就会被unclean leader选举为新的leader,然后它开始写入从客户端发来的消息。之后旧的leader恢复,成为了一个follower,它会发现当前leader的最新位移居然比自己的还要小。这种情况下,只能截断自己的位移使之与当前leader的最新位移保持一致然后继续处理。
    3. processPartitionData:处理获取到的数据。主要逻辑就是将给定的response数据解析出来并更新到该broker上的副本对象中,比如获取到的消息集合以及更新高水位值
    二十二、ReplicaFetcherManager.scala
    副本获取数据管理器,主要就是通过创建获取器线程来实现数据的获取
    二十三、ReplicaManager.scala
        Kafka的副本管理器。这个文件中定义了一个object、一个case类和一个ReplicaManager类。其中ReplicaManager object比较简单,就是定义了一个高水位文件名:replication-offset-checkpoint。后面在遍历log.dirs下的每个目录时都会创建以它为名称的一个文件。而case类PartitionDataAndOffset就是封装了Fetch请求响应数据以及位移元数据的一个数据结构。
        下面说下最重要的类:ReplicaManager——这是kafka定义的副本管理器。它接收一个Kafka配置类、以及一个调度器用于执行一些定时任务。该类定义的字段有:
    1. controllerEpoch:一个volatile变量, 用于标识controller最近一次变更leader的epoch值
    2. localBrokerId:本地broker id
    3. allPartitions:所有的topic的分区信息
    4. replicaFetcherManager:副本请求leader数据的管理器实例,副本使用该类实例从leader请求数据与leader保持一致
    5. highWatermarkCheckpoints:为log.dirs下每个目录都创建一个高水位文件保存位移检查点
    6. producerRequestPurgatory/fetchRequestPurgatory:分别保存延时的producer请求和fetch请求
    该类定义的方法有:
    1. getLeaderPartitions:获取leader分区——即那些leader在该broker上的分区
    2. underReplicatedPartitionCount:副本数量不足的分区数量
    3. checkpointHighWatermarks:写入所有分区的高水位值到各自的高水位文件中。
    4. shutdown:关闭副本管理器——即关闭副本获取器管理器然后更新各分区的高水文件
    5. startHighWaterMarksCheckPointThread:开启高水位检查点线程,并创建一个定期任务,定期地写入高水位检查点文件,默认间隔是5秒。
    6. initWithRequestPurgatory:初始化副本管理器的ProducerPurgatory和FetchPurgatory两个类实例
    7. unblockDelayedFetchRequests:为给定的分区解禁延时Fetch请求,并发送回response
    8. maybeShrinkIsr:重新计算分区的ISR已查看是否需要从ISR中移除一些副本。主要标准就是判断是否一个follower是否在replica.lag.time.max.ms时间内没有发送Fetch请求或如果一个follower比leader相差的消息数已经超过了replica.lag.max.messages值。
    8. startup:开启ISR过期线程,定期执行maybeShrinkIsr任务,默认是10s
    9. getPartition:从副本管理器的allPartitions缓存中获取给定分区id的分区对象
    10. stopReplica:停止给定分区在本broker上的副本。如果还需要删除分区的话,那么就清除底层的log文件。具体逻辑如下:
    • 获取给定分区号对应的分区对象
    • 如果存在的话,判断一下是否需要删除分区,如果是就将其从副本管理器的缓存中移除并删除本地日志
    • 如果不存在的话,但还需要删除分区——这种情况可能是因为topic正在被删除但该topic宕掉了且正在恢复中——这种情况下就删除该分区的日志。
    11. stopReplicas:解析传入的StopReplica请求,将其中所包含的所有副本都停止掉。具体逻辑如下:
    • 获取给定请求的controller_epoch值,如果发现比当前的epoch值小的话,那么打出一条警告;否则
    • 首先要停止这些分区副本再向leader发出fetch请求获取数据,然后调用stopReplica方法停止对应的副本
    12. getOrCreatePartition:获取给定的分区对象,如果不存在直接创建一个新的
    13. getReplicaOrException:获取给定分区在该broker上的副本,如果不存在直接抛出异常
    14. getLeaderReplicaIfLocal:获取给定分区在该broker上的leader副本,如果不存在直接抛出异常
    15. readMessageSet:从给定的topic分区中指定的offset处读取指定大小的字节。具体逻辑如下:
    • 首先检查一下当前的broker是否是给定分区的leader
    • 如果是目标副本是follower的话,初始化最大位移值为空,否则初始化为本地副本的高水位值
    • 如果有本地log的话读取本地log,否则记录错误然后创建一个空的分区数据信息
    • 最后返回分区数据信息与对应的高水位值
    16. readMessageSets:读取FetchRequest请求并返回请求中分区对应的分区信息映射记录。具体逻辑如下:
    • 首先判断一下这个请求是否是从某个follower发过来的
    • 然后解析请求中的数据信息,对应每一个分区及其对应的获取到的分区信息(位移+获取大小)而言,读取获取到的分区数据及对应的高水位值,之后创建对应的response分区数据与对应的位移的映射记录
    • 最后遍历完整个请求后返回映射记录集合
    17. maybeUpdateMetadataCache:更新元数据缓存,即更新本地broker上的元数据信息
    18. becomeLeaderOrFollower:处理给定的LeaderAndIsrRequest请求。具体逻辑如下:
    • 如果给定的LeaderAndIsrRequest请求中的controller_epoch值比当前的controller_epoch值小,返回一个空的response表示该请求中包含了过期的epoch
    • 否则先检查一下分区leader的epoch。做法就是遍历请求中的分区状态map,对于每一个topic分区及其对应的分区状态信息而言,获取分区当前的leader的epoch
    • 如果它比请求中leader的epoch小,判断一下该分区的所有副本中是否包含该broker——如果是,加入到分区状态缓存中
    • 如果epoch比请求中该分区的leader的epoch大,记录下这个错误
    • 分别找出那些需要成为leader和followerDefaultEncoder分区,然后分别调用makeLeaders和makeFollowers来处理它们。
    • 开启一个高水位检查点线程以确保所有分区都已经处理完毕
    • 关闭空闲线程,返回对应的response
    19. makeLeaders:使当前broker成为给定分区的leader。具体做法就是:
    • 停止这些分区的fetcher
    • 更新缓存中的分区元数据信息
    • 将这些分区增加到leader分区集合中
    如果该方法执行期间抛出错误,该错误会被传递给KafkaApis设置错误消息。该方法具体逻辑如下:
    • 在response中增加所有分区的记录
    • 停止给定所有分区的fetch
    更新分区leader信息
    20. makeFollowers:让当前broker成为目标分区的follower,具体做法就是:
    • 将这些分区从leader分区集合中移除
    • 标记副本为Follower,这样producer不会增加更多的数据
    • 停止这些分区的fetch,这样replica fetch线程就不能增加更多的数据
    • 截取这些分区的日志和检查点位移
    • 如果broker没有关闭,增加fetcher到新的leader中
    按照这些顺序执行操作可以保证事务中的副本将不再接受任何检查点位移之前的消息,这样检查点之前的所有消息都能确保被写入磁盘。
    21. updateReplicaLEOAndPartitionHW:更新leader的高水位位移值
    二十四、ProducerRequestPurgatory.scala
    包含延时producer请求。内定义了一个嵌套类用于保存一些度量元信息,另外还定义了一些方法:
    1. recordDelayedProducerKeyExpired:记录延迟Producer过期
    2. checkSatisfied:检查一个特定的延时Fetch请求是否未被阻塞
    3. expire:将延迟Producer请求过期
    4. respond:将response发回
    二十五、KafkaServer.scala
    管理一个Kafka broker的生命周期。处理所有的功能,包括启动和关闭一个Kafka节点。这是最核心的类。它提供的方法有:
    1. startup:开启一个Kafka server,初始化LogManager, SocketServer和请求处理器。具体逻辑如下:
    • 设置broker状态为STARTING状态,开启Kafka调度器,设置zookeeper,并开启日志管理器
    • 创建broker上的SocketServer并启动之
    • 创建副本管理器,并创建位移管理器
    • 创建KafkaController
    • 开始处理请求并设置broker状态为RUNNINGASBROKER
    • 开启副本管理器,开启controller
    • 创建topic配置管理器并启动之
    • 告诉外界新的broker已经启动
    2. initZK:初始化Zookeeper,主要是创建一个ZkClient供以后使用
    3. registerStats:注册动态的JMX bean
    4. controlledShutdown:执行托管关闭操作,具体逻辑如下:
    • 能够执行这个方法的前提是broker已完整启动并且开启了由属性controlled.shutdown.enable指定的开关——开启这个开关的话,broker会将所有位于其上的leader都搬到其他的broker上然后再关闭。这可以减少不可用时间窗口
    • 首先将broker状态设置为PendingControlledShutdown
    • 获取当前的controller并连接它
    • 发起controller的托管关闭操作,如果没有关闭等待controlled.shutdown.retry.backoff.ms设置的时间再试,总共试controlled.shutdown.max.metries
    5. shutdown:关闭Kafka server,包括关闭LogManager,SocketServer和log cleaner调度器线程。具体逻辑如下:
    • 判断下是否正处于关闭状态,如果是,
    • 调用controlledShutdown关闭controller,设置broker状态为BrokerShuttingDown
    • 关闭Socket Server、关闭请求处理器、位移管理器、调度器、api、副本管理器、日志管理器、zk客户端
    • 设置broker状态为NotRunning
    • 设置相应标识位
    6. createLogManager:创建一个日志管理器
    7. createOffsetManager:创建一个位移管理器
    二十六、KafkaApis.scala
    封装了处理各种Kafka请求的逻辑及功能,提供了一个顶层的handle方法用于处理所有种类的请求,包括:
    • ProducerOrOffsetCommitRequest
    • FetchRequest
    • OffsetRequest
    • MetadataRequest
    • LeaderAndIsrRequest
    • StopReplicaRequest
    • UpdateMetadataRequest
    • ControlledShutdownRequest
    • OffsetCommitRequst
    • OffsetFetchRequest
    • ConsumerMetadataRequest
    下面一个一个说:
    1. appendToLocalLog:处理producer请求时的helper方法,将消息集合追加写入本地log中,然后创建一个ProduceRequest实例返回——ProducerRequest是一个case类,表示producer请求处理的结果,包含topic分区信息,起始位移和结束位移以及相应的错误码
    2. handleProducerOrOffsetCommitRequest:处理producer请求以及offset commit请求——后者也是一种特殊的producer请求。前面说到过,producer请求就是将消息发送给kafka server。为了效率上的考虑,producer可以封装多个topic分区的消息集合到一个请求中。具体逻辑如下:
    • 首先需要判断一下这是普通的producer请求还是一个offset commit请求
    • 确保request.required.acks的值在-1, 0, 1中取值
    • 调用appendToLocalLog将消息集合写入本地日志并返回produce结果
    • 查看结果中的错误码以及计算有错误的分区数
    • 如果request.required.acks = 0,则不需要任何操作(如果有异常的话还是要处理的,但因为producer不需要期望接收任何response,因此处理器需发送一个关闭连接的response给socket server以关闭底层的socket,这样producer客户端代码就知道发生了异常并会刷新它的元数据信息)——即如果存在错误的分区,发送closeConnecttion关闭底层socket连接。否则,判断一下错误码,如果没有错误且是offset commit请求,直接将consumer group对应的offset元数据信息增加到缓存中并创建response发送给client。如果不是offset commit请求,就什么都不做(因为request.required.acks = 0)
    • 如果request.required.acks = 1或者请求中的目标分区为空再或者目标分区都有错误的话,检查错误码,如果没错且是offset commit请求,还是更新位移管理器中的位移缓存然后创建并发送对应的response
    • 如果request.required.acks = 2,创建一组(topic, 分区)对作为key创建一个延迟Producer请求,检查该请求是否被阻塞,如果不是直接发送response
    • 最后清空该请求
    3. handleFetchRequest:处理FetchRequest。具体逻辑如下:
    • 从FetchRequest请求中读取目标分区数据
    • 如果该请求来自于follower副本,更新其对应的日志和最新位移
    • 检查这个请求是否已满足条件可以立即执行
    • 满足以下条件之一时就会立即发送response
      • 请求需要立即执行
      • 请求本身并未获取任何数据
      • 已累积了足够多的数据以供响应
      • 读取数据时候出现错误
    • 否则的话将该请求暂存于对应的purgatory,以供后面延时发送
    4. handleOffsetRequest:处理offset请求,获取一组分区的合法位移范围。具体逻辑如下:
    • 遍历请求中每一个topic分区,确保其leader副本的存在性
    • 获取位移,加入到responseMap
    • 遍历完之后使用responseMap创建一个OffsetResponse并发送回client
    5. handleTopicMetadataRequest:处理topic元数据请求,逻辑也很简单就是先获取topic的元数据信息——值得注意的是,如果请求的topic不存在或请求的是offset commit的topic,那么就会创建出对应的topic来。之后获取所有当前可用的broker,然后创建对应的response并发送
    6. handleLeaderAndIsrRequest:处理LeaderAndIsr请求。方法中并不检查topic是否存在,因为controller发送这个请求的时候会告诉所有的broker,这样这些broker就不会再处理被删除topic的数据了。
    7. handleStopReplicaRequest:发送停止副本的请求,并构造对应的response返回。同时还要关闭空闲线程
    8. handleUpdateMetadataRequest:处理更新元数据请求,调用副本管理器的maybyUpdateMetadataCache方法并创建对应的response返回
    9. handleControlledShutdownRequest:处理受控关闭的请求。首先要关闭broker,然后创建对应的response返回
    10. handleOffsetCommitRequest:负责位移提交的请求。首先查看请求中的版本号,如果是0,那么需要在zookeeper中/consumers/groupID/offsets&owners保存位移,否则作为一个特殊的kafka topic消息调用handleProducerOrOffsetCommitRequest保存在kafka中
    11. handleOffsetFetchRequest:处理获取位移的请求。具体逻辑如下:
    • 如果请求中版本号是0,那么从zookeeper对应目录中读取位移
    • 否则从Kafka特定topic中读取位移
    • 创建对应的response返回
    12. handleConsumerMetadataRequest:请求consumer的元数据信息。具体逻辑就是找出请求中consumer group经过哈希之后的offset commit topic分区,然后获取这个分区的元数据信息,之后创建对应的reponse返回。
  • 相关阅读:
    redis和memcached的对比
    Linux根目录下各目录含义
    阿里云常用服务介绍
    Nginx的负载均衡算法、lvs的负载均衡算法
    MySQL高可用架构之MHA
    SQL server 2012 各个版本比较
    500 ZuulException: Forwarding error
    Hibernate --主键生成策略
    springboot配置activeMQ传输object类型的消息时:classnotfound问题
    JMS简介与API
  • 原文地址:https://www.cnblogs.com/huxi2b/p/4545613.html
Copyright © 2011-2022 走看看