概述
日志段及其相关代码是 Kafka 服务器源码中最为重要的组件代码之一。你可能会非常关心,在 Kafka 中,消息是如何被保存和组织在一起的。毕竟,不管是学习任何消息引擎,弄明白消息建模方式都是首要的问题。因此,你非常有必要学习日志段这个重要的子模块的源码实现。今天,我会带你详细看下日志段部分的源码。不过在此之前,你需要先了解一下 Kafka 的日志结构日志是 Kafka 服务器端代码的重要组件之一,很多其他的核心组件都是以日志为基础的,比如后面要讲到的状态管理机和副本管理器等。
总的来说,Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)。当然,如果你没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的。图中的一串数字 0 是该日志段的起始位移值(Base Offset),也就是该日志段中所存的第一条消息的位移值。
日志段核心代码
日志段源码位于 Kafka 的 core 工程下,具体文件位置是 core/src/main/scala/kafka/log/LogSegment.scala。实际上,所有日志结构部分的源码都在 core 的 kafka.log 包下。该文件下定义了三个 Scala 对象:LogSegment class;LogSegment object;LogFlushStats object。LogFlushStats 结尾有个 Stats,它是做统计用的,主要负责为日志落盘进行计时。每个日志段由两个核心组件构成:日志和索引。当然,这里的索引泛指广义的索引文件。另外,这段注释还给出了一个重要的事实:每个日志段都有一个起始位移值(Base Offset),而该位移值是此日志段所有消息中最小的位移值,同时,该值却又比前面任何日志段中消息的位移值都大。
下面,我分批次给出比较关键的代码片段,并对其进行解释。首先,我们看下 LogSegment 的定义:
1 class LogSegment private[log] (val log: FileRecords, 2 val lazyOffsetIndex: LazyIndex[OffsetIndex], 3 val lazyTimeIndex: LazyIndex[TimeIndex], 4 val txnIndex: TransactionIndex, 5 val baseOffset: Long, 6 val indexIntervalBytes: Int, 7 val rollJitterMs: Long, 8 val time: Time) extends Logging { … }
就像我前面说的,一个日志段包含消息日志文件、位移索引文件、时间戳索引文件、已中止事务索引文件等。这里的 FileRecords 就是实际保存 Kafka 消息的对象。专栏后面我将专门讨论 Kafka 是如何保存具体消息的,也就是 FileRecords 及其家族的实现方式。同时,我还会给你介绍一下社区在持久化消息这块是怎么演进的,你一定不要错过那部分的内容。下面的 lazyOffsetIndex、lazyTimeIndex 和 txnIndex 分别对应于刚才所说的 3 个索引文件。不过,在实现方式上,前两种使用了延迟初始化的原理,降低了初始化时间成本。后面我们在谈到索引的时候再详细说。
每个日志段对象保存自己的起始位移 baseOffset——这是非常重要的属性!事实上,你在磁盘上看到的文件名就是 baseOffset 的值。每个 LogSegment 对象实例一旦被创建,它的起始位移就是固定的了,不能再被更改。
对于一个日志段而言,最重要的方法就是写入消息和读取消息了,它们分别对应着源码中的 append 方法和 read 方法。另外,recover 方法同样很关键,它是 Broker 重启后恢复日志段的操作逻辑。
append 方法
我们先来看 append 方法,了解下写入消息的具体操作。append 方法接收 4 个参数,分别表示待写入消息批次中消息的最大位移值、最大时间戳、最大时间戳对应消息的位移以及真正要写入的消息集合。下面这张图展示了 append 方法的完整执行流程:
第一步:在源码中,首先调用 log.sizeInBytes 方法判断该日志段是否为空,如果是空的话, Kafka 需要记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据。
第二步:代码调用 ensureOffsetInRange 方法确保输入参数最大位移值是合法的。那怎么判断是不是合法呢?标准就是看它与日志段起始位移的差值是否在整数范围内,即 largestOffset - baseOffset 的值是不是介于 [0,Int.MAXVALUE] 之间。在极个别的情况下,这个差值可能会越界,这时,append 方法就会抛出异常,阻止后续的消息写入。一旦你碰到这个问题,你需要做的是升级你的 Kafka 版本,因为这是由已知的 Bug 导致的。
第三步:待这些做完之后,append 方法调用 FileRecords 的 append 方法执行真正的写入。前面说过了,专栏后面我们会详细介绍 FileRecords 类。这里你只需要知道它的工作是将内存中的消息对象写入到操作系统的页缓存就可以了。
第四步:再下一步,就是更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性。每个日志段都要保存当前最大时间戳信息和所属消息的位移信息。还记得 Broker 端提供定期删除日志的功能吗?比如我只想保留最近 7 天的日志,没错,当前最大时间戳这个值就是判断的依据;而最大时间戳对应的消息的位移值则用于时间戳索引项。虽然后面我会详细介绍,这里我还是稍微提一下:时间戳索引项保存时间戳与消息位移的对应关系。在这步操作中,Kafka 会更新并保存这组对应关系。
第五步:append 方法的最后一步就是更新索引项和写入的字节数了。我在前面说过,日志段每写入 4KB 数据就要写入一个索引项。当已写入字节数超过了 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数,以备下次重新累积计算。
read 方法
好了,append 方法我就解释完了。下面我们来看 read 方法,了解下读取日志段的具体操作。
read 方法接收 4 个输入参数。startOffset:要读取的第一条消息的位移;maxSize:能读取的最大字节数;maxPosition :能读到的最大文件位置;minOneMessage:是否允许在消息体过大时至少返回第一条消息。前 3 个参数的含义很好理解,我重点说下第 4 个。当这个参数为 true 时,即使出现消息体字节数超过了 maxSize 的情形,read 方法依然能返回至少一条消息。引入这个参数主要是为了确保不出现消费饿死的情况。
逻辑很简单,我们一步步来看下。第一步是调用 translateOffset 方法定位要读取的起始文件位置 (startPosition)。输入参数 startOffset 仅仅是位移值,Kafka 需要根据索引信息找到对应的物理文件位置才能开始读取消息。待确定了读取起始位置,日志段代码需要根据这部分信息以及 maxSize 和 maxPosition 参数共同计算要读取的总字节数。举个例子,假设 maxSize=100,maxPosition=300,startPosition=250,那么 read 方法只能读取 50 字节,因为 maxPosition - startPosition = 50。我们把它和 maxSize 参数相比较,其中的最小值就是最终能够读取的总字节数。最后一步是调用 FileRecords 的 slice 方法,从指定位置读取指定大小的消息集合。
recover 方法
除了 append 和 read 方法,LogSegment 还有一个重要的方法需要我们关注,它就是 recover 方法,用于恢复日志段。下面的代码是 recover 方法源码。什么是恢复日志段呢?其实就是说, Broker 在启动时会从磁盘上加载所有日志段信息到内存中,并创建相应的 LogSegment 对象实例。在这个过程中,它需要执行一系列的操作。
recover 开始时,代码依次调用索引对象的 reset 方法清空所有的索引文件,之后会开始遍历日志段中的所有消息集合或消息批次(RecordBatch)。对于读取到的每个消息集合,日志段必须要确保它们是合法的,这主要体现在两个方面:该集合中的消息必须要符合 Kafka 定义的二进制格式;该集合中最后一条消息的位移值不能越界,即它与日志段起始位移的差值必须是一个正整数值。
校验完消息集合之后,代码会更新遍历过程中观测到的最大时间戳以及所属消息的位移值。同样,这两个数据用于后续构建索引项。再之后就是不断累加当前已读取的消息字节数,并根据该值有条件地写入索引项。最后是更新事务型 Producer 的状态以及 Leader Epoch 缓存。不过,这两个并不是理解 Kafka 日志结构所必需的组件,因此,我们可以忽略它们。遍历执行完成后,Kafka 会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段写入了一些非法消息,需要执行截断操作,将日志段大小调整回合法的数值。同时, Kafka 还必须相应地调整索引文件的大小。把这些都做完之后,日志段恢复的操作也就宣告结束了。
日志Log基础知识
你可以认为,日志是日志段的容器,里面定义了很多管理日志段的操作。坦率地说,如果看 Kafka 源码却不看 Log,就跟你买了这门课却不知道作者是谁一样。在我看来,Log 对象是 Kafka 源码(特别是 Broker 端)最核心的部分,没有之一。
Log 源码结构
Log 源码位于 Kafka core 工程的 log 源码包下,文件名是 Log.scala。总体上,该文件定义了 10 个类和对象,如下图所示:
图中括号里的 C 表示 Class,O 表示 Object。还记得我在上节课提到过的伴生对象吗?没错,同时定义同名的 Class 和 Object,就属于 Scala 中的伴生对象用法。我们先来看伴生对象,也就是 LogAppendInfo、Log 和 RollParams。
LogAppendInfo
LogAppendInfo(C):保存了一组待写入消息的各种元数据信息。比如,这组消息中第一条消息的位移值是多少、最后一条消息的位移值是多少;再比如,这组消息中最大的消息时间戳又是多少。总之,这里面的数据非常丰富(下节课我再具体说说)。LogAppendInfo(O): 可以理解为其对应伴生类的工厂方法类,里面定义了一些工厂方法,用于创建特定的 LogAppendInfo 实例。
Log
Log(C): Log 源码中最核心的代码。这里我先卖个关子,一会儿细聊。Log(O):同理,Log 伴生类的工厂方法,定义了很多常量以及一些辅助方法。
RollParams
RollParams(C):定义用于控制日志段是否切分(Roll)的数据结构。
RollParams(O):同理,RollParams 伴生类的工厂方法。
除了这 3 组伴生对象之外,还有 4 类源码。LogMetricNames:定义了 Log 对象的监控指标。LogOffsetSnapshot:封装分区所有位移元数据的容器类。LogReadInfo:封装读取日志返回的数据及其元数据。CompletedTxn:记录已完成事务的元数据,主要用于构建事务索引。
Log Class & Object
下面,我会按照这些类和对象的重要程度,对它们一一进行拆解。首先,咱们先说说 Log 类及其伴生对象。考虑到伴生对象多用于保存静态变量和静态方法(比如静态工厂方法等),因此我们先看伴生对象(即 Log Object)的实现。
1 object Log { 2 val LogFileSuffix = ".log" 3 val IndexFileSuffix = ".index" 4 val TimeIndexFileSuffix = ".timeindex" 5 val ProducerSnapshotFileSuffix = ".snapshot" 6 val TxnIndexFileSuffix = ".txnindex" 7 val DeletedFileSuffix = ".deleted" 8 val CleanedFileSuffix = ".cleaned" 9 val SwapFileSuffix = ".swap" 10 val CleanShutdownFile = ".kafka_cleanshutdown" 11 val DeleteDirSuffix = "-delete" 12 val FutureDirSuffix = "-future" 13 …… 14 }
这是 Log Object 定义的所有常量。如果有面试官问你 Kafka 中定义了多少种文件类型,你可以自豪地把这些说出来。耳熟能详的.log、.index、.timeindex 和.txnindex 我就不解释了,我们来了解下其他几种文件类型。
.snapshot 是 Kafka 为幂等型或事务型 Producer 所做的快照文件。鉴于我们现在还处于阅读源码的初级阶段,事务或幂等部分的源码我就不详细展开讲了。
.deleted 是删除日志段操作创建的文件。目前删除日志段文件是异步操作,Broker 端把日志段文件从.log 后缀修改为.deleted 后缀。如果你看到一大堆.deleted 后缀的文件名,别慌,这是 Kafka 在执行日志段文件删除。
.cleaned 和.swap 都是 Compaction 操作的产物,等我们讲到 Cleaner 的时候再说。
-delete 则是应用于文件夹的。当你删除一个主题的时候,主题的分区文件夹会被加上这个后缀。
-future 是用于变更主题分区文件夹地址的,属于比较高阶的用法。
总之,记住这些常量吧。记住它们的主要作用是,以后不要被面试官唬住!开玩笑,其实这些常量最重要的地方就在于,它们能够让你了解 Kafka 定义的各种文件类型。Log Object 还定义了超多的工具类方法。由于它们都很简单,这里我只给出一个方法的源码,我们一起读一下。
1 def filenamePrefixFromOffset(offset: Long): String = { 2 val nf = NumberFormat.getInstance() 3 nf.setMinimumIntegerDigits(20) 4 nf.setMaximumFractionDigits(0) 5 nf.setGroupingUsed(false) 6 nf.format(offset) 7 }
这个方法的作用是通过给定的位移值计算出对应的日志段文件名。Kafka 日志文件固定是 20 位的长度,filenamePrefixFromOffset 方法就是用前面补 0 的方式,把给定位移值扩充成一个固定 20 位长度的字符串。
下面我们来看 Log 源码部分的重头戏:Log 类。这是一个 2000 多行的大类。放眼整个 Kafka 源码,像 Log 这么大的类也不多见,足见它的重要程度。我们先来看这个类的定义:
1 class Log(@volatile var dir: File, 2 @volatile var config: LogConfig, 3 @volatile var logStartOffset: Long, 4 @volatile var recoveryPoint: Long, 5 scheduler: Scheduler, 6 brokerTopicStats: BrokerTopicStats, 7 val time: Time, 8 val maxProducerIdExpirationMs: Int, 9 val producerIdExpirationCheckIntervalMs: Int, 10 val topicPartition: TopicPartition, 11 val producerStateManager: ProducerStateManager, 12 logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { 13 …… 14 }
看着好像有很多属性,但其实,你只需要记住两个属性的作用就够了:dir 和 logStartOffset。dir 就是这个日志所在的文件夹路径,也就是主题分区的路径。而 logStartOffset,表示日志的当前最早位移。dir 和 logStartOffset 都是 volatile var 类型,表示它们的值是变动的,而且可能被多个线程更新。你可能听过日志的当前末端位移,也就是 Log End Offset(LEO),它是表示日志下一条待插入消息的位移值,而这个 Log Start Offset 是跟它相反的,它表示日志当前对外可见的最早一条消息的位移值。我用一张图来标识它们的区别:
有意思的是,Log End Offset 可以简称为 LEO,但 Log Start Offset 却不能简称为 LSO。因为在 Kafka 中,LSO 特指 Log Stable Offset,属于 Kafka 事务的概念。
其实,除了 Log 类签名定义的这些属性之外,Log 类还定义了一些很重要的属性,比如下面这段代码:
1 @volatile private var nextOffsetMetadata: LogOffsetMetadata = _ 2 @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset) 3 private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] 4 @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
第一个属性 nextOffsetMetadata,它封装了下一条待插入消息的位移值,你基本上可以把这个属性和 LEO 等同起来。
第二个属性 highWatermarkMetadata,是分区日志高水位值。
第三个属性 segments,我认为这是 Log 类中最重要的属性。它保存了分区日志下所有的日志段信息,只不过是用 Map 的数据结构来保存的。Map 的 Key 值是日志段的起始位移值,Value 则是日志段对象本身。Kafka 源码使用 ConcurrentNavigableMap 数据结构来保存日志段对象,就可以很轻松地利用该类提供的线程安全和各种支持排序的方法,来管理所有日志段对象。
第四个属性是 Leader Epoch Cache 对象。Leader Epoch 是社区于 0.11.0.0 版本引入源码中的,主要是用来判断出现 Failure 时是否执行日志截断操作(Truncation)。之前靠高水位来判断的机制,可能会造成副本间数据不一致的情形。这里的 Leader Epoch Cache 是一个缓存类数据,里面保存了分区 Leader 的 Epoch 值与对应位移值的映射关系,我建议你查看下 LeaderEpochFileCache 类,深入地了解下它的实现原理.
LOG对象基础操作
我一般习惯把 Log 的常见操作分为 4 大部分。高水位管理操作:高水位的概念在 Kafka 中举足轻重,对它的管理,是 Log 最重要的功能之一。日志段管理:Log 是日志段的容器。高效组织与管理其下辖的所有日志段对象,是源码要解决的核心问题。关键位移值管理:日志定义了很多重要的位移值,比如 Log Start Offset 和 LEO 等。确保这些位移值的正确性,是构建消息引擎一致性的基础。读写操作:所谓的操作日志,大体上就是指读写日志。读写操作的作用之大,不言而喻。
高水位管理操作
源码中日志对象定义高水位的语句只有一行:
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
这行语句传达了两个重要的事实:高水位值是 volatile(易变型)的。因为多个线程可能同时读取它,因此需要设置成 volatile,保证内存可见性。另外,由于高水位值可能被多个线程同时修改,因此源码使用 Java Monitor 锁来确保并发修改的线程安全。高水位值的初始值是 Log Start Offset 值。每个 Log 对象都会维护一个 Log Start Offset 值。当首次构建高水位时,它会被赋值成 Log Start Offset 值。你可能会关心 LogOffsetMetadata 是什么对象。因为它比较重要,我们一起来看下这个类的定义:
1 case class LogOffsetMetadata(messageOffset: Long, 2 segmentBaseOffset: Long = Log.UnknownOffset, relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)
显然,它就是一个 POJO 类,里面保存了三个重要的变量。
messageOffset:消息位移值,这是最重要的信息。我们总说高水位值,其实指的就是这个变量的值。
segmentBaseOffset:保存该位移值所在日志段的起始位移。日志段起始位移值辅助计算两条消息在物理磁盘文件中位置的差值,即两条消息彼此隔了多少字节。这个计算有个前提条件,即两条消息必须处在同一个日志段对象上,不能跨日志段对象。否则它们就位于不同的物理文件上,计算这个值就没有意义了。这里的 segmentBaseOffset,就是用来判断两条消息是否处于同一个日志段的。
relativePositionSegment:保存该位移值所在日志段的物理磁盘位置。这个字段在计算两个位移值之间的物理磁盘位置差值时非常有用。你可以想一想,Kafka 什么时候需要计算位置之间的字节数呢?答案就是在读取日志的时候。假设每次读取时只能读 1MB 的数据,那么,源码肯定需要关心两个位移之间所有消息的总字节数是否超过了 1MB。
LogOffsetMetadata 类的所有方法,都是围绕这 3 个变量展开的工具辅助类方法,非常容易理解。我会给出一个方法的详细解释,剩下的你可以举一反三。
1 def onSameSegment(that: LogOffsetMetadata): Boolean = { 2 if (messageOffsetOnly) 3 throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") 4 5 this.segmentBaseOffset == that.segmentBaseOffset 6 }
看名字我们就知道了,这个方法就是用来判断给定的两个 LogOffsetMetadata 对象是否处于同一个日志段的。判断方法很简单,就是比较两个 LogOffsetMetadata 对象的 segmentBaseOffset 值是否相等。
获取和设置高水位值,关于获取高水位值的方法,其实很好理解,我就不多说了。设置高水位值的方法,也就是 Setter 方法更复杂一些,为了方便你理解,我用注释的方式来解析它的作用。
1 // getter method:读取高水位的位移值 2 def highWatermark: Long = highWatermarkMetadata.messageOffset 3 4 // setter method:设置高水位值 5 private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = { 6 if (newHighWatermark.messageOffset < 0) // 高水位值不能是负数 7 throw new IllegalArgumentException("High watermark offset should be non-negative") 8 9 lock synchronized { // 保护Log对象修改的Monitor锁 10 highWatermarkMetadata = newHighWatermark // 赋值新的高水位值 11 producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) // 处理事务状态管理器的高水位值更新逻辑,忽略它…… 12 maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事务机制的一部分,忽略它…… 13 } 14 trace(s"Setting high watermark $newHighWatermark") 15 }
更新高水位值,除此之外,源码还定义了两个更新高水位值的方法:updateHighWatermark 和 maybeIncrementHighWatermark。从名字上来看,前者是一定要更新高水位值的,而后者是可能会更新也可能不会。我们分别看下它们的实现原理。
其实,这两个方法有着不同的用途。updateHighWatermark 方法,主要用在 Follower 副本从 Leader 副本获取到消息后更新高水位值。一旦拿到新的消息,就必须要更新高水位值;而 maybeIncrementHighWatermark 方法,主要是用来更新 Leader 副本的高水位值。需要注意的是,Leader 副本高水位值的更新是有条件的——某些情况下会更新高水位值,某些情况下可能不会。就像我刚才说的,Follower 副本成功拉取 Leader 副本的消息后必须更新高水位值,但 Producer 端向 Leader 副本写入消息时,分区的高水位值就可能不需要更新——因为它可能需要等待其他 Follower 副本同步的进度。因此,源码中定义了两个更新的方法,它们分别应用于不同的场景。
读取高水位值关于高水位值管理的最后一个操作是 fetchHighWatermarkMetadata 方法。它不仅仅是获取高水位值,还要获取高水位的其他元数据信息,即日志段起始位移和物理位置信息。
日志段管理
前面我反复说过,日志是日志段的容器,那它究竟是如何承担起容器一职的呢?
1 private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
可以看到,源码使用 Java 的 ConcurrentSkipListMap 类来保存所有日志段对象。ConcurrentSkipListMap 有 2 个明显的优势。它是线程安全的,这样 Kafka 源码不需要自行确保日志段操作过程中的线程安全;它是键值(Key)可排序的 Map。Kafka 将每个日志段的起始位移值作为 Key,这样一来,我们就能够很方便地根据所有日志段的起始位移值对它们进行排序和比较,同时还能快速地找到与给定位移值相近的前后两个日志段。
关键位移值管理
Log 对象维护了一些关键位移值数据,比如 Log Start Offset、LEO 等。其实,高水位值也算是关键位移值,只不过它太重要了,所以,我单独把它拎出来作为独立的一部分来讲了。
Log 对象中的 LEO 永远指向下一条待插入消息,也就是说,LEO 值上面是没有消息的!源码中定义 LEO 的语句很简单:这里的 nextOffsetMetadata 就是我们所说的 LEO,它也是 LogOffsetMetadata 类型的对象。Log 对象初始化的时候,源码会加载所有日志段对象,并由此计算出当前 Log 的下一条消息位移值。之后,Log 对象将此位移值赋值给 LEO。
实际上,LEO 对象被更新的时机有 4 个。Log 对象初始化时:当 Log 对象初始化时,我们必须要创建一个 LEO 对象,并对其进行初始化。写入新消息时:这个最容易理解。以上面的图为例,当不断向 Log 对象插入新消息时,LEO 值就像一个指针一样,需要不停地向右移动,也就是不断地增加。Log 对象发生日志切分(Log Roll)时:日志切分是啥呢?其实就是创建一个全新的日志段对象,并且关闭当前写入的日志段对象。这通常发生在当前日志段对象已满的时候。一旦发生日志切分,说明 Log 对象切换了 Active Segment,那么,LEO 中的起始位移值和段大小数据都要被更新,因此,在进行这一步操作时,我们必须要更新 LEO 对象。日志截断(Log Truncation)时:这个也是显而易见的。日志中的部分消息被删除了,自然可能导致 LEO 值发生变化,从而要更新 LEO 对象。
现在,我们再来思考一下,Kafka 什么时候需要更新 Log Start Offset 呢?我们一一来看下。Log 对象初始化时:和 LEO 类似,Log 对象初始化时要给 Log Start Offset 赋值,一般是将第一个日志段的起始位移值赋值给它。日志截断时:同理,一旦日志中的部分消息被删除,可能会导致 Log Start Offset 发生变化,因此有必要更新该值。Follower 副本同步时:一旦 Leader 副本的 Log 对象的 Log Start Offset 值发生变化。为了维持和 Leader 副本的一致性,Follower 副本也需要尝试去更新该值。删除日志段时:这个和日志截断是类似的。凡是涉及消息删除的操作都有可能导致 Log Start Offset 值的变化。删除消息时:严格来说,这个更新时机有点本末倒置了。在 Kafka 中,删除消息就是通过抬高 Log Start Offset 值来实现的,因此,删除消息时必须要更新该值。
读写操作
最后,我重点说说针对 Log 对象的读写操作。
写操作
在 Log 中,涉及写操作的方法有 3 个:appendAsLeader、appendAsFollower 和 append。appendAsLeader 是用于写 Leader 副本的,appendAsFollower 是用于 Follower 副本同步的。它们的底层都调用了 append 方法。
Kafka 消息格式经历了两次大的变迁,目前是 0.11.0.0 版本引入的 Version 2 消息格式。我们没有必要详细了解这些格式的变迁,你只需要知道,在 0.11.0.0 版本之后,lastOffset 和 lastOffsetOfFirstBatch 都是指向消息集合的最后一条消息即可。它们的区别主要体现在 0.11.0.0 之前的版本。
读操作
read 方法的流程相对要简单一些,首先来看它的方法签名:
1 def read(startOffset: Long, 2 maxLength: Int, 3 isolation: FetchIsolation, 4 minOneMessage: Boolean): FetchDataInfo = { 5 ...... 6 }
它接收 4 个参数,含义如下:startOffset,即从 Log 对象的哪个位移值开始读消息。maxLength,即最多能读取多少字节。isolation,设置读取隔离级别,主要控制能够读取的最大位移值,多用于 Kafka 事务。minOneMessage,即是否允许至少读一条消息。设想如果消息很大,超过了 maxLength,正常情况下 read 方法永远不会返回任何消息。但如果设置了该参数为 true,read 方法就保证至少能够返回一条消息。read 方法的返回值是 FetchDataInfo 类,也是一个 POJO 类,里面最重要的数据就是读取的消息集合,其他数据还包括位移等元数据信息。
日志中的索引应用
在 Kafka 源码中,跟索引相关的源码文件有 5 个,它们都位于 core 包的 /src/main/scala/kafka/log 路径下。我们一一来看下。AbstractIndex.scala:它定义了最顶层的抽象类,这个类封装了所有索引类型的公共操作。LazyIndex.scala:它定义了 AbstractIndex 上的一个包装类,实现索引项延迟加载。这个类主要是为了提高性能。OffsetIndex.scala:定义位移索引,保存“< 位移值,文件磁盘物理位置 >”对。TimeIndex.scala:定义时间戳索引,保存“< 时间戳,位移值 >”对。TransactionIndex.scala:定义事务索引,为已中止事务(Aborted Transcation)保存重要的元数据信息。只有启用 Kafka 事务后,这个索引才有可能出现。这些类的关系如下图所示:
其中,OffsetIndex、TimeIndex 和 TransactionIndex 都继承了 AbstractIndex 类,而上层的 LazyIndex 仅仅是包装了一个 AbstractIndex 的实现类,用于延迟加载。就像我之前说的,LazyIndex 的作用是为了提升性能,并没有什么功能上的改进。
AbstractIndex 代码结构
AbstractIndex 定义了 4 个属性字段。由于是一个抽象基类,它的所有子类自动地继承了这 4 个字段。也就是说,Kafka 所有类型的索引对象都定义了这些属性。我先给你解释下这些属性的含义。
索引文件(file)。每个索引对象在磁盘上都对应了一个索引文件。你可能注意到了,这个字段是 var 型,说明它是可以被修改的。难道索引对象还能动态更换底层的索引文件吗?是的。自 1.1.0 版本之后,Kafka 允许迁移底层的日志路径,所以,索引文件自然要是可以更换的。
起始位移值(baseOffset)。索引对象对应日志段对象的起始位移值。举个例子,如果你查看 Kafka 日志路径的话,就会发现,日志文件和索引文件都是成组出现的。比如说,如果日志文件是 00000000000000000123.log,正常情况下,一定还有一组索引文件 00000000000000000123.index、00000000000000000123.timeindex 等。这里的“123”就是这组文件的起始位移值,也就是 baseOffset 值。
索引文件最大字节数(maxIndexSize)。它控制索引文件的最大长度。Kafka 源码传入该参数的值是 Broker 端参数 segment.index.bytes 的值,即 10MB。这就是在默认情况下,所有 Kafka 索引文件大小都是 10MB 的原因。
索引文件打开方式(writable)。“True”表示以“读写”方式打开,“False”表示以“只读”方式打开。如果我没记错的话,这个参数应该是我加上去的,就是为了修复我刚刚提到的那个 Bug。
AbstractIndex 是抽象的索引对象类。可以说,它是承载索引项的容器,而每个继承它的子类负责定义具体的索引项结构。比如,OffsetIndex 的索引项是 < 位移值,物理磁盘位置 > 对,TimeIndex 的索引项是 < 时间戳,位移值 > 对。基于这样的设计理念,AbstractIndex 类中定义了一个抽象方法 entrySize 来表示不同索引项的大小。
子类实现该方法时需要给定自己索引项的大小,对于 OffsetIndex 而言,该值就是 8;对于 TimeIndex 而言,该值是 12。说到这儿,你肯定会问,为什么是 8 和 12 呢?我来解释一下。在 OffsetIndex 中,位移值用 4 个字节来表示,物理磁盘位置也用 4 个字节来表示,所以总共是 8 个字节。你可能会说,位移值不是长整型吗,应该是 8 个字节才对啊。还记得 AbstractIndex 已经保存了 baseOffset 了吗?这里的位移值,实际上是相对于 baseOffset 的相对位移值,即真实位移值减去 baseOffset 的值。下节课我会给你重点讲一下它,这里你只需要知道使用相对位移值能够有效地节省磁盘空间就行了。而 Broker 端参数 log.segment.bytes 是整型,这说明,Kafka 中每个日志段文件的大小不会超过 2^32,即 4GB,这就说明同一个日志段文件上的位移值减去 baseOffset 的差值一定在整数范围内。因此,源码只需要 4 个字节保存就行了。同理,TimeIndex 中的时间戳类型是长整型,占用 8 个字节,位移依然使用相对位移值,占用 4 个字节,因此总共需要 12 个字节。
如果有人问你,Kafka 中的索引底层的实现原理是什么?你可以大声地告诉他:内存映射文件,即 Java 中的 MappedByteBuffer。使用内存映射文件的主要优势在于,它有很高的 I/O 性能,特别是对于索引这样的小文件来说,由于文件内存被直接映射到一段虚拟内存上,访问内存映射文件的速度要快于普通的读写文件速度。另外,在很多操作系统中(比如 Linux),这段映射的内存区域实际上就是内核的页缓存(Page Cache)。这就意味着,里面的数据不需要重复拷贝到用户态空间,避免了很多不必要的时间、空间消耗。在 AbstractIndex 中,这个 MappedByteBuffer 就是名为 mmap 的变量。
二分查找算法
到目前为止,从已排序数组中寻找某个数字最快速的算法就是二分查找了,它能做到 O(lgN) 的时间复杂度。Kafka 的索引组件就应用了二分查找算法。我先给出原版的实现算法代码。
1 private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = { 2 // 第1步:如果当前索引为空,直接返回<-1,-1>对 3 if(_entries == 0) 4 return (-1, -1) 5 6 7 // 第2步:要查找的位移值不能小于当前最小位移值 8 if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) 9 return (-1, 0) 10 11 12 // binary search for the entry 13 // 第3步:执行二分查找算法 14 var lo = 0 15 var hi = _entries - 1 16 while(lo < hi) { 17 val mid = ceil(hi/2.0 + lo/2.0).toInt 18 val found = parseEntry(idx, mid) 19 val compareResult = compareIndexEntry(found, target, searchEntity) 20 if(compareResult > 0) 21 hi = mid - 1 22 else if(compareResult < 0) 23 lo = mid 24 else 25 return (mid, mid) 26 } 27 28 29 (lo, if (lo == _entries - 1) -1 else lo + 1)
这段代码的核心是,第 3 步的二分查找算法。熟悉 Binary Search 的话,你对这段代码一定不会感到陌生。讲到这里,似乎一切很完美:Kafka 索引应用二分查找算法快速定位待查找索引项位置,之后调用 parseEntry 来读取索引项。不过,这真的就是无懈可击的解决方案了吗?
改进版二分查找算法
大多数操作系统使用页缓存来实现内存映射,而目前几乎所有的操作系统都使用 LRU(Least Recently Used)或类似于 LRU 的机制来管理页缓存。Kafka 写入索引文件的方式是在文件末尾追加写入,而几乎所有的索引查询都集中在索引的尾部。这么来看的话,LRU 机制是非常适合 Kafka 的索引访问场景的。但,这里有个问题是,当 Kafka 在查询索引的时候,原版的二分查找算法并没有考虑到缓存的问题,因此很可能会导致一些不必要的缺页中断(Page Fault)。此时,Kafka 线程会被阻塞,等待对应的索引项从物理磁盘中读出并放入到页缓存中。下面我举个例子来说明一下这个情况。假设 Kafka 的某个索引占用了操作系统页缓存 13 个页(Page),如果待查找的位移值位于最后一个页上,也就是 Page 12,那么标准的二分查找算法会依次读取页号 0、6、9、11 和 12。
通常来说,一个页上保存了成百上千的索引项数据。随着索引文件不断被写入,Page #12 不断地被填充新的索引项。如果此时索引查询方都来自 ISR 副本或 Lag 很小的消费者,那么这些查询大多集中在对 Page #12 的查询,因此,Page #0、6、9、11、12 一定经常性地被源码访问。也就是说,这些页一定保存在页缓存上。后面当新的索引项填满了 Page #12,页缓存就会申请一个新的 Page 来保存索引项,即 Page #13。现在,最新索引项保存在 Page #13 中。如果要查找最新索引项,原版二分查找算法将会依次访问 Page #0、7、10、12 和 13。此时,问题来了:Page 7 和 10 已经很久没有被访问过了,它们大概率不在页缓存中,因此,一旦索引开始征用 Page #13,就会发生 Page Fault,等待那些冷页数据从磁盘中加载到页缓存。根据国外用户的测试,这种加载过程可能长达 1 秒。
显然,这是一个普遍的问题,即每当索引文件占用 Page 数发生变化时,就会强行变更二分查找的搜索路径,从而出现不在页缓存的冷数据必须要加载到页缓存的情形,而这种加载过程是非常耗时的。基于这个问题,社区提出了改进版的二分查找策略,也就是缓存友好的搜索算法。总体的思路是,代码将所有索引项分成两个部分:热区(Warm Area)和冷区(Cold Area),然后分别在这两个区域内执行二分查找算法。
乍一看,该算法并没有什么高大上的改进,仅仅是把搜寻区域分成了冷、热两个区域,然后有条件地在不同区域执行普通的二分查找算法罢了。实际上,这个改进版算法提供了一个重要的保证:它能保证那些经常需要被访问的 Page 组合是固定的。想想刚才的例子,同样是查询最热的那部分数据,一旦索引占用了更多的 Page,要遍历的 Page 组合就会发生变化。这是导致性能下降的主要原因。这个改进版算法的最大好处在于,查询最热那部分数据所遍历的 Page 永远是固定的,因此大概率在页缓存中,从而避免无意义的 Page Fault。
位移索引和时间戳索引
Kafka 索引类型有三大类:位移索引、时间戳索引和已中止事务索引。相比于最后一类索引,前两类索引的出镜率更高一些。在 Kafka 的数据路径下,你肯定看到过很多.index 和.timeindex 后缀的文件。不知你是否有过这样的疑问:“这些文件是用来做什么的呢?” 现在我可以明确告诉你:.index 文件就是 Kafka 中的位移索引文件,而.timeindex 文件则是时间戳索引文件。
位移索引
位移索引也就是所谓的 OffsetIndex,它可是一个老资历的组件了。如果我没记错的话,国内大面积使用 Kafka 应该是在 0.8 时代。从那个时候开始,OffsetIndex 就已经存在了。每当 Consumer 需要从主题分区的某个位置开始读取消息时,Kafka 就会用到 OffsetIndex 直接定位物理文件位置,从而避免了因为从头读取消息而引入的昂贵的 I/O 操作。不同索引类型保存不同的 <key, value="">对。就 OffsetIndex 而言,Key 就是消息的相对位移,Value 是保存该消息的日志段文件中该消息第一个字节的物理文件位置。
为什么是 8 呢?相对位移是一个整型(Integer),占用 4 个字节,物理文件位置也是一个整型,同样占用 4 个字节,因此总共是 8 个字节。那相对位移是什么值呢?我们知道,Kafka 中的消息位移值是一个长整型(Long),应该占用 8 个字节才对。在保存 OffsetIndex 的 <key, value="">对时,Kafka 做了一些优化。每个 OffsetIndex 对象在创建时,都已经保存了对应日志段对象的起始位移,因此,OffsetIndex 索引项没必要保存完整的 8 字节位移值。相反地,它只需要保存与起始位移的差值(Delta)就够了,而这个差值是可以被整型容纳的。这种设计可以让 OffsetIndex 每个索引项都节省 4 个字节。
当读取 OffsetIndex 时,源码还需要将相对位移值还原成之前的完整位移。这个是在 parseEntry 方法中实现的。
这个方法返回一个 OffsetPosition 类型。该类有两个方法,分别返回索引项的 Key 和 Value。这里的 parseEntry 方法,就是要构造 OffsetPosition 所需的 Key 和 Value。Key 是索引项中的完整位移值,代码使用 baseOffset + relativeOffset(buffer, n) 的方式将相对位移值还原成完整位移值;Value 是这个位移值上消息在日志段文件中的物理位置,代码调用 physical 方法计算这个物理位置并把它作为 Value。最后,parseEntry 方法把 Key 和 Value 封装到一个 OffsetPosition 实例中,然后将这个实例返回。
写入索引项好了,有了这些基础,下面的内容就很容易理解了。我们来看下 OffsetIndex 中最重要的操作——写入索引项 append 方法的实现。
1 def append(offset: Long, position: Int): Unit = { 2 inLock(lock) { 3 // 索引文件如果已经写满,直接抛出异常 4 require(!isFull, "Attempt to append to a full index (size = " + _entries + ").") 5 // 要保证待写入的位移值offset比当前索引文件中所有现存的位移值都要大 6 // 这主要是为了维护索引的单调增加性 7 if (_entries == 0 || offset > _lastOffset) { 8 trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}") 9 mmap.putInt(relativeOffset(offset)) // 向mmap写入相对位移值 10 mmap.putInt(position) // 向mmap写入物理文件位置 11 _entries += 1 // 更新索引项个数 12 _lastOffset = offset // 更新当前索引文件最大位移值 13 // 确保写入索引项格式符合要求 14 require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.") 15 } else { 16 throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" + 17 s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.") 18 } 19 } 20 }
append 方法接收两个参数:Long 型的位移值和 Integer 型的物理文件位置。该方法最重要的两步,就是分别向 mmap 写入相对位移值和物理文件位置。除了 append 方法,索引还有一个常见的操作:截断操作(Truncation)。截断操作是指,将索引文件内容直接裁剪掉一部分。比如,OffsetIndex 索引文件中当前保存了 100 个索引项,我想只保留最开始的 40 个索引项。
这个方法接收 entries 参数,表示要截取到哪个槽,主要的逻辑实现是调用 mmap 的 position 方法。源码中的 _entries * entrySize 就是 mmap 要截取到的字节处。下面,我来说说 OffsetIndex 的使用方式。既然 OffsetIndex 被用来快速定位消息所在的物理文件位置,那么必然需要定义一个方法执行对应的查询逻辑。这个方法就是 lookup。
方法返回的,是不大于给定位移值 targetOffset 的最大位移值,以及对应的物理文件位置。你大致可以把这个方法,理解为位移值的 FLOOR 函数。
时间戳索引
与 OffsetIndex 不同的是,TimeIndex 保存的是 < 时间戳,相对位移值 > 对。时间戳需要一个长整型来保存,相对位移值使用 Integer 来保存。因此,TimeIndex 单个索引项需要占用 12 个字节。这也揭示了一个重要的事实:在保存同等数量索引项的基础上,TimeIndex 会比 OffsetIndex 占用更多的磁盘空间。
写入索引项TimeIndex 也有 append 方法,只不过它叫作 maybeAppend。我们来看下它的实现逻辑。
和 OffsetIndex 类似,向 TimeIndex 写入索引项的主体逻辑,是向 mmap 分别写入时间戳和相对位移值。只不过,除了校验位移值的单调增加性之外,TimeIndex 还会确保顺序写入的时间戳也是单调增加的。
我带你详细分析了 OffsetIndex 和 TimeIndex,以及它们的不同之处。虽然 OffsetIndex 和 TimeIndex 是不同类型的索引,但 Kafka 内部是把二者结合使用的。通常的流程是,先使用 TimeIndex 寻找满足时间戳要求的消息位移值,然后再利用 OffsetIndex 去定位该位移值所在的物理文件位置。因此,它们其实是合作的关系。最后,我还想提醒你一点:不要对索引文件做任何修改!我碰到过因用户擅自重命名索引文件,从而导致 Broker 崩溃无法启动的场景。另外,虽然 Kafka 能够重建索引,但是随意地删除索引文件依然是一个很危险的操作。在生产环境中,我建议你尽量不要执行这样的操作。
总结
以后关于kafka系列的总结大部分来自Geek Time的课件,大家可以自行关键字搜索。