zoukankan      html  css  js  c++  java
  • kafka 教程(四)-原理进阶

    kafka 最初由 Linkedin 公司开发,是一个 分布式、支持分区、多副本的,基于 zookeeper 协调的分布式发布订阅消息系统,该公司在 2010 年将 kafka 贡献给 apache 基金会,1年后升级为顶级项目。

    kafka 由 scalar 语言编写, 并提供了多种语言的接口。

    kafka 特性

    kafka 是一个分布式系统,天然具有分布式的所有优势。

    1. 高吞吐量:支持每秒百万级的消息处理,最低只有几毫秒的延迟

    2. 可扩展性:集群优势

    3. 容错:集群优势,允许部分节点挂机

    4. 持久化:消息被持久化到磁盘,并支持副本

    kafka 使用场景

    kafka 可以使用到许多场景,包括实时和离线,这里举几个例子,以便扩展你的思维。

    1. 日志存储:将各个服务的 log 统一存储到 kafka,并提供通用接口,方便各种调用,如 hadoop

    2. 消息系统:kafka 本身就是消息队列

    3. 用户活动跟踪:将各种 web / app 的用户行为存储到 kafka,用于实时监控分析,或者迁移到 hadoop、hdfs 等做离线分析

    4. 运营监控:如各种大屏展示

    5. 流式处理:如 spark storm

    kafka 基础架构

    kafka 的特点在于 发布订阅,其机制如下图

    producer 向 kafka send 消息,send 之前,会对消息进行分类,即 topic

    consumer 会与 kafka 建立长连接,当 topic 中有新的消息时,所有 consumer 会自动接收到;当然 consumer 也可以主动拉取历史消息

    Broker

    集群中的一个节点,很多核心的逻辑部署在 broker 上

    controller:中央控制器,管理分区和副本状态,并负责管理 分区 的重新分配,包括 分区选举

    Isr:同步副本

    [root@localhost kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2191 --topic sq
    Topic:sq    PartitionCount:4    ReplicationFactor:3    Configs:
        Topic: sq    Partition: 0    Leader: 0    Replicas: 0,10,11    Isr: 0,10,11
        Topic: sq    Partition: 1    Leader: 10    Replicas: 10,11,12    Isr: 10,11,12
        Topic: sq    Partition: 2    Leader: 11    Replicas: 11,12,0    Isr: 11,12,0
        Topic: sq    Partition: 3    Leader: 12    Replicas: 12,0,10    Isr: 12,0,10

    topic sq 4个分区,3个副本,每个分区有个 leader,作为读写的首选 partition

    producer 详解

    producer 处理流程如下图

    以下是有顺序的解释

    1. ProducerRecord 代表数据,数据必须包含 topic 和 value,key 和 partition 可选  --- 创建数据

    2. Serializer 代表格式化 key 和 value  --- 格式化

    3. Partitioner 代表分区策略  --- 根据 key 计算分区

      // 如果 ProducerRecord 中指定了分区,此步忽略,直接返回指定的 partition 

    4. 有了分区和topic,把该消息添加到对应的 topic 和分区上的 batch

    5. 专有线程负责发送 batch record 到 broker

    6. broker 接收到消息,返回一个 response

      // 如果写入成功,返回的是 RecordMetadata,包括 topic、partition、offset

      // 如果写入失败,返回 error,此时 producer 会自动重复发几次,直到 producer 返回 error

    Producer 发送消息有 3 种方式

    1. 立即发送:只管发送给 kafka,不关心是否发送成功

      // 即使不成功,程序也不会提示;不过大部分情况是成功的,因为 kafka 具有高可用性,且自动重试;但不保证一定成功

    2. 同步发送:通过 send 方法发送消息,并返回 future 对象,get 方法会等待 future 对象,检验是否发送成功

    3. 异步发送:通过带有回调函数的 send 方法发送消息,当 producer 收到 broker 的 response 会触发回调函数

    consumer 详解

    由于 consumer 处理消息 和 commit 提交反馈这两个动作不是原子性的,或者说不在一个事务内,由此 consumer 具有 3 种消费模式。

    1. 最多一次:表示这条消息最多被消费一次,可能是 0 次

      // 客户端收到消息后,在处理消息前,自动提交,这样 kafka 就会认为这条消息被消费过了,offset 会加一,这样这条消息就不会再被消费,也就是只能被消费一次

    实现方式: 较小时间间隔内自动提交

    设置 enable.auto.commit为ture
    设置 auto.commit.interval.ms为一个较小的时间间隔.
    client 不要调用 commitSync(),kafka 在特定的时间间隔内自动提交。

    2. 最少一次:表示这条消息最少被消费一次,可能是多次

      // 客户端收到消息后,先处理消息,处理完了再提交,可是处理过程中提交前,kafka 服务可能挂了,或者网络原因等各种异常,导致没有提交成功,kafka 会认为这条消息没被消费

    实现方式:手动提交或者较大时间间隔自动提交

    方法一
    设置 enable.auto.commit 为false
    client 调用 commitSync(),增加消息偏移;
    
    方法二
    设置 enable.auto.commit 为ture
    设置 auto.commit.interval.ms 为一个较大的时间间隔.
    client调用commitSync(),增加消息偏移;

    3. 正好一次:表示这条消息只好被消费一次

      // 保证处理消息和提交在一个事务内

    实现方式比较麻烦,参考 Kafka client 消息接收的三种模式

    在消费者消费过程中, kafka 会使用 offset 来记录当前消费的位置

    kafka 分区机制

    1. broker 端支持 topic 分区,一个 topic 可以有多个分区,一个分区可以有多个副本,一个分区的多个副本分布在不同 broker 上;

    2. partition 使得 kafka 作为 MQ 可以横向扩展,分区越多,吞吐量越大;

    3. 副本中有一个 leader,其他都是 follower,message 先写到 leader 上,再由 leader 同步到 follower;

    4. 一个 partition 中 message 的顺序就是 producer 发送的顺序,所以一个 partition 是一个有序的队列;

    5. 最晚被接收的 message 会被最后消费;

    6. partition 中每个 message 都有一个序列号叫 offset,作为消息的标识符;

    7. 在 broker 中每个分区是一个目录,目录名字规则为  topic-副本序号,副本序号从 0 开始;

    下面的五个文件和 partition 存在一起,都各有用处。

    cleaner-offset-checkpoint:存了每个log的最后清理offset,log 指的是消息

    meta.properties:broker.id 信息

    recovery-point-offset-checkpoint:表示已经刷写到磁盘的记录。recoveryPoint以下的数据都是已经刷 到磁盘上的了。

    replication-offset-checkpoint: 用来存储每个replica的HighWatermark的(high watermark (HW),表示已经被commited的message,HW以下的数据都是各个replicas间同步的,一致的。)

    8. 每个 partition 由多个大小相等的 segment (段)组成,segment 是消息的真实载体,每个 segment 的消息数量不一定相等;

    9. 每个 segment file 由两部分组成:index 和 log,分别存放 索引 和 数据

     00000000000004909731.index
     00000000000004909731.log // 1G文件--Segment
     00000000000005048975.index // 数字是Offset
     00000000000005048975.log

    10. index 和 log 映射关系

    .index文件存放的是message逻辑相对偏移量 (相对offset=绝对offset-base offset)与在相应的.log文件中的物理位置(position);

    但.index并不是为每条message都指定到物理位置的映射,而是以entry为单位,每条entry可以指定连续n条消息的物理位置映射

    例如:假设有20000~20009共10条消息,.index文件可配置为每条entry指定连续10条消息的物理位置映射,该例中,index entry会记录偏移量为20000的消息到其物理文件位置,一旦该条消息被定位,20001~20009可以很快查到。

    每个entry大小8字节,前4个字节是这个message相对于该log segment第一个消息offset(base offset)的相对偏移量,后4个字节是这个消息在log文件中的物理位置。

    11. segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

    12. 多个 segment 方便 old message 被删除;

    13. segment 文件的生命周期有 kafka 参数决定;

    消费组与分区的重平衡-reblance

    之前讲到了消费者组的作用,不熟悉的看我之前的博客。

    如果一个新的消费者加入消费者组,那么他应该会被分配 1 个或多个分区,而这些分区原来是由其他消费者负责;

    当一个消费者离开消费组时,他所消费的分区会被分给其他消费者;

    这种机制叫做 重平衡

    这是 kafka 的一个重要特性,它保证了高可用和横向扩展。

    缺点

    1. 在 重平衡 期间,所有消费者不能进行消费,导致整个消费组不可用;

    2. 重平衡 会导致原来的消费者状态过期,消费者需要重新更新状态,导致消费性能降低

    如何监控消费者

    消费者需要定期向一个作为 组协调者 的 broker 发送心跳,注意 这个 broker 不是特定的,每个组的 broker 可能都不同;

    消费者拉取或者提交时,就会发送心跳;  【老版本】

    如果超过一定时间没有发送心跳,那么他的会话 session 就会过期,触发 重平衡。

    从上可以看出,从宕机到被监测到宕机,是有一个时间段的,这段时间这个消费者负责的分区都没有消息被消费;

    所以通常 我们需要 优雅的关闭,这样消费者会发生 离开 的消息到 组协调者,从而避免这个空白期

    可能有些同学会发现,上面讲的定期发送心跳和拉取提交发送心跳似乎矛盾了,确实如此,所以新版的 kafka 将发送心跳和拉取消息进行分离,拉取消息的频率不影响心跳发送。

    另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。

    kafka 与 zookeeper 关系

    zookeeper 起到协调控制的作用。

    1. 管理 broker 和 consumer 的加入与离开

    2. 触发负载均衡,当有 broker 或者 consumer 加入或离开时,会触发负载均衡算法,是的一个 消费组 内的 多个消费者 消费平衡。

    3. 维护消费关系 和 每个 partition 的消费信息

    1. 每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。
    2. 每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。
    3. 每个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。

    新版的 kafka 已经把 consumer 部分信息移到 kafka 上了。

     

    参考资料:

    https://blog.csdn.net/lingbo229/article/details/80761778    史上最详细

    https://blog.csdn.net/luanpeng825485697/article/details/81036028  很不错的教程,原理与实操都比较深入

    https://www.jianshu.com/p/d3e963ff8b70

  • 相关阅读:
    数据类型比较(==)
    uniapp(一)
    小程序分包
    小程序网易云(五)
    java.lang的详解
    有哪些日常节省时间的诀窍?
    怎么把知乎的回答转化成自己的知识?
    linux下搭建hadoop环境
    linux下,免密码登录
    mac下创建用户及赋予sudo权限
  • 原文地址:https://www.cnblogs.com/yanshw/p/11468606.html
Copyright © 2011-2022 走看看