zoukankan      html  css  js  c++  java
  • kafka消息的处理机制(五)

    这一篇我们不在是探讨kafka的使用,前面几篇基本讲解了工作中的使用方式,基本api的使用还需要更深入的去钻研,多使用才会有提高。今天主要是探讨一下kafka的消息复制以及消息处理机制。

    1. broker的注册

    Kafka使用Zookeeper来维护集群成员的信息。每个broker都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在kafka启动的时候,他通过创建临节点把自己的id注册到zk,kafka组件订阅zk的/broker/ids路径(broker在zk上的注册路径),当有broker加入或者退出集群的时候,这些组件就可以获得通知。

    如果当前id所在的broker已经注册然后启动另一个有相同id的broker,启动会出错,新的broker会试着进行注册,但是不会成功。因为zk中已经有一个相同名字的id注册过了。

    如果broker出现停机或者网络长时间无响应,broker会从zk断开链接,zk中注册的临时节点会删除,下次broker启动需要重新注册。

    如果是关闭broker那么他对应的节点也会消失,但是他的id也许会存在于其他的数据结构中。比如主题对应的副本,在完全关闭一个broker之后如果使用相同的id启动另一个全新的broker,他会立即加入集群,并且会拥有之前broker所有的主题和分区(前提是没有发生重排序,没有第二个新的broker加入)。

    kafka的哪些组件需要注册到zookeeper?

    (1)Broker注册到zk

    每个broker启动时,都会注册到zk中,把自身的broker.id通知给zk。待zk创建此节点后,kafka会把这个broker的主机名和端口号记录到此节点。

    (2)Topic注册到zk

    当broker启动时,会到对应topic节点下注册自己的broker.id到对应分区的isr列表中;当broker退出时,zk会自动更新其对应的topic分区的ISR列表,并决定是否需要做消费者的rebalance

    (3)Consumer注册到zk

    一旦有新的消费者组注册到zk,zk会创建专用的节点来保存相关信息。如果zk发现消费者增加或减少,会自动触发消费者的负载均衡。

    ==注意,producer不注册到zk==

    2. kafka集群leader选举

    1. 在kafka集群中,第一个启动的broker会在zk中创建一个临时节点/controller让自己成为控制器。其他broker启动时也会试着创建这个节点当然他们会失败,因为已经有人创建过了。那么这些节点会在控制器节点上创建zk watch对象,这样他们就可以收到这个节点变更的通知。任何时刻都确保集群中只有一个leader的存在。
    2. 如果控制器被关闭或者与zk断开连接,zk上的KB是节点马上就会消失。那么其他订阅了leader节点的broker也会收到通知随后他们会尝试让自己成为新的leader,重复第一步的操作。
    3. 如果leader完好但是别的broker离开了集群,那么leader会去确定离开的broker的分区并确认新的分区领导者(即分区副本列表里的下一个副本)。然后向所有包含该副本的follower或者observer发送请求。随后新的分区首领开始处理请求。

    3. kafka副本

    Kafka每个topic的partition有N个副本,其中N是topic的复制因子。Kafka通过多副本机制实现故障自动转移,当Kafka集群中一个Broker失效情况下仍然保证服务可用。在Kafka中发生复制时确保partition的预写式日志有序地写到其他节点上。N个replicas中。其中一个replica为leader,其他都为follower,leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。

    Kafka必须提供数据复制算法保证,如果leader发生故障或挂掉,一个新leader被选举并接收客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader,或者换句话说,follower追赶leader数据。leader负责维护和跟踪ISR中所有follower滞后状态。当生产者发送一条消息到Broker,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,如果follower”落后”太多或者失效,leader将会把它从replicas从ISR移除。

    3.1 kafka创建副本的2种模式——同步复制和异步复制

    Kafka动态维护了一个同步状态的副本的集合(a set of In-Sync Replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息只有被这个集合中的每个节点读取并追加到日志中,才会向外部通知说“这个消息已经被提交”。

    只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。消息从leader复制到follower,我们可以通过决定Producer是否等待消息被提交的通知(ack)来区分同步复制和异步复制。

    同步复制流程:

    1. producer联系zk识别leader;
    2. 向leader发送消息;
    3. leadr收到消息写入到本地log;
    4. follower从leader pull消息;
    5. follower向本地写入log;
    6. follower向leader发送ack消息;
    7. leader收到所有follower的ack消息;

    leader向producer回传ack。

    异步复制流程:

    和同步复制的区别在于,leader写入本地log之后,直接向client回传ack消息,不需要等待所有follower复制完成。

    既然卡夫卡支持副本模式,那么其中一个Broker里的挂掉,一个新的leader就能通过ISR机制推选出来,继续处理读写请求。

    kafka判断一个broker节点是否存活,依据2个条件:

    1. 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接;
    2. 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。
    3.2 一些名词:
    1. Leader副本:每个分区都有多个副本,针对每个分区,都有一个唯一的一个Leader副本,负责该分区的读写请求处理。
    2. Follower副本:从Leader副本拉取数据,作为Leader副本的热备。
    3. AR:(Assigned Replica)副本集合(Leader+Follower的总和)
    4. ISR:(In-Sync Replica)同步副本集合,与leader副本消息镜像“相差”不多的副本集合,又称为“核心副本集”,与kafka 发送端的ACK的几种语义有关,后面会详聊(注意这个集合是动态的,是会剔除和新增的)。
    5. HW:HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。6. LEO:(Log End Offset)每个分区都会有的一个标记,标示当前分区的最后一条消息(针对Leader就是Leader上的最后一条消息,针对某个Follower,就是当前该Follower的最后一条消息)
    3.3 图解AR,ISR,HW,LEO:

    这里我们假设每个副本有三个分区,副本被剔除和加入ISR的临界条件为落后leader 三条消息,kafka判断是否符合ISR的条件有两个:

    • Follower落后leader多少条消息,落后超过配置值后将踢出ISR
    • Follwer多久没从leader同步消息,超过配置时间没拉取数据将从ISR踢出(kafka0.9后删除了该判断,a为唯一判断标准)。

    下面我们用图来表达下上面的概念的关系:

    1.时刻t1该分区的情况如下,此时ISR与AR一致(Leader,follower1,follower2),follower2 和 leader的消息一致,LEO都为4,follower1的LEO为2,因此leader的HW为2。

    2.时刻t2 follower full gc:

    3.时刻t3,leader接受producer发送来的2条消息5、6,此时发现Follower1已经落后了自己4条消息,将follower1踢出ISR集合:

    4.时刻4,follower1从leader拉取到5这条消息,更新HW:

    5.时刻5,follower1 full gc完成后,发现自己已经落后了很多消息,开始从leader追消息,待消息不落后leader太多时,申请加入ISR中。

    经过上面的图解分析后,我们来看下几个需要注意的点:

    1. ISR是AR的一个子集,并且是不断伸缩的,变化的条件为“是否落后太多的消息”
    2. HW之前的消息代表被集群“commit”的消息,只有commit的消息才对client端(consumer以及request.required.acks为-1时的producer),在前面我们说过,这样能够使kafka在语义上支持不丢消息。我们从producer和consumer两个维度来分析:

    在这之前,我们先说下request.required.acks的取值范围(1,0、-1)

    1. 1:leader成功就返回
    2. 0:无需等待leader响应
    3. -1:ISR都成功才返回

    从producer的角度:当producer将request.required.acks设置为-1时候,保证了消息已经在多个副本中存在了,此时即便leader挂了,这个消息还是存在的(leader选举会从ISR中选举出新的leader),那么假如ISR迟迟同步不成功怎么办呢?

    从consumer的角度:如果没有HW,consumer拉取到最新的消息后,而此时leader宕机,很有可能新的leader中并没有此消息。

    当然不能保证消息永远不会丢,极端的情况下,如ISR中只有leader的时候(当然可以配置集群可用的最小核心副本集个数,但会极大的损失可用性),或者所有副本都宕机了(这个。。。没办法。),消息还是会丢的。

  • 相关阅读:
    PHP 5.5.0 Alpha5 发布
    Ubuntu Touch 只是另一个 Android 皮肤?
    MariaDB 10 已经为动态列提供文档说明
    Percona Toolkit 2.1.9 发布,MySQL 管理工具
    Oracle Linux 6.4 发布
    Ruby 2.0.0 首个稳定版本(p0)发布
    Apache Pig 0.11.0 发布,大规模数据分析
    Node.js 0.8.21 稳定版发布
    红薯 MySQL 5.5 和 5.6 默认参数值的差异
    Django 1.5 正式版发布,支持 Python 3
  • 原文地址:https://www.cnblogs.com/rickiyang/p/11074191.html
Copyright © 2011-2022 走看看