zoukankan      html  css  js  c++  java
  • 【原创】Kakfa utils源代码分析(一)

    Kafka.utils,顾名思义,就是一个工具套件包,里面的类封装了很多常见的功能实现——说到这里,笔者有一个感触:当初为了阅读Kafka源代码而学习了Scala语言,本以为Kafka的实现会用到很多函数编程(Functional Programming, FP),结果目前来看,大部分还是很朴素地以面向对象的方式来实现的,只有很少一部分集合的处理使用诸如map,reduce这样的FP方式。不能不说有点小小的遗憾。——当然也许后面Kafka的核心代码中会看到更多FP的身影。

    下图就是kafka.utils包的所有代码:
    因为很难像其他包代码之间有逻辑关系,我们就一个一个说吧:
    一、Annotations.scala
    这个源代码文件中定义了3个注释类:threadsafe、nonthreadsafe和immutable。它们都继承了StaticAnnotation——Scala提供的StaticAnnotation类似于Java中的@Target(ElementType.TYPE),因此主要的作用域是类和接口。具体到这三个元注解(meta-annotation),很容易知道它们的含义:分别标记线程安全、非线程安全和不可变性。Kafka开发中常用到的SimpleConsumer类就是被标记为@threadsafe的。
    二. CommandLineUtils.scala
    这个文件使用JOpt Simple库负责解析命令行参数,具体使用用法参见官网:http://pholser.github.io/jopt-simple/
    Kafka在这个文件中提供了一个object:CommandLineUtils。具体包含的方法有:
    1. printUsageAndDie: 打印命令使用方法并终止程序
    2. checkRequiredArgs:使用Jopts Simple的API(以下皆同)检查是否缺少必要参数
    3. checkInvalidArgs:检查指定的参数是否存在不兼容情况,即哪些参数不能同时使用
    4. parseKeyValueArgs:解析key=value格式的参数对,并返回一个Properties对象
    三、Crc32.scala
    这个类就是CRC32校验码的实现类,来自于Hadoop提供的PureJavaCrc32类——CRC32校验码的纯Java实现版本。这个类很长,里面有很多位操作,由于CRC32计算不在本次研究范围,所以就了解到这吧。
    四、DelayedItem.scala
    这个类是个泛型类,实现了java.util.Delayed接口。用于标记那些在给定延迟时间之后执行的对象。该类接收一个泛型T,一个延迟时间以及延迟时间的单位。另外,实现这个接口的话必须要实现一个compareTo和getDelay方法。
    1. getDelay: 计算距离触发时间还剩下多长时间
    2. compareTo: 比较2个Delayed对象的延迟触发时间
    五、FileLock.scala
    顾名思义,FileLock就是一个文件锁,它的构造函数接收一个文件对象,并总是先尝试创建这个文件(如果不存在的话),然后创建一个FileChannel对象对该文件进行随机读写操作。同时创建一个java.nio.channel.FlieLock文件锁对象用于实现下面的方法:
    1. lock: 对文件加锁,如果该文件上已有锁抛出异常
    2. tryLock: 尝试对文件加锁,如果成功返回true,否则返回false
    3. unlock: 如果持有锁使用FileLock.release方法释放锁
    4. destroy: 先释放锁然后调用FileChannel的close方法销毁该channel
    六、IteratorTemplate.scala
    这个文件视图定义一个迭代器模板,主要为遍历消息集合使用。迭代器模板有一个状态字段,因此在定义迭代器模板抽象类之前首先定义了一个State状态object,以及一组具体的状态object:完成(DONE),READY(准备就绪),NOT_READY(未准备)和FAILED(失败)。
    之后就是定义IteratorTemplate抽象类了,它同时实现了trait Iterator和java Iterator接口——可谓迭代器领域的集大成者:)
     
    如前所述,该类有个字段表明了迭代器的状态:state,还有一个nextItem字段执行遍历中的下一个对象,当然初始化为null——说起null,想到一个题外话。我很怀疑Kafka的开发人员是深度的Java编程人员亦或是强面向对象开发人员,Scala推荐使用Option来代替null的,可Kafka的代码中null还是随处可见,当然可能也是为了更好更自然地与Java集成。
     
    这个抽象类提供很多方法,但似乎只有一个抽象方法:makeNext,其他全是具体方法:
    1. next:如果迭代器已遍历完并无法找到下一项或下一项为空,直接抛出异常;否则将状态置为NOT_READY并返回下一项
    2. peek:只是探查一下迭代器是否遍历完,如果是抛出异常,否则直接返回下一项,并不做非空判断,也不做状态设置
    3. hasNext: 如果状态为FAILED直接抛出异常,如果是DONE返回false,如果是READY返回true,否则调用maybeComputeNext方法
    4. makeNext: 返回下一项,这是你需要唯一需要实现的抽象方法。同时你还需要在该方法中对状态字段进行更新
    5. maybeComputeNext:调用makeNext获取到下一项,如果状态是DONE返回false,否则返回true并将状态置为READY
    6. allDone: 将状态置为DONE并返回null
    7. resetStatus:顾名思义,就是重置状态字段为NOT_READY
    七、JSON.scala
    JSON的一个封装类,用于JSON到String的相互转换,该类不是线程安全的。Scala提供的JSON是将数字型的字符串转化为Double,不过该类创建一个简单函数用于将数字型字符串转为换Integer,并指定其为JSON.globalNumberParser。该类只有2个方法:
    1. parseFull: 调用scala JSON的parseFull方法将一个json字符串转化为一个对象,如果出错则抛出异常
    2. encode: 讲一个对象编码成json字符串。这个对象只能是null,Boolean,String,Number,Map[String, T],Array[T]或Iterable[T]中的一种,否则会报错
  • 相关阅读:
    读懂diff
    Sqlite数据库的加密
    SQLite 数据类型
    SQLite 混合模式程序集是针对“v2.0.50727”版的运行时生成的,在没有配置其他信息的情况下,无法在 4.0 运行时中加载该程序集。
    datatable写入sqlite
    使用NSSM将exe封装为服务
    Java OPC client开发踩坑记
    最终解决:win10小娜无法使用(win10 win+q 无法搜索应用程序)
    OPC DA 客户端实例[.net]
    KepServer作为OPC UA服务器以及建立OPC UA客户端
  • 原文地址:https://www.cnblogs.com/huxi2b/p/4378439.html
Copyright © 2011-2022 走看看