zoukankan      html  css  js  c++  java
  • Kafka 0.10 Coordinator概述

    由Kafka内置实现了失败检测和Rebalance(ZKRebalancerListener),但是它存在羊群效应和脑裂的问题,客户端代码实现低级API也不能解决这个问题。如果将失败探测和Rebalance的逻辑放到一个高可用的中心Coordinator,这两个问题即可解决。同时还可大大减少Zookeeper的负载,有利于Kafka Broker的扩展(Broker也会作为协调节点的角色存在)。

    有几种类型:

    • GroupCoordinator: GroupCoordinator handles general group membership and offset management. Each Kafka server instantiates a coordinator which is responsible for a set of groups. Groups are assigned to coordinators based on their group names.
    • WorkerCoordinator:This class manages the coordination process with the Kafka group coordinator on the broker for managing assignments to workers.
    • ConsumerCoordinator:This class manages the coordination process with the consumer coordinator.

    中文:

    • GroupCoordinator:broker端的,每个kafka server都有一个实例,管理部分的consumer group和它们的offset
    • WorkerCoordinator:broker端的,管理GroupCoordinator程序,主要管理workers的分配。
    • ConsumerCoordinator:consumer端的,管理ConsumerCoordinator程序。

    1.Broker端的Coordinator

    Kafka的group management protocol包括以下的动作序列:

    • Group Registration:Group的成员需要向cooridnator注册自己,并且提供关于成员自身的元数据(比如,这个消费成员想要消费的topic)
    • Group/Leader Selection:cooridnator确定这个group包括哪些成员,并且选择其中的一个作为leader。
    • State Assignment: leader收集所有成员的metadata,并且给它们分配状态(state,可以理解为资源,或者任务)。
    • Group Stabilization: 每个成员收到leader分配的状态,并且开始处理。

    这里边有三个角色:coordinator, group memeber, group leader.

    有这么几个情况:

    1. 所有的consumer线程要先向coordinator注册,由coordinator选出leader, 然后由leader来分配state。 从group memeber里选出来一个做为leader,由leader来执行性能开销大的协调任务, 这样把负载分配到client端,可以减轻broker的压力,支持更多数量的消费组。
    2. 所有group member(指的是consumer线程)都需要发心跳给coordinator,这样coordinator才能确定group的成员。
    3. 对于Kafka consumer,它的实际上必须跟coordinator保持连接,因为它还需要提交offset给coordinator。所以coordinator实际上负责commit offset,那么,即使leader来确定状态的分配,但是每个partition的消费起始点,还需要coordinator来确定。

    问题:这就带来了一问题,每个partition的消费开始的offset是由leader向coordinator请求,然后做为state分配,还是leader只分配partition,而follower去coordinator处请求开始消费的offset?

    回答:我从合理性来思考,coordinator向leader发送了topic-partition的Offset消费情况,leader分配好partition后,回传给leader。所有的follower同步这个状态。

    2.客户端的ConsumerCoordinator

    image
    这张图展示了Server和Client端的关系。

    ConsumerCoordinator是KafkaConsumer的一个成员变量,所以每个消费者都要自己的ConsumerCoordinator,消费者的ConsumerCoordintor只是和服务端的GroupCoordinator通信的介质。

    下文中提到的协调者一般指的是服务端的GroupCoordinator。

    每个KafkaServer都有一个GroupCoordinator实例,服务端的GroupCoordinator管理消费组成员和offset,它可以管理多个消费组(因为Broker本身即使存储一个topic的消息,也可以被不同的消费组订阅)。注意:组成员的状态管理(比如GroupMetadata)是在服务端的GroupCoordinator完成的,而不是由消费组的ConsumerCoordinator完成(因为消费者只能看到自己的,无法看到和自己同组的其他成员)。

    3.Consumer消费者的工作过程:

    1. 在consumer启动时或者coordinator节点故障转移时,consumer发送ConsumerMetadataRequest给任意一个brokers。在ConsumerMetadataResponse中,它接收对应的Consumer Group所属的Coordinator的位置信息。
    2. Consumer连接Coordinator节点,并发送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration错误码,说明协调节点已经在初始化平衡。消费者就会停止抓取数据,提交offsets,发送JoinGroupRequest给协调节点。在JoinGroupResponse,它接收消费者应该拥有的topic-partitions列表以及当前Consumer Group的新的generation编号。这个时候Consumer Group管理已经完成,Consumer就可以开始fetch数据,并为它拥有的partitions提交offsets。
    3. 如果HeartbeatResponse没有错误返回,Consumer会从它上次拥有的partitions列表继续抓取数据,这个过程是不会被中断的。
      image

    4. Coordinator协调节点的工作过程

    1. 在稳定状态下,Coordinator节点通过故障检测协议跟踪每个Consumer Group中每个Consumer的健康状况。
    2. 在选举和启动时,Coordinator节点读取它管理的Consumer Group列表,以及从ZK中读取每个消费组的成员信息。如果之前没有成员信息,它不会做任何动作。只有在同一个消费组的第一个消费者注册进来时,Coordinator节点才开始工作(即开始加载Group的Consumer成员信息)。
    3. 当Coordinator节点完全加载完它所负责的Consumer Group列表的所有组成员之前,它会在以下几种请求的响应中返回CoordinatorStartupNotComplete错误码:HeartbeatRequest,OffsetCommitRequest,JoinGroupRequest。这样消费者就会过段时间重试(直到完全加载,没有错误码返回为止)。
    4. 在选举或启动时,Coordinator节点会对消费组中的所有消费者进行故障检测。根据故障检测协议被协调节点标记为Dead的消费者会从消费组中移除,这个时候协调节点会为Dead的消费者所属的消费组触发一个Rebalance操作(消费者Dead之后,这个消费者拥有的partition需要平衡给其他消费者)。
    5. 当HeartbeatResponse返回IllegalGeneration错误码,就会触发平衡操作。一旦所有存活的Consumer通过JoinGroupRequests重新注册到Coordinator节点,Coordinator节点会将最新的partition所有权信息在JoinGroupResponse的每个消费者之间通信(同步),然后就完成了Rebalance操作。
    6. Coordinator节点会跟踪任何一个Consumer已经注册的topics的topic-partition的变更。如果它检测到某个topic新增的partition,就会触发Rebalance操作。当创建一个新的topics也会触发Rebalance操作,因为消费者可以在topic被创建之前就注册它感兴趣的topics。

    Consumer状态机

    image

    Coordinator状态机

    image

    当Coordinator发生故障时,Consumer发现new Coordinator的顺序可能发生在新的协调者完成故障处理(包括从zk中加载消费组元数据等)之前或之后。如果在完成故障处理之后才发现new Coordinator,new Coordinator就会像之前一样接收消费者的心跳请求。而如果是在之前,新的协调者则会拒绝消费者的心跳请求,会导致消费者重新发现协调者,并重新连接协调者。如果消费者太晚连接新的协调者,协调者可能会标记消费者挂掉了,消费者再次加入时,会认为这是一个新的消费者,并触发rebalance。

    消费者发现新的协调者(co-ordinator re-discovery),包括两个步骤,首先确定新的协调者,然后消费者连接协调者。如果新的协调者确定了,并且消费者成功连接上新协调者,这样消费者发送的心跳请求就会被新的协调者正常接收。但是如果新协调者已经确定,而消费者并没有连接上新的协调者,消费者发送的心跳请求并不会被接收:因为连接都还没有建立!

    5.Coordinator存储的信息有什么

    对于每个Consumer Group,Coordinator会存储以下信息:

    1. 对每个存在的topic,可以有多个消费组group订阅同一个topic(对应消息系统中的广播)
    2. 对每个Consumer Group,元数据如下:
    • 订阅的topics列表
    • Consumer Group配置信息,包括session timeout等
    • 组中每个Consumer的元数据。包括主机名,consumer id
    • 每个正在消费的topic partition的当前offsets
    • Partition的ownership元数据,包括consumer消费的partitions映射关系

    没看完,这里还有更多的细节, 博文

  • 相关阅读:
    Linux混杂设备驱动学习
    Linux字符设备驱动解析
    U-BOOT-Linux启动指令bootm分析
    项目-基于视频压缩的实时监控系统--tiny6410
    django分页linaro-django-pagination
    django FileFIeld和ImageField 上传路径改写
    ajaxFileUpload用法
    gzip压缩
    编写EL函数
    struts2上传下载
  • 原文地址:https://www.cnblogs.com/byrhuangqiang/p/6384986.html
Copyright © 2011-2022 走看看