zoukankan      html  css  js  c++  java
  • Kafka高级设计和架构,一文深化理解

    主题:

    • 1、kafka是写磁盘还是写内存?
    • 2、kafka究竟是由 consumer 从 broker 那里拉数据,还是由 broker 将数据推到 consumer?
    • 3、如何区分已消费(consumed)的记录?
    • 4、kafka用什么方法保障持久化的低延迟和高效率?
    • 5、kafka的消息保证有几种方式?
    • 6、kafka消息是否会丢失?为什么?
    • 7、kafka最合理的配置是什么?
    • 8、kafka的leader选举机制是什么?
    • 9、kafka节点之间如何复制备份的?
    • 10、kafka什么情况下会丢消息?

    1、kafka是写磁盘还是写内存?

    当然是磁盘。不同于Redis的内存 + 持久化磁盘,也不同于Mongodb仅把索引放于内存。kafka是基于磁盘的持久化。


    2、kafka究竟是由 consumer 从 broker 那里 pull 数据,还是由 broker 将数据 push 到 consumer?

    Kafka 在这方面采取了一种较为传统的设计方式,也是大多数的消息系统所共享的方式:即 producer 把数据 push 到 broker,然后 consumer 从 broker 中 pull 数据。
    pull-based 系统有一个很好的特性, 那就是当 consumer 速率落后于 producer 时,可以在适当的时间赶上来。还可以通过使用某种 backoff 协议来减少这种现象:即 consumer 可以通过 backoff 表示它已经不堪重负了,然而通过获得负载情况来充分使用 consumer(但永远不超载)这一方式实现起来比它看起来更棘手。前面以这种方式构建系统的尝试,引导着 Kafka 走向了更传统的 pull 模型。

    另一个 pull-based 系统的优点在于:它可以大批量生产要发送给 consumer 的数据。而 push-based 系统必须选择立即发送请求或者积累更多的数据,然后在不知道下游的 consumer 能否立即处理它的情况下发送这些数据。如果系统调整为低延迟状态,这就会导致一次只发送一条消息,以至于传输的数据不再被缓冲,这种方式是极度浪费的。 而 pull-based 的设计修复了该问题,因为 consumer 总是将所有可用的(或者达到配置的最大长度)消息 pull 到 log 当前位置的后面,从而使得数据能够得到最佳的处理而不会引入不必要的延迟。

    简单的 pull-based 系统的不足之处在于:如果 broker 中没有数据,consumer 可能会在一个紧密的循环中结束轮询,实际上 busy-waiting 直到数据到来。为了避免 busy-waiting,我们在 pull 请求中加入参数,使得 consumer 在一个“long pull”中阻塞等待,直到数据到来(还可以选择等待给定字节长度的数据来确保传输长度)。

    3、消费者如何跟踪已消费内容的位置?或如何区分已消费(consumed)的记录?
    为了解决消息丢失的问题,许多消息系统增加了确认机制:即当消息被发送出去的时候,消息仅被标记为sent 而不是 consumed;然后 broker 会等待一个来自 consumer 的特定确认,再将消息标记为consumed。这个策略修复了消息丢失的问题,但也产生了新问题。 首先,如果 consumer 处理了消息但在发送确认之前出错了,那么该消息就会被消费两次。第二个是关于性能的,现在 broker 必须为每条消息保存多个状态(首先对其加锁,确保该消息只被发送一次,然后将其永久的标记为 consumed,以便将其移除)。 还有更棘手的问题要处理,比如如何处理已经发送但一直得不到确认的消息。

    Kafka的做法非常高效。Kafka的 topic 被分割成了一组完全有序的 partition,其中每一个 partition 在任意给定的时间内只能被每个订阅了这个 topic 的 consumer 组中的一个 consumer 消费。这意味着 partition 中 每一个 consumer 的位置仅仅是一个数字,即下一条要消费的消息的offset。这使得被消费的消息的状态信息相当少,每个 partition 只需要
    一个数字。这个状态信息还可以作为周期性的 checkpoint。这以非常低的代价实现了和消息确认机制等同的效果。

    这种方式还有一个附加的好处。consumer 可以回退到之前的 offset 来再次消费之前的数据,这个操作违反了队列的基本原则,但事实证明对大多数 consumer 来说这是一个必不可少的特性。 例如,如果 consumer 的代码有 bug,并且在 bug 被发现前已经有一部分数据被消费了, 那么 consumer 可以在 bug 修复后通过回退到之前的 offset 来再次消费这些数据

    4、kafka用什么方法保障持久化的低延迟和高效率?

    • 线性写磁盘文件
    • 预读取
    • 预写、批量写
    • zero-copy(零拷贝)优化
    • 端到端的批量压缩

    线性写磁盘
    通常认为磁盘很慢,那是随机写慢,随机写的速度只有100K/秒,而几年前的6个7200rpm SATA RAID-5 的磁盘阵列上线性写的速度大概是600M/秒。相差6000倍。一个好的磁盘结构设计(线性写)可以使之跟网络速度一样快。

    预读取
    预读取将有效地将每个磁盘中有用的数据预存到缓存。现代的操作系统提供了预读和写技术,将多个大块预取数据。

    预写
    较小的写入组合成一个大的物理写。

    对于主要用于日志处理的消息系统,数据的持久化可以简单的通过将数据追加到文件中实现,读的时候从文件中读就好了。
    这样做的好处是读和写都是 O(1) 的,并且读操作不会阻塞写操作和其他操作。这样带来的性能优势是很明显的,因为性能和数据的大小没有关系了。这意味着我们可以提供一般消息传递系统无法提供的特性。 例如,在Kafka中,消息被消费后不是立马被删除,我们可以保留消息相对较长的时间(默认设置为一个星期)。 这将为消费者带来很大的灵活性。

    一旦消除了磁盘访问模式不佳的情况,该类系统性能低下的主要原因就剩下了两个:大量的小型 I/O 操作,以及过多的字节拷贝。

    为了避免这种情况,我们的协议是建立在一个 “消息块” 的抽象基础上,合理将消息分组。 这使得网络请求将多个消息打包成一组,而不是每次发送一条消息,从而使整组消息分担网络中往返的开销。Consumer 每次获取多个大型有序的消息块,并由服务端 依次将消息块一次加载到它的日志中。这个简单的优化对速度有着数量级的提升。批处理允许更大的网络数据包,更大的顺序读写磁盘操作,连续的内存块等等,所有这些都使 KafKa 将随机流消息顺序写入到磁盘, 再由 consumers 进行消费。批处理是提升性能的一个主要驱动,为了允许批量处理,kafka 生产者会尝试在内存中汇总数据,并用一次请求批次提交信息。 批处理,不仅仅可以配置指定的消息数量,也可以指定等待特定的延迟时间(如64k 或10ms),这允许汇总更多的数据后再发送,在服务器端也会减少更多的IO操作。 该缓冲是可配置的,并给出了一个机制,通过权衡少量额外的延迟时间获取更好的吞吐量。

    zero-copy(零拷贝)优化

    broker 维护的消息日志本身就是一个文件目录,每个文件都由一系列以相同格式写入到磁盘的消息集合组成,这种写入格式被 producer 和 consumer 共用。保持这种通用格式可以对一些很重要的操作进行优化: 持久化日志块的网络传输。 现代的unix 操作系统提供了一个高度优化的编码方式,用于将数据从 pagecache 转移到 socket 网络连接中;在 Linux 中
    系统调用 sendfile 做到这一点。

    为了理解 sendfile 的意义,了解数据从文件到套接字的常见数据传输路径就非常重要(四次 copy):

    1、操作系统从磁盘读取数据到内核空间的 pagecache
    2、应用程序读取内核空间的数据到用户空间的缓冲区
    3、应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
    4、操作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC 缓冲区

    这显然是低效的,有四次 copy 操作和两次系统调用。使用 sendfile 方法,可以允许操作系统将数据从 pagecache 直接发送到网络,这样避免重新复制数据。所以这种优化方式,只需要最后一步的copy操作,将数据复制到 NIC 缓冲区。

    我们期望一个普遍的应用场景,一个 topic 被多消费者消费。使用上面提交的 zero-copy(零拷贝)优化,数据在使用时只会被复制到 pagecache 中一次,节省了每次拷贝到用户空间内存中,再从用户空间进行读取的消耗。这使得消息能够以接近网络连接速度的 上限进行消费。

    pagecache 和 sendfile 的组合使用意味着,在一个kafka集群中,大多数 consumer 消费时,您将看不到磁盘上的读取活动,因为数据将完全由缓存提供。

    端到端的批量压缩

    在某些情况下,数据传输的瓶颈不是 CPU ,也不是磁盘,而是网络带宽。对于需要通过广域网在数据中心之间发送消息的数据管道尤其如此。当然,用户可以在不需要 Kakfa 支持下一次一个的压缩消息。但是这样会造成非常差的压缩比和消息重复类型的冗余,比如 JSON 中的字段名称或者是或 Web 日志中的用户代理或公共字符串值。高性能的压缩是一次压缩多个消息,而不是压缩单个消息。

    Kafka 以高效的批处理格式支持一批消息可以压缩在一起发送到服务器。这批消息将以压缩格式写入,并且在日志中保持压缩,只会在 consumer 消费时解压缩。

    Kafka 支持 GZIP,Snappy 和 LZ4 压缩协议。


    5、kafka的消息保证有几种方式?请描述细节。


    Kafka可以提供的消息交付语义保证有多种:

    • At most once最多一次—— 消息可能会丢失但绝不重传。
    • At least once最少一次—— 消息可以重传但绝不丢失。
    • Exactly once只一次—— 每一条消息只被传递一次.

    生产端的at-least-once
    在 0.11.0.0 之前的版本中, 如果 producer 没有收到表明消息已经被提交的响应, 那么 producer 除了将消息重传之外别无选择。 这里提供的是 at-least-once 的消息交付语义,因为如果最初的请求事实上执行成功了,那么重传过程中该消息就会被再次写入到 log 当中。

    生产端的exactly-once
    从 0.11.0.0 版本开始,Kafka producer新增了幂等性的传递选项,该选项保证重传不会在 log 中产生重复条目。 为实现这个目的, broker 给每个 producer 都分配了一个 ID ,并且 producer 给每条被发送的消息分配了一个序列号来避免产生重复的消息。 同样也是从 0.11.0.0 版本开始, producer 新增了使用类似事务性的语义将消息发送到多个 topic
    partition 的功能: 也就是说,要么所有的消息都被成功的写入到了 log,要么一个都没写进去。这种语义的主要应用场景就是 Kafka topic 之间的 exactly-once 的数据传递(如下所述)。

    消费端的at-most-once
    Consumer 可以先读取消息,然后将它的位置保存到 log 中,最后再对消息进行处理。在这种情况下,消费者进程可能会在保存其位置之后,带还没有保存消息处理的输出之前发生崩溃。而在这种情况下,即使在此位置之前的一些消息没有被处理,接管处理的进程将从保存的位置开始。在 consumer 发生故障的情况下,这对应于“at-most-once”的语义,可能会有消息得不到处理。

    消费端的at-least-once
    Consumer 可以先读取消息,然后处理消息,最后再保存它的位置。在这种情况下,消费者进程可能会在处理了消息之后,但还没有保存位置之前发生崩溃。而在这种情况下,当新的进程接管后,它最初收到的一部分消息都已经被处理过了。在 consumer 发生故障的情况下,这对应于“at-least-once”的语义。 在许多应用场景中,消息都设有一个主键,所以更新操作是幂等的(相同的消息接收两次时,第二次写入会覆盖掉第一次写入的记录)。

    消费端的exactly-once
    使用我们上文提到的 0.11.0.0 版本中的新事务型 producer,达成消息存储的exactly-once。将 consumer 的offset存储为一个 topic 中的消息,所以我们可以在输出 topic 接收信号(数据已经被处理的)时候,在同一个事务中向 Kafka 写入 offset。如果事务被中断,则消费者的位置将恢复到原来的值,而输出 topic 上产生的数据对其他消费者是否可见,取决
    于事务的“隔离级别”。 在默认的“read_uncommitted”隔离级别中,所有消息对 consumer 都是可见的,即使它们是中止的事务的一部分,但是在“read_committed”的隔离级别中,消费者只能访问已提交的事务中的消息(以及任何不属于事务的消息)。举一个非常实际的例子,HR系统将组织架构的变更生产到Kafka,同步程序来完成其他业务系统(比如OA、CRM等)来进行exactly-once的同步变更,需要在 consumer 的 offset 与实际存储为输出的内容间进行协调,比方消费的ID号批次为10095-10099,则offset截止到10099(上一次的offset为10094),实际同步程序也在处理数据同步,保持exactly-once的一般做法为two-phase commit,但有些系统不支持two-phase commit,一般的做法是使consumer和输出系统 将其 offset 存储在与其输出相同的位置,在这个同步组织架构的例子就是同步程序处理完了,可以将offsize保存在db、cache、文件系统、HDFS之类的介质中,如果同步失败(比如10098和10099失败了),则把offsize保存为10097,这样可以下次再次(实际上为exactly-once)处理10098和10099两条记录。

    最后说一下,Kafka 默认保证 at-least-once 的消息交付,这样可以避免exactly-once带来的业务复杂性,至于多次传的重复数据处理,完全可以通过更新操作幂等来去除重复(即交给业务系统来去重)。


    6、kafka节点之间如何复制备份的?

    这个比较详细:

    https://blog.csdn.net/lizhitao/article/details/51718185

    Kafka 的主题被分为多个分区 ,分区是 Kafka 最基本的存储单位。每个分区可以有多个副本 (可以在创建主题时使用 replication-factor 参数进行指定)。其中一个副本是首领副本 (Leader replica),所有的事件都直接发送给首领副本;其他副本是跟随者副本(Follower replica),需要通过复制来保持与首领副本数据一致,当首领副本不可用时,其中一个跟随者副本将成为新首领。

    每个分区都有一个 ISR(in-sync Replica) 列表,用于维护所有同步的、可用的副本。首领副本必然是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步副本:

    • 与 Zookeeper 之间有一个活跃的会话,即必须定时向 Zookeeper 发送心跳;
    • 在规定的时间内从首领副本那里低延迟地获取过消息。

    如果副本不满足上面条件的话,就会被从 ISR 列表中移除,直到满足条件才会被再次加入。

    这里给出一个主题创建的示例:使用 --replication-factor 指定副本系数为 3,创建成功后使用 --describe 命令可以看到分区 0 的有 0,1,2 三个副本,且三个副本都在 ISR 列表中,其中 1 为首领副本。

    需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本 (ISR 中所有副本) 都保存了的数据才能被客户端读取到。

    在创建主题时,Kafka 会首先决定如何在 broker 间分配分区副本,它遵循以下原则:

    • 在所有 broker 上均匀地分配分区副本;
    • 确保分区的每个副本分布在不同的 broker 上;
    • 如果使用了 broker.rack 参数为 broker 指定了机架信息,那么会尽可能的把每个分区的副本分配到不同机架的 broker 上,以避免一个机架不可用而导致整个分区不可用。

    生产者直接发送数据到主分区的服务器上,不需要经过任何中间路由。 为了让生产者实现这个功能,所有的 kafka 服务器节点都能响应这样的元数据请求: 哪些服务器是活着的,主题的哪些分区是主分区,分配在哪个服务器上,这样生产者就能适当地直接发送它的请求到服务器上。

    客户端控制消息发送数据到哪个分区,这个可以实现随机的负载均衡方式,或者使用一些特定语义的分区函数。 我们有提供特定分区的接口让用于根据指定的键值进行hash分区(当然也有选项可以重写分区函数),例如,如果使用用户ID作为key,则用户相关的所有数据都会被分发到同一个分区上。 这允许消费者在消费数据时做一些特定的本地化处理。

    7、kafka消息是否会丢失?为什么?


    几乎不会丢消息。
    kafka的副本机制是只有当消息被所有的副本节点加入到日志中时, 才算是提交, 只有提交的消息才会被 consumer 消费, 这样就不用担心一旦 leader 挂掉了消息会丢失。另一方面, producer 也 可以选择是否等待消息被提交,这取决他们的设置在延迟时间和持久性之间的权衡,这个选项是由 producer 使用的 acks 设置控制。 请注意,Topic 可以设置同步备份的最小数量,producer 请求确认消息是否被写入到所有的备份时, 可以用最小同步数量判断。如果 producer 对同步的备份数没有严格的要求,即使同步的备份数量低于 最小同步数量(例如,仅仅只有 leader 同步了数据),消息也会被提交,然后被消费。


    在所有时间里,Kafka 保证只要有至少一个同步中的节点存活,提交的消息就不会丢失。


    8、kafka最合理的配置是什么?

    这个要看使用的场景,比如

    1、如果是不允许丢失数据,高度安全,高可用,高一致性,性能和吞吐可以大幅降低,比如金融领域,则

    • topic的配置:replication.factor>=3,即副本数至少是3个;2<=min.insync.replicas<=replication.factor
    • 生产者设置:
      • 设置为同步,producer.type=sync
      • 全部副本全部写入再返回结果校验,request.required.acks=-1(all),
      • SSL用起来,PLAINTEXT不能使用,security.protocol= SSL,以及一系列的SSL开头的设置
    • broker的配置:leader的选举条件(禁止不完全的首领选举),unclean.leader.election.enable=false

    2、如果效率速度优先,允许丢失数据,且对持续在线连接性要求很高

    • 生产者设置:不进行任何写入效验,request.required.acks=0
    • broker的配置:leader的选举条件(允许不完全的首领选举,保障极端情况下的可连接性),unclean.leader.election.enable=true
    • topic的配置:尽量保持min.insync.replicas的默认值==1,replication.factor>=3,即副本数至少是3个;1<=min.insync.replicas<=replication.factor

    3、综合性的要求,类似大部分的应用,既要保障高可用,又要有性能要求,又不允许丢消息(尽量)

    • 生产者设置:保证写入Lead,request.required.acks=1
    • broker的配置:leader的选举条件(不允许不完全的首领选举,保障数据完整),unclean.leader.election.enable=false
    • topic的配置:尽量保持min.insync.replicas的默认值==1

    9、kafka的leader选举机制是什么?

    https://blog.csdn.net/yanshu2012/article/details/54894629

    10、kafka在什么情况下会丢消息?

    情况1:
    在生产者 设置 acks=0 (即消息发送出去就认为发送已经成功了,不会等待任何来自服务器的响应),如果这时候网络异常,会丢消息;

    情况2:
    在生产者 设置 acks=1 (即只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应),如果这时首领节点断电、爆炸什么的,而follow还没有成功写入同步日志,会丢数据;

    情况3:使用不完全的首领选举
    对于副本机制,在 broker 级别有一个可选的配置参数 unclean.leader.election.enable,默认值为 fasle,代表禁止不完全的首领选举。
    这是针对当首领副本挂掉且 ISR 中没有其他可用副本时,是否允许某个不完全同步的副本成为首领副本,这可能会导致数据丢失或者数据不一致,在某些对数据一致性要求较高的场景 (如金融领域),这可能无法容忍的,所以其默认值为 false。

    如果你能够允许部分数据不一致(可能丢失)的话,就配置为 true。

    这是可用性和一致性之间的简单妥协,如果我只等待 ISR 的备份节点,那么只要 ISR 备份节点都挂了,我们的服务将一直会不可用,如果它们的数据损坏了或者丢失了,那就会是长久的宕机。另一方面,如果不是 ISR 中的节点恢复服务并且我们允许它成为 leader , 那么它的数据就是可信的来源,即使它不能保证记录了每一个已经提交的消息。 kafka 默认选择第二种策略,当所有的 ISR 副本都挂掉时,会选择一个可能不同步的备份作为 leader ,可以配置属性 unclean.leader.election.enable 禁用此策略,那么就会使用第 一种策略即停机时间优于不同步。

    这种困境不只有 Kafka 遇到,它存在于任何 quorum-based 规则中。例如,在大多数投票算法当中,如果大多数服务器永久性的挂了,那么您要么选择丢失100%的数据,要么违背数据的一致性选择一个存活的服务器作为数据可信的来源。

    情况4:写入异常造成的数据丢失(或者说是写入失败)
    ISR 机制有个参数是 min.insync.replicas , 可以在 broker 或者主题级别进行配置,代表 ISR 列表中至少要有几个可用副本。
    这里假设设置为 2,那么当目前可用副本数量小于该值时(比如1),就认为整个分区处于不可用状态。此时客户端再向分区写入数据时候就会抛出异常
    org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required

    情况5:极端不可能
    在生产者 即使设置 acks=all 所有的机房节点全部同一时间硬盘全毁,这就不是丢数据的问题了,是数据全丢的问题了。

  • 相关阅读:
    MySQL
    用python自动复制粘贴excel表里某一列的数据到另一个表中
    python操作excel小试牛刀
    python- 安装扩展包
    15分钟用ppt制作桌面整理四格壁纸
    R-算术运算符
    R-变量
    R-函数/语法-整合版
    MySQL-函数-整合版
    Python_图片对比问题汇总
  • 原文地址:https://www.cnblogs.com/starcrm/p/11990972.html
Copyright © 2011-2022 走看看