zoukankan      html  css  js  c++  java
  • Kafka技术内幕 读书笔记之(五) 协调者——延迟的加入组操作

     

      协调者处理不同消费者的“加入组请求”,由于不能立即返回“加入组响应”给每个消费者,它会创建一个“延迟操作”,表示协调者会延迟发送“加入组响应”给消费者 。 
    但协调者不会为每个消费者的 加入组请求”都创建一个 延迟操作”,而是仅当消费组状态从“稳定”转变为“准备再平衡”,才创建一个“延迟操作”对象 

    (1)初始时消费组状态为“稳定”,第一个消费者加入消费组 。 因为消费组状态为“稳定”,所以协调者允许消费者执行再平衡操作 。协调者更改消费
           组的状态为“准备再平衡”,并创建个延迟的操作对象 

    (2)消费组状态为“准备再平衡”,第二个消费者加入消费组 。 因为消费组状态为“准备再平衡”,所以协调者不允许消费者执行再平衡操作 。消费组状态仍然不变,
           协调者也不创建延迟的操作对象。 

    “准备再平衡” 

      协调者处理消费者的“加入组请求”,如果消费者设置的成员编号未知,协调者会为这个消费者指定一个新的成员编号,然后创建消费者成员元数据,并加入到消
     费组元数据中 如果消费者成员编号是已知的,说明消费组元数据中已经存在对应的消费者成员元数据,只需要更新已有的成员元数据

      协调者为消费者分配的成员编号,会作为“加入组响应结果”返回给消费者消费者发送“同步组请求”时必须指定这个新分配的成员编号 ,这样协调者才能
    知道成员编号对应的消费者元数据实际上 , 后面如果消费者要重新加入消费组,再次发送“加入组请求”时也需要指定成员编号 。 通常来说,协调者为消费者
    分配了一个成员编号,协调者的消费组元数据就会直记录这个消费者的信息 。成员编号的主要作用就是用来标识一个消费者,消费者后续的任何请求动作都
    应该带有分配给它的成员编号
       消费组的状态从“稳定”进入“准备再平衡”,表示准备开始再平衡操作 一次再平衡操作只会由一个消费者发起,并只会创建一个延迟的操作对象。 延迟操
    作还和延迟缓存有关,因为延迟操作如果不能及时完成,就应该放入延迟缓存。 当然,延迟缓存不是普通的数据缓存,它还要提供检查延迟操作能否完成的方法,
    并且保证在指定时间内未完成时,必须强制完成超时的延迟操作

    延迟操作和延迟缓存
      Kafka服务端在处理客户端的一些请求时,如果不能及时返回响应结果给客户端,会在服务端创建一个延迟操作对象( DelayedOperatlon ),并放在
    延迟缓存中( DelayedOperationPurgatory Kafka的延迟操作有多种:延迟的生产 、延迟的响应 、延迟的加入 、延迟的心跳
     这里先给出一些延迟操作相关的结论
    - 延迟操作需要指定一个超时时间,表示在指定时间内没有完成时会被强制完成
    - 延迟操作加入到延迟缓存中,会指定一个键 比如,和消费组相关的延迟加入,键是消费组编号 
    - 服务端创建延迟操作后,通常会有“尝试完成延迟操作”的动作(延迟操作如果能够尽早完成是最好的) 。 尝试完成延迟操作的外部事件会有多种情况,
      而且因为延迟操作有依赖条件,所以任何可能改变依赖条件的事件,都应该执行“尝试完成延迟操作” 。比如,协调者因为依赖了“等待消费者发送加入组请求”
      这个条件才会创建“延迟的加入组”对象 。 如果有消费者发送了加入组请求,就应该尝试完成“延迟的加入组”对象
    - 当外部事件尝试完成延迟操作时,怎么判断延迟操作能不能完成?不同的延迟操作类型因为依赖条件不同,应该自定义可以完成延迟操作的条件判断
     
    创建延迟操作的最终目的是让操作不再被延迟,延迟操作对象有下面几个跟完成操作相关的方法
    - tryComplete () 尝试完成,如果不能完成,返回 false ,表示延迟操作还不能完成
    - onComplete ()延迟操作完成时的回调方法,完成有两种:正常主动完成和超时被动完成
    - onExpiration ()延迟操作超时的回调方法,如果之前一直调用尝试完成都不能完成,在指定的超时时间过去后就会强制完成。 调用这个回调方法,
       一定会再调用tryComplete ()方法

    尝试完成延迟的加入操作

      协调者在创建完延迟操作对象之后,为了检查能否完成刚刚创建的延迟操作,会调用延迟缓存的tryCompleteElseWatch () 方法立即尝试完成 
     延迟缓存会调用延迟操作的tryComplete () 方法, 对于加入组的延迟缓存,就是调用延迟加入对象的tryCompleteJoin ()方法 。 这个方法的第
    个参数表示如果可以完成,就会强制完成延迟加入对象, RP最终会调用到延迟加入对象的 onCompleteJoin ()方法 。 延迟加入操作对象的
    tryComplete () 方法和 onComplete() 方法,它们的具体实现是调用协调者的tryCompleteJoin ()方法和 onCompleteJoin () 方法 

       延迟操作能否完成的判断条件是:消费组元数据的notYetRejoinedMembers ()方法返回值是否为空 这个方法收集的是消费组中 awaitingJoinCallback
    值对象为空的消费者成员元数据因为协调者旦开始处理消费者发送的“加入组请求”,就会设置awaitingJoinCallback值对象为“发送响应的回调方法”,
    所以如果消费者发送了“加入组请求”,并且也被协调者开始处理,就不会被notYetRejoinedMembers()方法选出来 

      以协调者处理第一个消费者发迭的加入组请求为例,因为第个消费者的 awaitingJoinCallback值对象为空,所以 notYetRejoinedMembers  ()方法
    不会选择第个消费者那么,这个方法因为没有收集到任何个满足条件的消费者,返回值为空,就会执行 forceComplete ()方法,并调用延迟操作
    onCompleteJoin () 方法,开始返回“加入组响应”给消费者 

    这和我们前面认为的“协调者会等待所有的消费者都发送了加入组请求后,才会认为请求处理完成”看起来有点矛盾。 协调者在处理第一个消费者的加
    入组请求,没等到其他消费者发送加入组请求,就已经开始返回加入组响应结果给第个消费者了 。 实际上 ,协调者实现消费组的再平衡操作,
    是通过让消费者重新发送“加入组请求”的方式来完成的 。 

    消费组稳定后,原有消费者重新加入消费组

      协调者在处理消费者发送的“加入组请求”和“同步组请求”时,都会依赖于消费组当前的状态进入不同的分支流程 。 假设第个消费者完成一次再平衡操
    作后,又有新的消费者发送了“加入组请求” 。 如下图 所示,新消费者会发起新的再平衡操作,原有的消费者也需要重新发送“加入组请求”,
    具体步骤如下
    (1) (图 (左))第一个消费者发送“加入组请求”,也完成了延迟操作,会将它的回调方法重置为空。

    (2) (图(中))协调者处理第二个消费者的“加入组请求”,消费组状态已经是“稳定” 和处理第个消费者的“加入组请求”类似,
      它也会将消费组状态改为“准备再平衡”,并创建一个延迟的操作对象

    (3) (图 (中))协调者创建完延迟操作后,通过延迟缓存尝试完成刚刚创建的延迟操作 但和协调者处理第个消费者不一样,这时尝试完成的条件不能满足,
      处理第个消费者的“加入组请求”就结束了 。

    (4)  虽然协调者处理第个消费者的“加入组请求”结束了,但延迟操作对象还不能完成,延迟操作对象会被加入到延迟缓存的监控列表中 
        后续要完成延迟操作,有两种办法:外部事件触发、超时触发 

    (5)(图 (右))协调者等待第个消费者重新发送“加入组请求” 如果第个消费者在超时时间内重新发送“加入组”请求,再次调用“尝试完成延
      迟操作”(外部事件触发),满足完成延迟操作的条件。 延迟操作会从延迟缓存中移除,并调用延迟操作完成的回调方法,返回“加入组响应”给所有的
      两个消费者。

    (6)但如果第一个消费者在会话超时时间内没有重新发送“加入组请求,消费组成员元数据中第个消费者的回调方法一直是空的 。 协调者会在延迟
           操作超时后,强制完成超时的延迟操作 这时也调用延迟操作完成的回调方法,但只返回“加入组响应” 给第二个消费者(因为第一个消费者的回
      调方法为空,所以并不会返回响应结果给第个消费者)

      协调者在处理第个消费者的“加入组请求”时,创建的延迟操作对象会即完成 但处理第二个消费者的“加入组请求”时,消费组中已经存在第个消费者
    的成员元数据,此时消费组中总共有两个消费者了 。 但是第个消费者成员元数据的回调方法在协调者返回“加入组响应”给它时,就被重置为空了 
     协调者在处理第个消费者的“加入组请求”时,第个消费者成员元数据的回调方法不为空 消费组元数据的 notYetRejoinnedMembers ()方法,
    表示还没重新发送“加入组请求”的消费者成员 。 如果该方法的返回值有数据,说明还有消费者成员没发送“加入组请求” 。 这里的消费者成员
    必须是存在于消费组中的消费者成员,也是之前发送过“加入组请求”的成员,所以才表示“重新发送” 

     

     消费组未稳定,原有消费者重新加入消费组

      再来看另种场景:其他消费者发送“加入组请求”先于第个消费者发送“同步组请求” 协调者返回“加入组响应”给第个消费者,并更改消费组
    状态为“等待同步” 第一个消费者收到“加入组响应”后,但还没完成分区分配的工作,就有新的消费者发送了“加入组请求” 。 这时候其实第
    个消费者是不需要执行分区分配的,因为即使执行了,也只有它一个的,并不会包含新加入的第二个消费者 。 

      如下图 (上)所示,第个消费者完成分区分配工作后,“同步组请求”的消费组分配结果只有 第一个消费者的数据此时,第一个消费者将分配
    结果发送给协调者,协调者是不会接受的 。 因为协调者已经处理了第二个消费者的“加入组请求”,消费组的状态被更改为“准备再平衡” 个消费
    者在这个状态下, 会收到 REBALANCE_IN_PROGRESS的错误码,并重新发送“加入组请求”

      如下图 (下)所示,第一个消费者重新发送了“加入组请求”,协调者对第个消费者重新发送的“加入组请求”,也会尝试完成第个消费者创建的
    延迟操作因为满足可以完成延迟操作的条件,所以协调者会再次将消费组状态改为“等待同步”,并返回“加入组响应”给两个消费者 。 后续协调
    者处理两个消费者的“同步组请求”,个消费者不是主消费者,只会设置消费者成员元数据中的回调方法。 当第一个消费者执行完分区分配后,
    协调者处理第个消费者的“同步组请求”, 同时返回“同步组响应”给两个消费者。 



      消费组状态为“稳定”后,当有新消费者加入消费组,将消费组状态更改为“准备再平衡”,原有的消费者是通过心跳的方式
    感知到需要重新发送“加入组请求” 。 如果消费组状态还是“等待同步”,就有新消费者加入消费组,也更改消费组状态为“再平衡” 
    于消费组状态未稳定,原有的消费者不会有心跳任务,所以协调者采用返回错误码方式通知原有的消费者 。 即消
    者在发送“同步组请求”时,如果消费组状态为“准备再平衡”,协调者会要求消费者重新发送“加入组请求” 

      消费者的“加入组请求”时,使用一个“延迟操作”对象表示延迟返回“加入组响应”给消费者。 延迟操作建时,
    伴随着消费组状态从“稳定”转变为“准备再平衡”;延迟操作完成时,消费组状态会从“准备再平衡”转变为“等待同步” 。 

  • 相关阅读:
    Python面向对象-类成员
    python面向对象-继承
    Linux 静态和动态添加路由
    Linux 添加虚拟网卡
    centos7源码包安装Mongodb,并设置开机自启动
    centos7配置静态ip地址
    Python开发【前端篇】JavaScript和Jquery
    Python爬虫【解析库之pyquery】
    Python爬虫 selenium
    Python爬虫【解析库之beautifulsoup】
  • 原文地址:https://www.cnblogs.com/jixp/p/9857831.html
Copyright © 2011-2022 走看看