zoukankan      html  css  js  c++  java
  • Apache Kafka源码分析 – Controller

    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals
    https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3

    Controller是为了加入replica机制而创建的,0.7时broker之间没有很强的关联,而由于现在每个topic partition都需要考虑,将replicas放在哪个broker上,并要处理例如reassignment或delete等操作,所以需要有个master来协调,于是就加入了controller

    One of the brokers is elected as the controller for the whole cluster. It will be responsible for:

    1. Leadership change of a partition (each leader can independently update ISR)
    2. New topics; deleted topics
    3. Replica re-assignment

    After the controller makes a decision, it publishes the decision permanently in ZK and also sends the new decisions to affected brokers through direct RPC. The published decisions are the source of truth and they are used by clients for request routing and by each broker during startup to recover its state. After the broker is started, it picks up new decisions made by the controller through RPC.

    Potential benefits:

    1. Easier debugging since leadership changes are made in a central place.
    2. ZK reads/writes needed for leadership changes can be batched (also easier to exploit ZK multi) and thus reduce end-to-end latency during failover.
    3. Fewer ZK watchers.
    4. More efficient communication of state changes by using direct RPC, instead of via a queue implementation in Zookeeper.

    Potential downside:

    1. Need controller failover. 

    ControllerContext

    其中关键是记录所有topics的partitions和replicas间关系,包括assignment和leadship关系

    PartitionStateMachine

    加入replica是很复杂的设计,其中重要的一点就是要考虑partitions和replicas中不同情况下的处理
    这里先看看Partition的各种state,以及state之间的状态机
    NonExistentPartition,不存在或被删掉的p,前一个状态是OfflinePartition
    NewPartition, 刚被创建,但还没有完成leader election, 前一个状态是NonExistentPartition
    OnlinePartition,具有leader,正常状态,前一个状态是NewPartition/OfflinePartition
    OfflinePartition,leader die的时候partition会变为offline,前一个状态是NewPartition/OnlinePartition
    这里NewPartition和OfflinePartition都是没有leader的状态,为何要区别开来?见下


    核心函数handleStateChanges,对于每个topicAndPartition调用状态机函数handleStateChange,然后把结果告诉每个brokers

    状态机函数的具体逻辑, 其他的状态变化都比较简单
    唯有,在变成OnlinePartition的时候,需要区分new或offline两种状况,下面具体看下


    对于初始化leader只需要取出liveAssignedReplicas的head
    而对于offline,需要优先ISR中的,然后才是AR中的 (初始时,AR应该=ISR,但因为会通过shrink isr来调整isr列表,所以AR应该是>=ISR的)

    NewPartition->OnlinePartition

    OfflinePartition,OnlinePartition->OnlinePartition
    OfflinePartitionLeaderSelector

    ReplicaStateMachine

    对应于Replica也有比较复杂的状态和相应的状态机
    NewReplica,新创建的,只能接受become follower请求,前一个状态为NonExistentReplica
    OnlineReplica,正常状态,可以接受become leader和become follower,前一个状态为NewReplica, OnlineReplica or OfflineReplica
    OfflineReplica ,replica die,一般由于存储该replica的broker down,前一个状态为NewReplica, OnlineReplica
    Replica的删除有3种状态,offline的replica就可以被delete
    ReplicaDeletionStarted,前一个状态是OfflineReplica
    ReplicaDeletionSuccessful,前一个状态是ReplicaDeletionStarted
    ReplicaDeletionIneligible, 前一个状态是ReplicaDeletionStarted
    NonExistentReplica,被删除成功的replica,前一个状态是ReplicaDeletionSuccessful

    和PartitionStateMachine一样,最关键的函数就是状态机函数


    KafkaController

    KafkaController中定义,
    partition和replica的状态机
    controllerElector, controller作为master可以简单的select partition的leader,但如果controller dead,需要重新在brokers中选取controller,这就需要基于ZK的elect算法
    一系列的partition leader的selector用于在不用情况下select partition的leader

    下面具体看下Controller上面实现的接口,包含对broker,controller,partition的系列操作

    onBrokerStartup:
    1. send UpdateMetadata requests for all partitions to newly started brokers
    2. replicas on the newly started broker –> OnlineReplica
    3. partitions in OfflinePartition and NewPartition -> OnlinePartition (with OfflinePartitionLeaderSelector)
    4. for partitions with replicas on newly started brokers, call onPartitionReassignment to complete any outstanding partition reassignment

    onBrokerFailure:
    1. partitions w/o leader –> OfflinePartition
    2. partitions in OfflinePartition and NewPartition -> OnlinePartition (with OfflinePartitionLeaderSelector)
    3. each replica on the failed broker –> OfflineReplica


    shutdownBroker:
    1. each partition whose leader is on shutdown broker -> OnlinePartition (ControlledShutdownPartitionLeaderSelector)
    2. each replica on shutdown broker that is follower, send StopReplica request (w/o deletion)
    3. each replica on shutdown broker that is follower -> OfflineReplica (to force shutdown replica out of the isr)

    onControllerFailover:
    当通过zk electing出当前的broker为controller后,调用此函数做初始化工作
    增加controller epoch
    初始化controller context
    startup channel manager,replica state machine和partition state machine
    当初始化时发生任何问题, 都会放弃成为controller

    onNewTopicCreation:
    1. call onNewPartitionCreation


    onNewPartitionCreation:
    1. new partitions –> NewPartition
    2. all replicas of new partitions –> NewReplica
    3. new partitions –> OnlinePartition
    4. all replicas of new partitions –> OnlineReplica

     

    onPartitionReassignment: (OAR: old assigned replicas; RAR: new re-assigned replicas when reassignment completes)
    注释写的很详细,参考其中的例子
    1. 在ZK中,将AR更新为OAR + RAR,由于在reassignment的过程中所有这些replicas都可以被认为是assign给该partition
    2. 给every replica in OAR + RAR发送LeaderAndIsr request,更新ISR为OAR + RAR
    3. 对于RAR - OAR,创建新的replicas,并把replica状态设为NewReplica
    4. 等待所有RAR中的replicas和leader完成同步
    5. 将RAR中的所有replicas设为OnlineReplica
    6. 在内存中,将AR设为RAR,但Zk里面仍然是OAR + RAR
    7. 如果RAR中没有leader,elect一个新的leader
    8. 将OAR - RAR中的设为OfflineReplica. shrink isr以remove OAR - RAR,并发送LeaderAndIsr通知leader,最后发送StopReplica(delete = false)
    9. 将OAR - RAR中的设为NonExistentReplica
    10. 在ZK中,将AR设为RAR,代表reassignment完成


  • 相关阅读:
    模板
    模板
    模板
    模板
    2017-2018 ACM-ICPC Asia Tsukuba Regional Contest
    牛客
    软件工程
    Codeforces Round 696(Div.2)
    Atcoder ARC111 contest
    Codeforces Educational Round 100(Div.2)
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3569518.html
Copyright © 2011-2022 走看看