zoukankan      html  css  js  c++  java
  • Kafka技术内幕 读书笔记之(五) 协调者——消费者加入消费组

      消费者客户端轮询的3个步骤:发送拉取请求,客户端轮询,获取拉取结果 消费者在发送拉取请求之前,必须首先满足下面的两个条件
    - 确保消费者已经连接协调者, 即找到服务端中管理这个消费者的协调者节点
    - 确保消费者已经分配到分区, 即获取到协调者节点分配给消费者的分区信息

      消费者客户端除了从协调者节点获取到分区,还会发送心跳请求、提交偏移量给协调者节点 其中,提交偏移量主要和消息的处理有关,
    协调者只是作为偏移量的存储介质而消费者发送心跳请求给协调者,则有可能归现各种各样的问题,如下
    - 消费者没有及时发送心跳 ,可能是消费者发生故障这时协调者应该能够意识到有消费者离开了消费组,需要对消费组内的所有消费者重新分配分区 
    - 消费者发送心跳给协调者,但是服务端的协调者节点也可能出现故障而消费者所有依赖协调者的工作都必须首先存在协调者,
       所以消费者会等待一段时间重新连接正确的协调者节点,然后由协调者节点再次分配分区 。

    消费者加入消费组
      消费者发送“加入组请求”获取分区定义在抽象客户端协调者的ensureActiveGroup ()方法,而该方法又定义在消费者的轮询操作中 
     即消费者每次轮询都会调用该方法,但并不是每次轮询都要发送“加入组请求” 
     
      消费者发送“加入组”请求会返回一个异步请求对象为了确保消费者只有分配到分区之后,才可执行后续的拉取分区消息操作,
    消费者需要通过客户端阻塞式地轮询等待异步请求完成 在异步请求完成后分配的分区设置到消费者订阅状态的“分配结果”中

    “加入组请求”相关的业务逻辑 主要步骤如下
    (1)消费者加入消费组之前,需要做一些准备工作,比如同步提交一次偏移量,执行监听器的回调
    (2)消费者创建“加入组请求”,包括消费者的元数据作为请求的数据内容
    (3)消费者发送“加入组请求”,采用组合模式返回个新的异步请求对象,并定义回调处理器
    (4)客户端通过轮询,确保组合模式返回的异步请求必须完成,这是个阻塞的方法
    (5)异步请求完成后,执行回调方法,将分区设置到消费者的订阅状态,并重置心跳定时任务

    元数据与分区分配器
      消费者客户端建“加入组请求”,请求对象的变量有 消费组名称( groupid 消费者成员编号( memberId )、协议类型( protocolType )、
    元数据( metadata
      有两种协议类型 :消费者( consumer )、连接器( connect 。 “协议内容”和“元数据”构成协议元数据( ProtocolMetadata 。 元数据和协议类型有关,
    消费者的元数据是订阅的主题,连接器的元数据是一些配置信息。 

      消费者的协议内容是分区分配器( PartitionAssignor)的类名 目前Kafka消费者支持两种分区分配方式 : 范围、 循环。 
    分配算法的两个参数:
    - subscriptions 。订阅哪些主题
    - metadata 。 消费者订阅的这些主题都有多少个分区
    subscriptions 表示每个消费者的订阅信息,通过让每个消费者都发送向己的订阅信息给协调者,协调者就可以收集到所有消费者订阅的主题。
    metadata 是集群的元数据,它记录了每个主题的相关信息,包括主题的分区数。 这样协调者就可以将对应主题的分区,分配给所有订阅这些
    主题的消费者

      分配方法的返回值包含了每个消费者对应的分配结果,分配结果是一个“主题分区集合”,表示分配给消费者的所有主题分区 
     消费者发送订阅信息( subscriptions )对象以及服务端返回分配结果( Assignment )对象,在网络传输时都需要进行序列化 

    消费者的加入组和同步组

    1 . 主消费者执行任务分配

      在Kafka的实现中,协调者在收集完所有的消费者及其订阅信息后,并不执行具体的任务分配算法,而是交给其中一个消费者执行分区分配任务,这个消费者叫作主消费者 。 
    - 协调者在选择主消费者上不需要做过多的判断,通常会选择第个发送“加入组请求”的消费者作为主消费者。 执行分区分配的算法交给任何一个消费者都是可以完成的 
    不管什么类型的消费者失败, 失败的消费者会被协调者从消费组中移除 , 并触发再平衡操作,剩余存活的每个消费者都要重新加入消费组
    - 每个消费者发送“加入组请求”后 ,协调者在收集完所有的消费者及其订阅信息后,会返回“加入组响应”给每个消费者,但这个响应结果并不是分区分配结果 
    - 每个消费者收到“加入组响应”后,都会发送“ 同步组请求”给协调者来获取分配的分区

      主消费者会在完成分区的分配任务后才发送“同步组请求” 普通消费者会即发送“同步组请求”,但因为主消费者还没有将分配结果返回给协调者,
    普通消费者的“同步组请求”在服务端会被延迟处理。 协调者收到主消费者带有分配结果的“同步组请求” 后 , 会将分配结果分配给每个消费者

    2. 发送“加入组请求”
       如果是协调者负责分区的分配工作,消费者发送完“加入组请求”后,就可以从 “加入组应”中获取到分配给它们的分区 。 

      但因为协调者并不执行分区分配,所以它返回的“加入组响应”没有分配结果协调者返回给每个消费者的“加入组响应”是不同的,
    主消费者收到的是“所有消费者成员列表及其对应的订阅信息”,而普通消费者并没有这些数据,因为普通消费者并不会执行分区分配的工作 
    在回调方法中,由于消费者接收的“加入组响应”不是分配的分区,所以不能直接完成“加入组”的异步请求,而应该再次发送“同步组请求 ” 
      但如果“加入组响应”有错误,就不需要继续发送“同步组请求”,而应该对“加入组”的异步请求调用 raise ()方法,表示“加入组”的异步请求有异常 
     客户端轮询完成,但异步请求没有成功, 就不会执行。onJoinCoplete () 回调方法,消费者需要重新发送“加入组请求” 
     

    3 . 发送“同步组”请求
      普通消费者在收到“加入组响应结果”后,会立即发送“同步组请求”给协调者 而主消费者在收到“加入组响应结果”后,会从“加入组响应结果”
    中获取执行分区分配过程中需要用到的数据,然后执行分分配。 只有这个执行过程完成后,主消费者才会开始发送“同步组请求”给协调者

    主消费者执行分配任务
      消费者发送的“加入组请求”( JoinGroupRequest )的内容包括 消费组编号、消费者成员编号 、协议类型、协议内容和元数据( protocolMetadata 
     其中,协议内容是分区分配算法的名称,元数据是消费者订阅的主题列表。 “加入组响应”对象的内容包括:消费者成员编号 、 的消费组协议、
    主消费者编号、协调者执行分区分配工作的次数、消费者成员列表


    - 客户端发送的协议与服务端返回的 消费组协议 (groupProtocol )。虽然“加入组请求”中的“协议名称”包括了系统支持的所有协议类型(范围分配和循环分配),
      但且正执行具体的分区分配时只允许一种协议 。 协调者会负责统所有消费者的协议,选择个大家都支持认可的协议作为“消费组协议” 
      协调者发送“加入组响应”给每个消费者的“消费组”协议都是样的,虽然只有主消费者会使用这个协议来做实际的分配工作 

    - 消费者成员编号( memberId ) 。消费者发送的“加入组请求”需要指定消费者成员编号,当消费者初次加入消费组时 这个编号是UNKNOWN_MEMBER 
      协调者处理每个消费者发送的“加入组请求”,会为每个消费者指定唯一的消费者成员编号,并包含在“加入组响应”中运回给消费者。 后续消费者需要重新加入
      消费组时,发送“加入组请求”巾的消费者成员编号,就是协调者之前分配给它的编号 

    - 主消费者编号( leaderId ) 。协调者选择的主消费者编号,如果消费者的成员编号和主消费者编号相等,那么这个消费者就是主消费者
     
    - 纪元编号( generation )。只在每个消费者每次需要重新加入组时,才会在协调者端进行更新,它表示协调者从启动至今共发生了多少次分区分配
    的工作 每次消费组发生再平衡操作时,协调者都会发起一次分区分配的工作 。 虽然分区分配工作是由主消费者执行的,但主消费者有可能变化,
    所以要由服务端的协调者来记录这个编号

      普通消费者收到“加入组响应”会调用onJoinFollower()方法,立即发送“同步组请求”给协调者,并给返回的“同步组”异步请求链接上“加入组”的异步请求 
     当消费者收到“同步组响应”后,会完成“同步组”的异步请求,再完成“加入组”的异步请求,这样普通消费者就可以从“加入组”的异步请求结果中获取分配给它的分区 
     主消费者在收到“加入组响应”时会调用onJoinLeader()方法,也会发送“同步组请求”给协调者 。 它也会给返回的“同步组”异步请求链接上“加入组”异步
    请求,后续流程和普通也费者类似,分别是:收到“同步组响应”、完成“同步组”异步请求、完成“加入组”异步请求、获取“加入组”异步请求结果 

      消费者发送“同步组请求”( SyncGroupRequest )的内容包括:消费组编号、纪元编号、消费者成员编号 消费组的分配结果 其中前3个信息
    在协调者返回给消费者的“加入组响应”结果中,“消费组的分配结果”只有主消费者会传递主消费者在收到“加入组响应”后,并不会立即发送“同步
    组请求”给协调者,而是要等到执行分区分配的工作完成后才发送“同步组请求” 主消费者发送的“同步组请求”带有“消费组的分配结果”( groupAssignment ),
    普通消费者发送的“同步组请求”没有分配结果,因为它并没有执行分区分配工作 

    消费者发送“加入组请求”给协调者,到获取到分区列表的过程 具体步骤如下
    (1)消费者发送“加入组请求”,得到个“加入组”的异步请求
    (2)消费者获得“加入组响应”结果,表示协调者已经收集到所有发送了“加入组请求”的消费者
    (3)主消费者会执行分区分配任务,返回结果是消费组中所有消费者及其对应的分区列表
    (4)每个消费者都会发送“同步组请求”,得到一个“同步组”的异步请求
    (5)每个消费者获得“同步组响应”结果,表示分配给当前消费者的分区列表
    (6)完成“同步组”的异步请求,并通过模式完成“加入组”的异步请求
    (7)消费者获取“加入组”异步请求的结果,这个数据表示的就是分配给消费者的分区

    加入组的准备、完成和监听器
      消费者重新加入消费组,在分配到分区的前后,都会对消费者的拉取工作产生影响 消费者发送“加入组请求”之前要停止拉取消息,在收到“加入组响应”中的
    区之后要重新开始拉取消息 同时,为了能够让客户端应用程序感知消费者管理的分区发生变化,在加入组前后,客户端还可以设置自定义的“消费者再平衡
    监听器”,以便对分区的变化做出合适的处理
    1. 准备和完成“加入组请求”
      消费者发送“加入每请求”给协调者,最终从协调者获取到的分配结果对象( Assignment )表示 分配给消费者的分区列表消费者在加入消费组过程中调用
    onJoinPrepare ()方法,这表示消费者正准备加入消费组,正在等待分配分区 此时拉取器应该暂停拉取消息,而只有等消费者分配到分区,并
    将最新的分配结果更新到订阅状态中后,拉取器才可以开始发送拉取请求并拉取消息

       行“入组分配分区 塞主流程 :如果“加入组”操作没有完成,后续的流程都不会。 可以看到 , 消费者重新入消费组分区分配

    为后续拉取器拉取分区消息提供了数据来源这两个动作通过消费者的订阅状态关联起来 消费者入组完成后,将分区设置到订阅状态中,
    拉取器工作获取订阅状态的分配结果,然后开始拉取消息 
     
    消费者在加入组的前后会对其他相关组件产生影响,并不仅仅是发送“加入组请求”,然后获取到分配的分区结果就结束了 。 比如,在加入组之前,
    需要执行下面两个操作
    (1)禁用自动提交任务,因为在加入组过程中不会拉取和消费新消息,所以没必要提交偏移量
    (2)执行次同步提交偏移量,这个操作是阻塞的,确保提交偏移量能够成功完成
    在加入组之后 ,也要执行下面几个操作
    (1)更新订阅状态的 needsFetchCommittedOffsets变量,表示需要刷新分区的提交偏移量
    (2)更新订阅状态的分配结果,为每个分区新创建分区状态,这个对象用来记录分区的最新状态
    (3)启动消费者的向动提交任务

    消费者加入消费组、拉取前的准备工作、拉取消息这3个步骤都在消费者的轮询操作中完成 。 加入组之前需要先采用同步方式提交分区偏移量给协调者 
    取准备工作会先从协调者获取分区的提交偏移量,然后更新分区的拉取偏移量,使消费者的拉取消息工作可以正常开始 

    有下面几种事件触发再平衡操作
    - 消费者订阅的主题集合中任意一个主题的分区数量发生变化
    - 创建或删除一个主题
    - 消费组中已经存在的一个消费者成员挂掉了
    - 一个新的消费者成员加入已经存在的消费组中

    2. 消费者平衡监听器
      消费者在发送“加入组请求”之前,调用。onJoinPrepare ()方法会触发“消费者再平衡监听器”的onPartitionsRevoked () 方法,在加入消费组后调用 
    onJoinComplete () 方法会调用监听器的onPartitionsAssigned () 。 这两个方法的参数都是分区,前者是加入组之前分配的分区,后者是加入组之后分配
    的分区,所以这两个分区参数值会不一样

      “消费者再平衡监听器”只适用于订阅模式的消费者API ,如果使用手动分配分区模式,监听器不会起作用 。 因为消费者如果指定消费固定的分区 就不需要
    再平衡操作使用自定义“消费者再平衡监听器” 典型场景是 : 发生再平衡操作时,保存偏移量到外部存储系统中 
     

  • 相关阅读:
    js yui
    ie tbody table 兼容方法
    js json ie不支持json
    js ie 6,7,8 使用不了 firstElementChild
    js for in
    js json 单双引号互换
    html table 上移下移
    js autoComplate
    开发总结
    CentOS6.7 下安装git
  • 原文地址:https://www.cnblogs.com/jixp/p/9855211.html
Copyright © 2011-2022 走看看