zoukankan      html  css  js  c++  java
  • kafka原理

    摘自:https://blog.csdn.net/ychenfeng/article/details/74980531          https://blog.csdn.net/qq_24084925/article/details/78842844

    Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

    Kafka的特性:

    • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
    • 可扩展性:kafka集群支持热扩展。
    • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
    • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
    • 高并发:支持数千个客户端同时读写。

    Kafka的使用场景:

    • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
    • 消息系统:解耦和生产者和消费者、缓存消息等。
    • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
    • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
    • 流式处理:比如spark streaming和storm。
    • 事件源。

    Kafka架构

    相关概念

    1.producer:
      消息生产者,发布消息到 kafka 集群的终端或服务。
    2.broker:
      kafka 集群中包含的服务器。
    3.topic:
      每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
    4.partition:
      partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
    5.consumer:
      从 kafka 集群中消费消息的终端或服务。
    6.Consumer group:
      high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
    7.replica:
      partition 的副本,保障 partition 的高可用。
    8.leader:
      replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
    9.follower:
      replica 中的一个角色,从 leader 中复制数据。
    10.controller:
      kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。
    12.zookeeper:
      kafka 通过 zookeeper 来存储集群的 meta 信息。
    View Code

    Zookeeper节点

    Producer发布消息

     写入方式

     producer 采用push模式将消息发布到broker,每条消息都被append到patition中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高保障kafka吞吐率)。

     消息路由   producer发送消息到broker时,会根据分区算法选择将其存储到哪一个partition。其路由机制为:

    1. 指定了 patition,则直接使用;
    2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
    3. patition 和 key 都未指定,使用轮询选出一个 patition。

     Java客户端分区源码

    //创建消息实例
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
         if (topic == null)
              throw new IllegalArgumentException("Topic cannot be null");
         if (timestamp != null && timestamp < 0)
              throw new IllegalArgumentException("Invalid timestamp " + timestamp);
         this.topic = topic;
         this.partition = partition;
         this.key = key;
         this.value = value;
         this.timestamp = timestamp;
    }
    
    //计算 patition,如果指定了 patition 则直接使用,否则使用 key 计算
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
         Integer partition = record.partition();
         if (partition != null) {
              List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
              int lastPartition = partitions.size() - 1;
              if (partition < 0 || partition > lastPartition) {
                   throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
              }
              return partition;
         }
         return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
    
    // 使用 key 选取 patition
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
         int numPartitions = partitions.size();
         if (keyBytes == null) {
              int nextValue = counter.getAndIncrement();
              List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
              if (availablePartitions.size() > 0) {
                   int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                   return availablePartitions.get(part).partition();
              } else {
                   return DefaultPartitioner.toPositive(nextValue) % numPartitions;
              }
         } else {
              //对 keyBytes 进行 hash 选出一个 patition
              return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
         }
    }
    View Code

     写入流程

    1. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
    2. producer 将消息发送给该 leader
    3. leader 将消息写入本地 log
    4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
    5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
    View Code

     producer delivery guarantee

    1. At most once 消息可能会丢,但绝不会重复传输
    2. At least one 消息绝不会丢,但可能会重复传输
    3. Exactly once 每条消息肯定会被传输一次且仅传输一次
    View Code

    当producer向broker发送消息时,一旦这条消息被 commit,由于replication的存在,它就不会丢。但是如果producer发送数据给broker后,遇到网络问题而造成通信中断,那 Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once,但目前还并未实现。所以目前默认情况下一条消息从producer到broker是确保了At least once,可通过设置producer异步发送实现At most once。

    Broker 保存消息

     存储方式  物理上把topic分成一个或多个patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件),如下:

     存储策略

      无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

    1. 基于时间:log.retention.hours=168
    2. 基于大小:log.retention.bytes=1073741824

     topic 创建与删除

      创建topic

    1. controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
    2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
        2.1 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
        2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
    3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。
    View Code

     删除topic

    1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
    2. 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。
    View Code

    kafka HA

     replication

      同一个partition可能会有多个replica(对应 server.properties 配置中的 default.replication.factor=N)。没有replica的情况下,一旦 broker宕机,其上所有patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replica,而这时需要在这些replica之间选出一个 leader,producer和consumer只与这个leader交互,其它replica作为follower从leader中复制数据。

     Kafka分配Replica 的算法如下:

    1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
    2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
    3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上
    View Code

      leader failover

      当partition对应的leader宕机时,需要从follower中选举出新leader。在选举新leader时,一个基本的原则是,新的leader必须拥有旧leader commit 过的所有消息。kafka 在 zookeeper中(/brokers/.../state)动态维护了一个ISR(in-sync replicas),由3.3节的写入流程可知ISR里面的所有replica都跟上了leader,只有 ISR 里面的成员才能选为 leader。对于f+1个replica,一个partition可以在容忍f个replica失效的情况下保证消息不丢失。当所有 replica 都不工作时,有两种可行的方案:

    1. 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
    2. 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。
    View Code

     broker failover

    1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
    2. controller 从 /brokers/ids 节点读取可用broker
    3. controller决定set_p,该集合包含宕机 broker 上的所有 partition
    4. 对 set_p 中的每一个 partition
        4.1 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
        4.2 决定新 leader(如4.3节所描述)
        4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
    5. 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令
    View Code

    controller failover

    当controller宕机时会触发controller failover。每个broker都会在zookeeper的 "/controller" 节点注册 watcher,当 controller 宕机时zookeeper中的临时节点消失,所有存活的broker收到fire的通知,每个broker都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。当新的 controller当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:

    1. 读取并增加 Controller Epoch。
    2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
    3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
    4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
    5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
    6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
    7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
    8. 启动 replicaStateMachine 和 partitionStateMachine。
    9. 将 brokerState 状态设置为 RunningAsController。
    10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
    11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
    12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
    View Code

     consumer 消费消息

     kafka提供了两套consumer API:其中high-level consumer API提供了一个从kafka消费数据的高层抽象,而SimpleConsumer API则需要开发人员更多地关注细节。

    1. The high-level Consumer API
    2. The SimpleConsumer API

      The high-level consumer API

        high-level consumer API提供了consumer group的语义,一个消息只能被group内的一个consumer所消费,且consumer消费消息时不关注offset,最后一个offset由zookeeper 保存。使用 high-level consumer API 可以是多线程的应用,应当注意:

    1. 如果消费线程大于 patition 数量,则有些线程将收不到消息
    2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
    3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

      The SimpleConsumer API

       如果你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,比如:

    1. 多次读取一个消息
    2. 只消费一个 patition 中的部分消息
    3. 使用事务来保证一个消息仅被消费一次

    但是使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:

    1. 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
    2. 应用程序需要通过程序获知每个 Partition 的 leader 是谁
    3. 需要处理 leader 的变更

    使用 SimpleConsumer API 的一般流程如下:

    1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader
    2. 找出每个 partition 的 follower
    3. 定义好请求,该请求应该能描述应用程序需要哪些数据
    4. fetch 数据
    5. 识别 leader 的变化,并对之作出必要的响应

     consumer group

      kafka的分配单位是patition。每个consumer都属于一个group,一个partition只能被同一个group内的一个consumer所消费(也就保障了一个消息只能被group内的一个consuemr 所消费),但是多个group可以同时消费这个partition。kafka的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用spark/Storm 这些实时处理系统对消息在线处理,同时使用Hadoop批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。如下图所示:

    消费方式

    consumer采用pull模式从broker中读取数据。push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

    consumer delivery guarantee

    如果将consumer设置为autocommit,consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka确保了Exactly once。但实际使用中应用程序并非在 consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了consumer delivery guarantee:

    1.读完消息先 commit 再处理消息。
        这种模式下,如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once
    2.读完消息先处理再 commit。
        这种模式下,如果在处理完消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。
    3.如果一定要做到 Exactly once,就需要协调 offset 和实际操作的输出。
        精典的做法是引入两阶段提交。如果能让 offset 和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,无法存于HDFS,而SimpleConsuemr API的 offset 是由自己去维护的,可以将之存于 HDFS 中)
    View Code

    consumer rebalance

    当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。consumer rebalance算法如下:

    1. 将目标 topic 下的所有 partirtion 排序,存于PT
    2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
    3. N=size(PT)/size(CG),向上取整
    4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
    5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci
    View Code

    在 0.8.*版本,每个 consumer 都只负责调整自己所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。这会导致以下几个问题:

    1.Herd effect
      任何 broker 或者 consumer 的增减都会触发所有的 consumer 的 rebalance
    2.Split Brain
      每个 consumer 分别单独通过 zookeeper 判断哪些 broker 和 consumer 宕机了,那么不同 consumer 在同一时刻从 zookeeper 看到的 view 就可能不一样,这是由 zookeeper 的特性决定的,这就会造成不正确的 reblance 尝试。
    3. 调整结果不可控
      所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,这可能会导致 kafka 工作在一个不正确的状态。
    View Code

     基于以上问题,kafka 设计者考虑在0.9.*版本开始使用中心 coordinator 来控制 consumer rebalance,然后又从简便性和验证要求两方面考虑,计划在 consumer 客户端实现分配方案。

  • 相关阅读:
    二分图匹配(匈牙利算法)
    最长共公子序列(LCS)
    网页常用Js代码
    linux 服务器常用命令整理
    阿里云学生服务器搭建网站-Ubuntu16.04安装php开发环境
    BAT批处理中的字符串处理详解(字符串截取)
    DOS批处理高级教程(还不错)(转)
    EntityFramework的linq扩展where
    RestSharp发送请求得到Json数据
    socket
  • 原文地址:https://www.cnblogs.com/java-oracle/p/8630082.html
Copyright © 2011-2022 走看看