zoukankan      html  css  js  c++  java
  • Zookeeper connection loss leads to Flink job restart

    Flink可以使用zookeeper来进行ha,而一般我们都会使用zookeeper的高级api架构curator来对zk进行通讯。在curator中引入了状态的概念,包括connected,reconnected,suspeneded,lost与read_only,其中suspended是个有意思的状态,当因为网络抖动、机器繁忙、zk集群短暂无响应,都会导致curator将状态置为suspended.

    而Flink对suspended采取了非常谨慎的处理,就是发现是suspended,则取消所有作业,进行restart,显得未免有些太敏感了,其实这个时候往往zk也是ok的,相应的jm也是leader都没有问题。

    好,我们再顺一下:

    在发生zk connection loss的情况下,curator会设置suspended状态,在此状态下,curator会释放leader,flink在发现notleader之后则会revokeLeadership,进而导致dispatcher会cancel掉所有的job,cancel的过程中flink会主动抛出异常。

    虽然这样做没什么大的影响,因为其实如果connection很快恢复,作业也会很快被拉起,没有大碍,但看起来总是不好,zk连接随便的一个扰动,都可能导致job重启,所以就想把它改动。

    方案一:

    在flink的ZooKeeperUtils.java通过CuratorFrameworkFactory来构造CuratorFramework时,通过connectionStateErrorPolicy将ConnectionStateErrorPolicy从StandardConnectionStateErrorPolicy更新为SessionConnectionStateErrorPolicy,前者将suspended和lost都作为error,后者只是将lost作为error,而只有发生error的时候才会取消leadership,所以如此设置之后,在进入suspended状态时,不在发生leadership的取消和重新选举。

    优点:从整体的状态转换上进行了控制,优雅。

    缺点:目前flink所引用的curator的版本为2.12.0,不支持设置policy,需要更新curator版本号,是否会带来其他问题,不可知。

    测试:成功。

    更改curator的版本为4.2.0,提交作业,restart zk,job没有重启,checkpoint正常进行。

    方案二:

    在flink内部,在代码ZooKeeperLeaderElectionService.java中的notLeader方法中,在收到notleader的通知的时候,根据当前的状态是否是suspended进行相应的处理。

    优点:不对flink的整体造成影响,更改在局部范围内可控。

    缺点:由于curator对suspended的处理依旧,所以从curator的层面还是会发生取消leadership然后重新进行选举的情况,虽然这一切都不必要。

    测试:失败

    1.原先预计的是在notleader方法中,如果发现当前状态是suspended,就不去执行revokeLeadership方法,但notleader方法和suspended状态的获取分别是在两个回调方法中触发的,经过测试,无法保证两个回调的执行顺序,即有可能notleader方法已经触发,但是suspended状态还没有触发。

    2.如果只是修改notleader方法,即使修改成功,还是会触发isleader方法,在isleader方法中,如果不修改,还是会触发原有作业的取消和重新提交,所以这里也要改,改成重新链接之后这里即使被通知isleader也不会去给dispatcher进行grantLeadership,但又不能直接这么操作,还需要判断是否自己已经是leader,但可惜的是,在发生suspended的时候,curator里面已经将leadership取消掉了,所以如果在这里加上判断是connected状态并且不是leader然后不去grantleadership,会看起来很奇怪。

    总而言之,如果不动curator的逻辑,只是在flink里改,这里的逻辑就会被改的难以理解,并且还无法成功。

    目前的方案应对的场景是zk connection的短时间抖动,如果发生zk connection的长时间不可用,则tm和jm都会失败,这个也是应有之义。

    另,

    在flink中对curator的suspended状态起作用的还有一个地方,在ZooKeeperCheckpointIDCounter.java中有对suspended的判断,如果之前是suspended或者Lost,则flink就不会去zk上存取checkpoint的信息了。这里感觉是个坑,也需要改对suspended的策略。

    外一篇,

    zookeeper可以设置session timeout时间,但是不是你随便设置就会起作用,会有一个判断的过程。

    SessionTimeOut的协商如下:

    • 情况1: 配置文件配置了maxSessionTimeOut和minSessionTimeOut

    最终SessionTimeOut,必须在minSessionTimeOut和maxSessionTimeOut区间里,如果跨越上下界,则以跨越的上届或下界为准。

    • 情况2:配置文件没有配置maxSessionTimeOut和minSessionTimeOut

    maxSessionTimeout没配置则 maxSessionTimeOut设置为 20 * tickTime

    minSessionTimeOut没配置则 minSessionTimeOut设置为 2 * tickTime

    也就是默认情况下, SessionTimeOut的合法范围为 4秒~40秒,默认配置中tickTime为2秒。

  • 相关阅读:
    深入学习webpack(四)
    深入学习webpack(三)
    深入学习webpack(二)
    深入学习webpack(一)
    (转载)路径中 斜杠/和反斜杠 的区别
    jquery中beforeSend和complete的使用 --- 提高用户体验&&设置请求头
    好的用户界面-界面设计的一些技巧
    使用vue Devtools
    vue脚手架 && 实例
    表单(中)-EasyUI Combogrid 组合网格、EasyUI Numberbox 数字框、EasyUI Datebox 日期框、EasyUI Datetimebox 日期时间框、EasyUI Calendar 日历
  • 原文地址:https://www.cnblogs.com/029zz010buct/p/10946244.html
Copyright © 2011-2022 走看看