zoukankan      html  css  js  c++  java
  • Kafka实现思路(转发)

    https://kafka.apachecn.org/documentation.html#networklayer

    5.1 网络层

    网络层相当于一个 NIO 服务,在此不在详细描述.

    sendfile(零拷贝) 的实现是通过 MessageSet 接口的 writeTo 方法完成的.这样的机制允许 file-backed 集使用更高效的 transferTo 实现,而不在使用进程内的写缓存.

    线程模型是一个单独的接受线程和 N 个处理线程,每个线程处理固定数量的连接.这种设计方式在其他地方经过大量的测试,发现它是实现简单而且快速的.

    协议保持简单以允许未来实现其他语言的客户端.

    5.2 消息

     消息包含一个可变长度的 header ,一个可变长度不透明的字节数组 key ,一个可变长度不透明的字节数组 value ,消息中 header 的格式会在下一节描述.

    保持消息中的 key 和 value 不透明(二进制格式)是正确的决定: 目前构建序列化库取得很大的进展,而且任何单一的序列化方式都不能满足所有的用途.

    毋庸置疑,使用kafka的特定应用程序可能会要求特定的序列化类型作为自己使用的一部分. 

    RecordBatch 接口就是一种简单的消息迭代器,它可以使用特定的方法批量读写消息到 NIO 的 Channel 中.

    5.3 消息格式

    消息通常按照批量的方式写入.record batch 是批量消息的技术术语,它包含一条或多条 records.

    不良情况下, record batch 只包含一条 record. Record batches 和 records 都有他们自己的 headers.

    在 kafka 0.11.0及后续版本中(消息格式的版本为 v2 或者 magic=2)解释了每种消息格式.点击查看消息格式详情.

    5.3.1 Record Batch

    以下为 RecordBatch 在硬盘上的格式.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    baseOffset: int64
    batchLength: int32
    partitionLeaderEpoch: int32
    magic: int8 (current magic value is 2)
    crc: int32
    attributes: int16
        bit 0~2:
            0: no compression
            1: gzip
            2: snappy
            3: lz4
        bit 3: timestampType
        bit 4: isTransactional (0 means not transactional)
        bit 5: isControlBatch (0 means not a control batch)
        bit 6~15: unused
    lastOffsetDelta: int32
    firstTimestamp: int64
    maxTimestamp: int64
    producerId: int64
    producerEpoch: int16
    baseSequence: int32
    records: [Record]
         

    请注意,启用压缩时,压缩的记录数据将直接按照记录数进行序列化。

    CRC(一种数据校验码) 会覆盖从属性到批处理结束的数据, (即 CRC 后的所有字节数据).

    CRC 位于 magic 之后,这意味着,在决定如何解释批次的长度和 magic 类型之前,客户端需要解析 magic 类型.

    CRC 计算不包括分区 learder epoch 字段,是为了避免 broker 收到每个批次的数据时 需要重新分配计算 CRC . CRC-32C (Castagnoli) 多项式用于计算.

    压缩: 不同于旧的消息格式, magic v2 及以上版本在清理日志时保留原始日志中首次及最后一次 offset/sequence .这是为了能够在日志重新加载时恢复生产者的状态.

    例如,如果我们不保留最后一次序列号,当分区 learder 失败以后,生产者会报 OutOfSequence 的错误.必须保留基础序列号来做重复检查(broker 通过检查生产者该批次请求中第一次及最后一次序列号是否与上一次的序列号相匹配来判断是否重复).因此,当批次中所有的记录被清理但批次数据依然保留是为了保存生产者最后一次的序列号,日志中可能有空的数据.不解的是在压缩中时间戳可能不会被保留,所以如果批次中的第一条记录被压缩,时间戳也会改变

    5.3.1.1 批次控制

    批次控制包含成为控制记录的单条记录. 控制记录不应该传送给应用程序,相反,他们是消费者用来过滤中断的事务消息.

    控制记录的 key 符合以下模式:

    1
    2
    version: int16 (current version is 0)
    type: int16 (0 indicates an abort marker, 1 indicates a commit)

    批次记录值的模式依赖于类型. 对客户端来说它是透明的.

    5.3.2 Record(记录)

    Record level headers were introduced in Kafka 0.11.0. The on-disk format of a record with Headers is delineated below.

    Record 级别的头部信息在0.11.0 版本引入. 拥有 headers 的 Record 的磁盘格式如下.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    length: varint
    attributes: int8
        bit 0~7: unused
    timestampDelta: varint
    offsetDelta: varint
    keyLength: varint
    key: byte[]
    valueLen: varint
    value: byte[]
    Headers => [Header]
         
    5.4.2.1 Record Header
    1
    2
    3
    4
    5
    headerKeyLength: varint
    headerKey: String
    headerValueLength: varint
    Value: byte[]
         

    我们使用了和 Protobuf 编码格式相同的 varint 编码. 更多后者相关的信息 在这里. Record 中 headers 的数量也被编码为 varint .

    5.4 日志

    命名为 "my_topic" 的主题日志有两个分区,包含两个目录 (命名为 my_topic_0 和 my_topic_1) ,目录中分布着包含该 topic 消息的日志文件.

    日志文件的格式是 "log entries" 的序列; 每个日志对象是由4位的数字N存储日志长度,后跟 N 字节的消息.

    每个消息使用64位的整数作为 offset 唯一标记, offset 即为发送到该 topic partition 中所有流数据的起始位置.

    每个消息的磁盘格式如下. 每个日志文件使用它包含的第一个日志的 offset 来命名.所以创建的第一个文件是 00000000000.kafka, 并且每个附件文件会有大概 S 字节前一个文件的整数名称,其中 S 是配置给出的最大文件大小.

    记录的精确二进制格式是版本化的,并且按照标准接口进行维护,所以批量的记录可以在 producer, broker 和客户端之间传输,而不需要在使用时进行重新复制或转化.前一章包含了记录的磁盘格式的详情.

    消息的偏移量用作消息 id 是不常见的.我们最开始的想法是使用 producer 自增的 GUID ,并维护从 GUID 到每个 broker 的 offset 的映射.这样的话每个消费者需要为每个服务端维护一个 ID,提供全球唯一的 GUID 没有意义.而且,维护一个从随机 ID 到偏移量映射的复杂度需要一个重度的索引结构,它需要与磁盘进行同步,本质上需要一个完整的持久随机访问数据结构.

    因此为了简化查找结构,我们决定针对每个分区使用一个原子计数器,它可以利用分区id和节点id唯一标识一条消息.虽然这使得查找结构足够简单,但每个消费者的多个查询请求依然是相似的.

    一旦我们决定使用使用计数器,直接跳转到对应的偏移量显得更加自然-毕竟对于每个分区来说它们都是一个单调递增的整数.由于消费者API隐藏了偏移量,所以这个决定最终是一个实现细节,我们采用了更高效的方法。

    Writes

    日志允许序列化的追加到最后一个文件中.

    当文件大小达到配置的大小(默认 1G)时,会生成一个新的文件.

    日志中有两个配置参数: M 是在 OS 强制写文件到磁盘之前的消息条数, S 是强制写盘的秒数.这提供了一个在系统崩溃时最多丢失 M 条或者 S 秒消息的保证.

    Reads

    通过提供消息的64位逻辑偏移量和 S 位的 max chunk size 完成读请求.这会返回一个包含 S 位的消息缓存迭代器. S 必须大于任何单条的数据,但是在异常的大消息情况下,读取操作可以重试多次,每次会加倍缓冲的大小,直到消息被读取成功.

    可以指定最大消息大小和缓存大小使服务器拒绝接收超过其大小的消息,并为客户端设置消息的最大限度,它需要尝试读取多次获得完整的消息.

    读取缓冲区可能以部分消息结束,这很容易通过大小分界来检测.

    按照偏移量读取的实际操作需要在数据存储目录中找到第一个日志分片的位置,在全局的偏移量中计算指定文件的偏移量,然后读取文件偏移量.

    搜索是使用二分查找法查找在内存中保存的每个文件的偏移量来完成的.

    日志提供了将消息写入到当前的能力,以允许客户端从'当前开始订阅.

    在消费者未能在其SLA指定的天数内消费其数据的情况下,这也是有用的.

    在这种情况下,客户端会尝试消费不存在的偏移量的数据,这会抛出 OutOfRangeException 异常,并且也会重置 offset 或者失败.

    以下是发送给消费者的结果格式.

    1
    2
    3
    4
    5
    6
    7
    MessageSetSend (fetch result)
     
    total length     : 4 bytes
    error code       : 2 bytes
    message 1        : x bytes
    ...
    message n        : x bytes
    1
    2
    3
    4
    5
    6
    7
    MultiMessageSetSend (multiFetch result)
     
    total length       : 4 bytes
    error code         : 2 bytes
    messageSetSend 1
    ...
    messageSetSend n

    Deletes

    在一个时点下只有一个 log segment 的数据能被删除。

    日志管理器允许使用可插拔的删除策略来选择哪些文件符合删除条件.当前的删除策略会删除 N 天之前改动的日志,尽管保留最后的 N GB 数据可能有用.

    为了避免锁定读,同时允许删除修改 segment 列表,我们使用 copy-on-write 形式的 segment 列表实现,在删除的同时它提供了一致的视图允许在多个 segment 列表视图上执行二进制的搜索。

    Guarantees

    日志提供了配置项 M ,它控制了在强制刷盘之前的最大消息数。

    启动时,日志恢复线程会运行,对最新的日志片段进行迭代,验证每条消息是否合法。如果消息对象的总数和偏移量小于文件的长度并且 消息数据包的 CRC32 校验值与存储在消息中的 CRC 校验值相匹配的话,说明这个消息对象是合法的。如果检测到损坏,日志会在最后一个合法 offset 处截断。

    请注意,有两种损坏必须处理:由于崩溃导致的未写入的数据块的丢失和将无意义已损坏的数据块添加到文件。

    原因是:通常系统不能保证文件索引节点和实际数据快之间的写入顺序,除此之外,如果在块数据被写入之前,文件索引已更新为新的大小,若此时系统崩溃,文件不会的到有意义的数据,则会导致数据丢失。

     

    5.5 分布式

    Consumer Offset Tracking(消费者offset跟踪)

    高级别的consumer跟踪每个分区已消费的offset,并定期提交,以便在重启的情况下可以从这些offset中恢复。

    Kafka提供了一个选项在指定的broker中来存储所有给定的consumer组的offset,称为offset manager。

    例如,该consumer组的所有consumer实例向offset manager(broker)发送提交和获取offset请求。高级别的consumer将会自动处理这些过程。如果你使用低级别的consumer,你将需要手动管理offset。目前在低级别的java consumer中不支持,只能在Zookeeper中提交或获取offset。如果你使用简单的Scala consumer,将可拿到offset manager,并显式的提交或获取offset。对于包含offset manager的consumer可以通过发送GroupCoordinatorRequest到任意kafka broker,并接受GroupCoordinatorResponse响应,consumer可以继续向`offset manager broker`提交或获取offset。如果offset manager位置变动,consumer需要重新发现offset manager。如果你想手动管理你的offset,你可以看看OffsetCommitRequest 和 OffsetFetchRequest的源码是如何实现的。

    当offset manager接收到一个OffsetCommitRequest,它将追加请求到一个特定的压缩名为__consumer_offsets的kafka topic中,当offset topic的所有副本接收offset之后,offset manager将发送一个提交offset成功的响应给consumer。

    万一offset无法在规定的时间内复制,offset将提交失败,consumer在回退之后可重试该提交(高级别consumer自动进行)。

    broker会定期压缩offset topic,因为只需要保存每个分区最近的offset。

    offset manager会缓存offset在内存表中,以便offset快速获取。

    当offset manager接收一个offset的获取请求,将从offset缓存中返回最新的的offset。如果offset manager刚启动或新的consumer组刚成为offset manager(成为offset topic分区的leader),则需要加载offset topic的分区到缓存中,在这种情况下,offset将获取失败,并报出OffsetsLoadInProgress异常,consumer回滚后,重试OffsetFetchRequest(高级别consumer自动进行这些操作)。

     
    从ZooKeeper迁移offset到kafka

    Kafka consumers在早先的版本中offset默认存储在ZooKeeper中。可以通过下面的步骤迁移这些consumer到Kafka:

    1. 在consumer配置中设置offsets.storage=kafka 和 dual.commit.enabled=true
    2. consumer做滚动消费,验证你的consumer是健康正常的。
    3. 在你的consumer配置中设置dual.commit.enabled=false
    4. consumer做滚动消费,验证你的consumer是健康正常的。

    回滚(就是从kafka回到Zookeeper)也可以使用上面的步骤,通过设置 offsets.storage=zookeeper

    ZooKeeper 目录

    下面给出了Zookeeper的结构和算法,用于协调consumer和broker。

    Notation

    当一个path中的元素表示为[XYZ],这意味着xyz的值不是固定的,实际上每个xyz的值可能是Zookeeper的znode,例如`/topic/[topic]`是一个目录,/topic包含一个子目录(每个topic名称)。

    数字的范围如[0...5]来表示子目录0,1,2,3,4。

    箭头`->`用于表示znode的内容,例如:/hello->world表示znode /hello包含值”world”。

    Broker节点注册

    1
    /brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)

    这是当前所有broker的节点列表,其中每个提供了一个唯一的逻辑broker的id标识它的consumer(必须作为配置的一部分)。

    在启动时,broker节点通过在/brokers/ids/下用逻辑broker id创建一个znode来注册它自己。

    逻辑broker id的目的是当broker移动到不同的物理机器时,而不会影响消费者。尝试注册一个已存在的broker id时将返回错误(因为2个server配置了相同的broker id)。

    由于broker在Zookeeper中用的是临时znode来注册,因此这个注册是动态的,如果broker关闭或宕机,节点将消失(通知consumer不再可用)。

    Broker Topic 注册

    1
    /brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)

    每个broker在它自己的topic下注册,维护和存储该topic分区的数据。

    Consumers and Consumer Groups

    topic的consumer也在zookeeper中注册自己,以便相互协调和平衡数据的消耗。

    consumer也可以通过设置offsets.storage = zookeeper将他们的偏移量存储在zookeeper中。但是,这个偏移存储机制将在未来的版本中被弃用。因此,建议将数据迁移到kafka中。

    多个consumer可组成一组,共同消费一个topic,在同一组中的每个consumer共享一个group_id。

    例如,如果一个consumer是foobar,在三台机器上运行,你可能分配这个这个consumer的ID是“foobar”。这个组id是在consumer的配置文件中配置的。 

    每个分区正好被一个consumer组的consumer所消费,一组中的consumer尽可能公平地分配分区。

    Consumer Id 注册

    除了由所有consumer共享的group_id,每个consumer都有一个临时且唯一的consumer_id(主机名的形式:uuid)用于识别。consumer的id在以下目录中注册。

    1
    /consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} (ephemeral node)

    组中的每个consumer用consumer_id注册znode。znode的值包含一个map<topic,#streams>。这个id只是用来识别在组里目前活跃的consumer,这是个临时节点,如果consumer在处理中挂掉,它就会消失。

    Consumer Offsets

    Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory if offsets.storage=zookeeper.

    1
    /consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)

    Partition Owner registry

    Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming.

    1
    /consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)

    Cluster Id

    The cluster id is a unique and immutable identifier assigned to a Kafka cluster. The cluster id can have a maximum of 22 characters and the allowed characters are defined by the regular expression [a-zA-Z0-9_-]+, which corresponds to the characters used by the URL-safe Base64 variant with no padding. Conceptually, it is auto-generated when a cluster is started for the first time.

    Implementation-wise, it is generated when a broker with version 0.10.1 or later is successfully started for the first time. The broker tries to get the cluster id from the /cluster/id znode during startup. If the znode does not exist, the broker generates a new cluster id and creates the znode with this cluster id.

    Broker node registration

    The broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes information about its host name and port. The broker also register the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker.

    Consumer registration algorithm

    When a consumer starts, it does the following:

    1. Register itself in the consumer id registry under its group.
    2. Register a watch on changes (new consumers joining or any existing consumers leaving) under the consumer id registry. (Each change triggers rebalancing among all consumers within the group to which the changed consumer belongs.)
    3. Register a watch on changes (new brokers joining or any existing brokers leaving) under the broker id registry. (Each change triggers rebalancing among all consumers in all consumer groups.)
    4. If the consumer creates a message stream using a topic filter, it also registers a watch on changes (new topics being added) under the broker topic registry. (Each change will trigger re-evaluation of the available topics to determine which topics are allowed by the topic filter. A new allowed topic will trigger rebalancing among all consumers within the consumer group.)
    5. Force itself to rebalance within in its consumer group.

    Consumer rebalancing algorithm

    The consumer rebalancing algorithms allows all the consumers in a group to come into consensus on which consumer is consuming which partitions. Consumer rebalancing is triggered on each addition or removal of both broker nodes and other consumers within the same group. For a given topic and a given consumer group, broker partitions are divided evenly among consumers within the group. A partition is always consumed by a single consumer. This design simplifies the implementation. Had we allowed a partition to be concurrently consumed by multiple consumers, there would be contention on the partition and some kind of locking would be required. If there are more consumers than partitions, some consumers won't get any data at all. During rebalancing, we try to assign partitions to consumers in such a way that reduces the number of broker nodes each consumer has to connect to.

    Each consumer does the following during rebalancing:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    1. For each topic T that C<sub>i</sub> subscribes to
    2.   let P<sub>T</sub> be all partitions producing topic T
    3.   let C<sub>G</sub> be all consumers in the same group as C<sub>i</sub> that consume topic T
    4.   sort P<sub>T</sub> (so partitions on the same broker are clustered together)
    5.   sort C<sub>G</sub>
    6.   let i be the index position of C<sub>i</sub> in C<sub>G</sub> and let N = size(P<sub>T</sub>)/size(C<sub>G</sub>)
    7.   assign partitions from i*N to (i+1)*N - 1 to consumer C<sub>i</sub>
    8.   remove current entries owned by C<sub>i</sub> from the partition owner registry
    9.   add newly assigned partitions to the partition owner registry
            (we may need to re-try this until the original partition owner releases its ownership)

    When rebalancing is triggered at one consumer, rebalancing should be triggered in other consumers within the same group about the same time.

  • 相关阅读:
    模型层
    视图层,模板层
    ORM表关系建立
    CMakeList入门
    C++标准模板库
    C++基本语法
    g++应用说明
    Linux快捷键
    Git 操作备忘
    Block的详细介绍
  • 原文地址:https://www.cnblogs.com/panpanwelcome/p/13558294.html
Copyright © 2011-2022 走看看