zoukankan      html  css  js  c++  java
  • Kafka如何加载日志段

    kafka日志作为日志段的容器,重点分析kafka日志是如何加载日志段。

    Log源码结构

      Log源码位于kakfa core工程的log包下,对应的文件名为Log.scala。文件中中包含了与log有关的10个class或者object,见下图所示。

      

      模块概述

      LogAppendInfo(class)

        保存了一组待写入消息的各种元数据信息,包含位移值或者最大消息的时间戳等,后续会进行详细介绍

      LogAppendInfo(object)

        我们可以理解为定义了一些工厂方法,用来创建LogAppendInfo对象

      Log(class)

        Log源码中最最核心的代码

      Log(object)

        Log伴生类的工厂方法,里面有很多的辅助方法

      RollParams(C)

        定义了日志段是否需要切分的数据结构

      RollParams(object)

        RollParams的伴生类

      LogMetricNames

        定义了Log对象的监控指标

      LogOffsetSnapshot

        封装分区所有位移元数据的容器类

      LogReadInfo

        封装读取日志返回的数据和元数据

      CompletedTxn

        记录已完成事务的元数据,用来构建事务索引

      Log Class&Log Object

        下面我会一一介绍他们,我们都知道Scala的派生类一般存储的是静态变量或者静态工厂方法等,考虑到派生类的阅读比较容易,我们先分析下Log Object相关信息。我们先看下里面都定义了那些常量,见下图所示

        

    object Log {
    
      /** a log file */
      val LogFileSuffix = ".log"
    
      /** an index file */
      val IndexFileSuffix = ".index"
    
      /** a time index file */
      val TimeIndexFileSuffix = ".timeindex"
    
      val ProducerSnapshotFileSuffix = ".snapshot"
    
      /** an (aborted) txn index */
      val TxnIndexFileSuffix = ".txnindex"
    
      /** a file that is scheduled to be deleted */
      val DeletedFileSuffix = ".deleted"
    
      /** A temporary file that is being used for log cleaning */
      val CleanedFileSuffix = ".cleaned"
    
      /** A temporary file used when swapping files into the log */
      val SwapFileSuffix = ".swap"
    
      /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher.
       * This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
       * avoided by passing in the recovery point, however finding the correct position to do this
       * requires accessing the offset index which may not be safe in an unclean shutdown.
       * For more information see the discussion in PR#2104
       */
      val CleanShutdownFile = ".kafka_cleanshutdown"
    
      /** a directory that is scheduled to be deleted */
      val DeleteDirSuffix = "-delete"
    
      /** a directory that is used for future partition */
      val FutureDirSuffix = "-future"
    }
    

      我们发现这些常量 中包含了很多种的文件类型,除了大家熟知的index、timeindex、txnindex、log等,还有不认识的其他文件类型,我们对这些不认识的进行汇总说明。

      snapshot:是kafka对幂等型或者事务型producer所生成的快照文件。事务型幂等先不做介绍。

      deleted:删除日志段操作时所创建的文件,目前删除日志段文件的操作是异步的,

      cleaned和swarp是compaction操作的产物,后续会进行解释

      delete 是应用于文件夹的,当我们删除一个主题的时候,主题的分区文件夹会被加上这个后缀

      当然Log object中还定义了很多的工具类方法,比如下图,基于偏移量计算出日志段的文件名,kafak日志段的固定长度是20位。

      

    /**
       * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
       * so that ls sorts the files numerically.
       *
       * @param offset The offset to use in the file name
       * @return The filename
       */
      def filenamePrefixFromOffset(offset: Long): String = {
        val nf = NumberFormat.getInstance()
        nf.setMinimumIntegerDigits(20)
        nf.setMaximumFractionDigits(0)
        nf.setGroupingUsed(false)
        nf.format(offset)
      }
    

      Log class 详细介绍

       Log源码中最重要的就是这个Log class了,代码量还是比较大的,我们先从这个类定义的入参开始讲起,见下图所示。

      

    class Log(@volatile private var _dir: File,
              @volatile var config: LogConfig,
              @volatile var logStartOffset: Long,
              @volatile var recoveryPoint: Long,
              scheduler: Scheduler,
              brokerTopicStats: BrokerTopicStats,
              val time: Time,
              val maxProducerIdExpirationMs: Int,
              val producerIdExpirationCheckIntervalMs: Int,
              val topicPartition: TopicPartition,
              val producerStateManager: ProducerStateManager,
              logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
            .......
    }    
    

      我们发现这里面的字段比较多,重点关注以下两个字段,分别是dir和logStartOffset。

      dir是主题分区的路径

      logStartOffset是日志的最早的位移

      此外我们还需要关注一下几个属性

      nextOffsetmetadata:封装了下一条待插入消息的位移值

      highWatermarkMetadata:用来区分日志的高水位值,即已提交事务和未提交事务的分界

      segment:保存了分区日志下所有的日志段信息,Map结构,key是日志段的起始位移,value是日志段本身对象

      大概了解了上文之后,我们一起看看Log类是如何初始化的。

  • 相关阅读:
    storm 学习教程
    Scala 面向接口编程
    Scala 继承
    IntelliJ IDEA 代码检查规范QAPlug
    Spark入门实战系列
    IntelliJ Idea 常用快捷键 列表(实战终极总结!!!!)
    使用DOM解析XML文档
    栈结构Stack
    队列Queue ,双端队列Deque
    集合转换为数组toArray(),数组转换为集合asList()
  • 原文地址:https://www.cnblogs.com/cnxieyang/p/12731629.html
Copyright © 2011-2022 走看看