zoukankan      html  css  js  c++  java
  • Zookeeper 源码(六)Leader-Follower-Observer

    Zookeeper 源码(六)Leader-Follower-Observer

    上一节介绍了 Leader 选举的全过程,本节讲解一下 Leader-Follower-Observer 服务器的三种角色。经过 Leader 选举后各服务器都能确定自己的角色,下一步就是初始化各自的角色。

    先回顾一下【QuorumPeer】的 run 方法选举结束后创建对应的角色:

    case OBSERVING:
        setObserver(makeObserver(logFactory));
        observer.observeLeader();
        break;
    case FOLLOWING:
        setFollower(makeFollower(logFactory));
        follower.followLeader();
        break;
    case LEADING:
        // 3.1 初始化 Leader 对象
        setLeader(makeLeader(logFactory));
        // 3.2 lead 线程在这里阻塞
        leader.lead();
        setLeader(null);
        break;
    

    一、Leader

    Leader 服务器是整个 Zookeeper 集群工作机制中的核心,其主要工作有以下两个。

    • 事务请求的唯一调度和处理者,保证集群事务处理的顺序性。
    • 集群内部各服务器的调度者

    (1) Leader 初始化 【Leader】

    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
        return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
    }
    
    // Leader 启动时启动 2888 的端口,用于服务器内部通信(如数据同步等)
    Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
        this.self = self;
        try {
            if (self.getQuorumListenOnAllIPs()) {
                ss = new ServerSocket(self.getQuorumAddress().getPort());
            } else {
                ss = new ServerSocket();
            }
            ss.setReuseAddress(true);
            if (!self.getQuorumListenOnAllIPs()) {
                ss.bind(self.getQuorumAddress());
            }
        } catch (BindException e) {
            // 省略...
        }
        this.zk = zk;
    }
    

    (2) lead 【Leader】

    void lead() throws IOException, InterruptedException {
        // 1. 恢复本地数据
        zk.loadData();
    
        // 2. 启动 lead 端口的监听线程,专门用来监听新的 follower 
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
    
        // 3. 启动服务
        startZkServer();
    
        while (true) {
            // 省略...
        }
    }
    

    (3) LearnerCnxAcceptor 【Leader】

    follower 连上 leader 后,此时 leader 会为每个 follower 启动单独 IO 线程,请看 LearnerCnxAcceptor 代码

    class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
        private volatile boolean stop = false;
        @Override
        public void run() {
        while (!stop) {
            Socket s = ss.accept();
            // 读超时设为 initLimit 时间  
            s.setSoTimeout(self.tickTime * self.initLimit);
            s.setTcpNoDelay(nodelay);
            // 为每个 follower 启动单独线程,处理 IO 
            LearnerHandler fh = new LearnerHandler(s, Leader.this);
            fh.start();
        }
    }
    

    LearnerHandler 持有 Leader 与 Learner 的 Socket 对象,专门用来处理服务器之间的通信。

    (4) startZkServer 【Leader】

    private synchronized void startZkServer() {
        // 省略...
        zk.startup();
        // 省略...
    }
    

    (4) startup 【LeaderZooKeeperServer】

    LeaderZooKeeperServer 继承自 ZooKeeperServer,重写了 startup 和 setupRequestProcessors,这里重点关注 Leader 服务器的请求处理链。

    @Override
    public synchronized void startup() {
        super.startup();
        if (containerManager != null) {
            containerManager.start();
        }
    }
    
    /**
     * Leader 服务器的请求处理链
     * LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor ->
     * CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
     */
    @Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
    
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    
        setupContainerManager();
    }
    

    (1) PrepRequestProcessor 请求预处理器。在Zookeeper中,那些会改变服务器状态的请求称为事务请求(创建节点、更新数据、删除节点、创建会话等),PrepRequestProcessor能够识别出当前客户端请求是否是事务请求。对于事务请求,PrepRequestProcessor处理器会对其进行一系列预处理,如创建请求事务头、事务体、会话检查、ACL检查和版本检查等。

    (2) ProposalRequestProcessor 事务投票处理器。Leader服务器事务处理流程的发起者,对于非事务性请求,ProposalRequestProcessor会直接将请求转发到CommitProcessor处理器,不再做任何处理,而对于事务性请求,处理将请求转发到CommitProcessor外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follower服务器来发起一次集群内的事务投票。同时,ProposalRequestProcessor还会将事务请求交付给SyncRequestProcessor进行事务日志的记录。

    (3) CommitProcessor 事务提交处理器。对于非事务请求,该处理器会直接将其交付给下一级处理器处理;对于事务请求,其会等待集群内针对Proposal的投票直到该Proposal可被提交,利用CommitProcessor,每个服务器都可以很好地控制对事务请求的顺序处理。

    (4) ToBeCommitProcessor 该处理器有一个toBeApplied队列,用来存储那些已经被CommitProcessor处理过的可被提交的Proposal。其会将这些请求交付给FinalRequestProcessor处理器处理,待其处理完后,再将其从toBeApplied队列中移除。

    (5) FinalRequestProcessor 用来进行客户端请求返回之前的操作,包括创建客户端请求的响应,针对事务请求,该处理还会负责将事务应用到内存数据库中去。

    (6) SyncRequestProcessor 事务日志记录处理器。用来将事务请求记录到事务日志文件中,同时会触发Zookeeper进行数据快照。

    (7) AckRequestProcessor 负责在SyncRequestProcessor完成事务日志记录后,向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录。

    二、Follower

    Follower 的初始化与 Leader 类似,重点关注一下 Follower 的请求处理链。

    void followLeader() throws InterruptedException {
        // 省略...
        try {
            InetSocketAddress addr = findLeader();            
            try {
                connectToLeader(addr);
                // 省略...
                syncWithLeader(newEpochZxid);                
                QuorumPacket qp = new QuorumPacket();
                while (self.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }
            } catch (Exception e) {
                // 省略...
            }
        } 
    }
    
    /**
     * Follower 服务器的请求处理链
     * 1. FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
     * 2. SyncRequestProcessor -> SendAckRequestProcessor
     */
    @Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }
    

    (1) FollowerRequestProcessor 其用作识别当前请求是否是事务请求,若是,那么Follower就会将该请求转发给Leader服务器,Leader服务器是在接收到这个事务请求后,就会将其提交到请求处理链,按照正常事务请求进行处理。

    (2) SendAckRequestProcessor其承担了事务日志记录反馈的角色,在完成事务日志记录后,会向Leader服务器发送ACK消息以表明自身完成了事务日志的记录工作。

    ZookeeperServer 类图

    参考:

    1. 《Zookeeper源码分析之六 Leader/Follower初始化》:https://blog.csdn.net/haihongazar/article/details/52709244
    2. 《zookeeper源码分析之五服务端(集群leader)处理请求流程》:https://www.cnblogs.com/davidwang456/p/5004599.html
    3. 从 Paxos 到 Zookeeper : 分布式一致性原理与实践

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    RabbitMQ学习之:(八)Topic Exchange (转贴+我的评论)
    RabbitMQ学习之:(六)Direct Exchange (转贴+我的评论)
    抽象类
    开闭原则
    反射
    解决默认方法冲突
    单一职责原则
    java四种访问修饰符
    Java中基本类型和包装类
    方法在继承过程中可能出现的问题
  • 原文地址:https://www.cnblogs.com/binarylei/p/9955957.html
Copyright © 2011-2022 走看看