  • ZooKeeper (3): How the Request-Processing Chains Are Created

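      As we know, ZooKeeper is essentially built out of request-processing chains.

      But when are these chains actually created?

      A ZooKeeper ensemble has three server roles: Leader, Follower, and Observer.

      Each role takes on different tasks, so the logic for handling requests differs too. ZooKeeper handles this neatly with the chain-of-responsibility pattern: each role assembles its own chain of processors.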
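
      As a rough illustration of the chain-of-responsibility idea described above, here is a minimal sketch. Every name in it (ChainSketch, Processor, Stage, and the stage labels) is hypothetical and chosen only for illustration; it is not ZooKeeper's actual code, only the shape of the pattern: each stage does its part and hands the request to the next one, and each role wires up a different chain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this only illustrates the pattern.
final class ChainSketch {
    /** One stage in the chain: handle the request, then pass it on. */
    interface Processor {
        void process(String request, List<String> trace);
    }

    /** A stage that records its name and forwards to the next stage, if any. */
    static final class Stage implements Processor {
        private final String name;
        private final Processor next; // null marks the end of the chain

        Stage(String name, Processor next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void process(String request, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.process(request, trace);
            }
        }
    }

    /** Builds a role-specific chain; the stage labels are illustrative only. */
    static Processor chainFor(String role) {
        Processor last = new Stage("final", null);
        if ("leader".equals(role)) {
            return new Stage("prepare", new Stage("propose", last));
        }
        return new Stage("forward", new Stage("commit", last));
    }

    /** Sends one request through the chain for the given role and returns the stages visited. */
    static List<String> run(String role, String request) {
        List<String> trace = new ArrayList<>();
        chainFor(role).process(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("leader", "create /a"));
        System.out.println(run("follower", "create /a"));
    }
}
```

      The point of the design is that the caller only ever talks to the first stage; which stages follow is decided once, when the chain is assembled for a role.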

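      So when does that assembly happen? When the server starts? Or each time a request arrives?

      In fact, a server node builds its processing chain only after its role has been determined. Let's walk through the process.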
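
      To make the timing concrete, here is a minimal sketch in which the chain does not exist when the server object is constructed and only appears once startup() runs. The classes StartupSketch, BaseServer, and FollowerServer are hypothetical stand-ins; only the method names startup() and setupRequestProcessors() and the three processor names are taken from the ZooKeeper code quoted in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in classes; the processors are represented by their names only.
final class StartupSketch {
    /** Base server: startup() fixes the order of steps, subclasses supply the chain. */
    abstract static class BaseServer {
        protected final List<String> chain = new ArrayList<>();
        private boolean running;

        /** Template method: the chain is created here, not in the constructor. */
        final void startup() {
            setupRequestProcessors(); // hook implemented by the role-specific subclass
            running = true;
        }

        boolean isRunning() {
            return running;
        }

        List<String> chain() {
            return chain;
        }

        protected abstract void setupRequestProcessors();
    }

    /** Follower flavour: lists the processors front to back. */
    static final class FollowerServer extends BaseServer {
        @Override
        protected void setupRequestProcessors() {
            chain.addAll(Arrays.asList(
                "FollowerRequestProcessor", "CommitProcessor", "FinalRequestProcessor"));
        }
    }

    public static void main(String[] args) {
        FollowerServer server = new FollowerServer();
        System.out.println("before startup: " + server.chain());
        server.startup(); // in ZooKeeper this happens only after the role is known
        System.out.println("after startup:  " + server.chain());
    }
}
```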

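    1. The election thread keeps running

      The QuorumPeer thread loops continuously, checking the current node's state. Once the node's role has been decided, its processing chain follows from that role.

      So we first need to look at how it handles the current role.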

        // org.apache.zookeeper.server.quorum.QuorumPeer#run
        @Override
        public void run() {
            updateThreadName();
    
            // Register JMX beans for this peer and the other quorum members
            LOG.debug("Starting quorum peer");
            try {
                jmxQuorumBean = new QuorumBean(this);
                MBeanRegistry.getInstance().register(jmxQuorumBean, null);
                for (QuorumServer s : getView().values()) {
                    ZKMBeanInfo p;
                    if (getId() == s.id) {
                        p = jmxLocalPeerBean = new LocalPeerBean(this);
                        try {
                            MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                        } catch (Exception e) {
                            LOG.warn("Failed to register with JMX", e);
                            jmxLocalPeerBean = null;
                        }
                    } else {
                        RemotePeerBean rBean = new RemotePeerBean(this, s);
                        try {
                            MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                            jmxRemotePeerBean.put(s.id, rBean);
                        } catch (Exception e) {
                            LOG.warn("Failed to register with JMX", e);
                        }
                    }
                }
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
                jmxQuorumBean = null;
            }
    
            try {
                /*
                 * Main loop
                 */
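                // Loop until shutdown, checking this node's state; once the role is settled, enter that role's working loop
                // When the state changes again, the role method exits (usually via an exception) and a new election is run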
                while (running) {
                    switch (getPeerState()) {
                    // LOOKING triggers leader election; it is crucial, but we will analyze it in a later article
                    case LOOKING:
                        LOG.info("LOOKING");
                        ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
    
                        if (Boolean.getBoolean("readonlymode.enabled")) {
                            LOG.info("Attempting to start ReadOnlyZooKeeperServer");
    
                            // Create read-only server but don't start it immediately
                            final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
    
                            // Instead of starting roZk immediately, wait some grace
                            // period before we decide we're partitioned.
                            //
                            // Thread is used here because otherwise it would require
                            // changes in each of election strategy classes which is
                            // unnecessary code coupling.
                            Thread roZkMgr = new Thread() {
                                public void run() {
                                    try {
                                        // lower-bound grace period to 2 secs
                                        sleep(Math.max(2000, tickTime));
                                        if (ServerState.LOOKING.equals(getPeerState())) {
                                            roZk.startup();
                                        }
                                    } catch (InterruptedException e) {
                                        LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                    } catch (Exception e) {
                                        LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                    }
                                }
                            };
                            try {
                                roZkMgr.start();
                                reconfigFlagClear();
                                if (shuttingDownLE) {
                                    shuttingDownLE = false;
                                    startLeaderElection();
                                }
                                setCurrentVote(makeLEStrategy().lookForLeader());
                            } catch (Exception e) {
                                LOG.warn("Unexpected exception", e);
                                setPeerState(ServerState.LOOKING);
                            } finally {
                                // If the thread is in the the grace period, interrupt
                                // to come out of waiting.
                                roZkMgr.interrupt();
                                roZk.shutdown();
                            }
                        } else {
                            try {
                                reconfigFlagClear();
                                if (shuttingDownLE) {
                                    shuttingDownLE = false;
                                    startLeaderElection();
                                }
                                setCurrentVote(makeLEStrategy().lookForLeader());
                            } catch (Exception e) {
                                LOG.warn("Unexpected exception", e);
                                setPeerState(ServerState.LOOKING);
                            }
                        }
                        break;
                    // An OBSERVING node only receives data from the Leader and takes no part in voting
                    case OBSERVING:
                        try {
                            LOG.info("OBSERVING");
                            setObserver(makeObserver(logFactory));
                            observer.observeLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                        } finally {
                            observer.shutdown();
                            setObserver(null);
                            updateServerState();
    
                            // Add delay jitter before we switch to LOOKING
                            // state to reduce the load of ObserverMaster
                            if (isRunning()) {
                                Observer.waitForObserverElectionDelay();
                            }
                        }
                        break;
                    // A FOLLOWING node takes part in deciding on (voting for) data changes
                    case FOLLOWING:
                        try {
                            LOG.info("FOLLOWING");
                            setFollower(makeFollower(logFactory));
                            follower.followLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                        } finally {
                            follower.shutdown();
                            setFollower(null);
                            updateServerState();
                        }
                        break;
                    // A LEADING node handles writes and keeps the Followers' data in sync
                    case LEADING:
                        LOG.info("LEADING");
                        try {
                            setLeader(makeLeader(logFactory));
                            leader.lead();
                            setLeader(null);
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                        } finally {
                            if (leader != null) {
                                leader.shutdown("Forcing shutdown");
                                setLeader(null);
                            }
                            updateServerState();
                        }
                        break;
                    }
                }
            } finally {
                // Final exit: either abnormal or a deliberate shutdown (running=false); either way, log it and clean up
                LOG.warn("QuorumPeer main thread exited");
                MBeanRegistry instance = MBeanRegistry.getInstance();
                instance.unregister(jmxQuorumBean);
                instance.unregister(jmxLocalPeerBean);
    
                for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
                    instance.unregister(remotePeerBean);
                }
    
                jmxQuorumBean = null;
                jmxLocalPeerBean = null;
                jmxRemotePeerBean = null;
            }
        }

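      In short, a background thread keeps checking the node's state and dispatches work according to that state.

      Once the work has been dispatched, the role method does not return as long as nothing changes; it simply keeps doing its job. It stays that way until an exception occurs or the node is shut down.

      There are four states in total: LOOKING, OBSERVING, FOLLOWING, and LEADING. As the names suggest, each does its own kind of work.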
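
      The dispatch loop described above can be sketched in a few lines. This is a simplified simulation, not the real QuorumPeer: the class PeerLoopSketch, the electionResults parameter (a stand-in for a real election), and the simulated failure are all assumptions made for illustration. It only shows the shape: LOOKING picks a role, the role works until something fails, and the peer falls back to LOOKING.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation of the state-dispatch loop; not ZooKeeper code.
final class PeerLoopSketch {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }

    /**
     * Runs a simplified main loop. Each LOOKING round takes the next role from
     * electionResults; each role "works" until a simulated failure, after which
     * the peer returns to LOOKING. The loop ends when no election results remain.
     */
    static List<String> run(List<State> electionResults) {
        List<String> log = new ArrayList<>();
        Iterator<State> elections = electionResults.iterator();
        State state = State.LOOKING;
        boolean running = true;
        while (running) {
            switch (state) {
            case LOOKING:
                if (elections.hasNext()) {
                    state = elections.next();
                    log.add("elected:" + state);
                } else {
                    running = false; // nothing left to simulate
                }
                break;
            case FOLLOWING:
            case LEADING:
            case OBSERVING:
                try {
                    log.add("build-chain:" + state);
                    // A real peer would block here serving requests for a long time.
                    throw new IllegalStateException("lost contact with quorum");
                } catch (IllegalStateException e) {
                    log.add("failed:" + state);
                } finally {
                    state = State.LOOKING; // go back and elect again
                }
                break;
            }
        }
        return log;
    }

    public static void main(String[] args) {
        for (String entry : run(Arrays.asList(State.FOLLOWING, State.LEADING))) {
            System.out.println(entry);
        }
    }
}
```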

    2. How the Follower's chain is created

    // As shown below: 1. create the Follower instance; 2. call followLeader() to run the role's logic
        case FOLLOWING:
            try {
                LOG.info("FOLLOWING");
                // First create the Follower instance
                setFollower(makeFollower(logFactory));
                // Then call followLeader() to do the work
                // It normally keeps looping over incoming packets rather than returning after a single pass
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                follower.shutdown();
                setFollower(null);
                updateServerState();
            }
            break;
        // org.apache.zookeeper.server.quorum.QuorumPeer#makeFollower
        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
            // FollowerZooKeeperServer is the key instance
            // The context is handed to the Follower for later use
            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
        }
        // org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#FollowerZooKeeperServer
        FollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
            super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
            // Pending sync requests are passed along through a ConcurrentLinkedQueue
            this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
        }
        
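        // Once the follower instance exists, its main method followLeader() is called
        // This method drives everything the follower does
        // org.apache.zookeeper.server.quorum.Follower#followLeader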
        /**
         * the main method called by the follower to follow the leader
         *
         * @throws InterruptedException
         */
        void followLeader() throws InterruptedException {
            self.end_fle = Time.currentElapsedTime();
            long electionTimeTaken = self.end_fle - self.start_fle;
            self.setElectionTimeTaken(electionTimeTaken);
            ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
            LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
            self.start_fle = 0;
            self.end_fle = 0;
            fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
    
            long connectionTime = 0;
            boolean completedSync = false;
    
            // Preparation first: find the leader, connect, register, and sync
            try {
                self.setZabState(QuorumPeer.ZabState.DISCOVERY);
                QuorumServer leaderServer = findLeader();
                try {
                    connectToLeader(leaderServer.addr, leaderServer.hostname);
                    connectionTime = System.currentTimeMillis();
                    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                    if (self.isReconfigStateChange()) {
                        throw new Exception("learned about role change");
                    }
                    //check to see if the leader zxid is lower than ours
                    //this should never happen but is just a safety check
                    long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                    if (newEpoch < self.getAcceptedEpoch()) {
                        LOG.error("Proposed leader epoch "
                                  + ZxidUtils.zxidToString(newEpochZxid)
                                  + " is less than our accepted epoch "
                                  + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                        throw new IOException("Error: Epoch of leader is lower");
                    }
                    long startTime = Time.currentElapsedTime();
                    try {
                        self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
                        self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                        // Sync history with the Leader (the socket was already opened by connectToLeader() above)
                        syncWithLeader(newEpochZxid);
                        self.setZabState(QuorumPeer.ZabState.BROADCAST);
                        completedSync = true;
                    } finally {
                        long syncTime = Time.currentElapsedTime() - startTime;
                        ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
                    }
                    // If an observer master port is configured, start the ObserverMaster thread
                    if (self.getObserverMasterPort() > 0) {
                        LOG.info("Starting ObserverMaster");
    
                        om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                        om.start();
                    } else {
                        om = null;
                    }
                    // create a reusable packet to reduce gc impact
                    QuorumPacket qp = new QuorumPacket();
                    // Loop here reading and processing packets from the leader until an exception breaks out
                    while (this.isRunning()) {
                        readPacket(qp);             
                        processPacket(qp);
                    }
                } catch (Exception e) {
                    LOG.warn("Exception when following the leader", e);
                    closeSocket();
    
                    // clear pending revalidations
                    pendingRevalidations.clear();
                }
            } finally {
                if (om != null) {
                    om.stop();
                }
                zk.unregisterJMX(this);
    
                if (connectionTime != 0) {
                    long connectionDuration = System.currentTimeMillis() - connectionTime;
                    LOG.info(
                        "Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
                        leaderAddr,
                        connectionDuration,
                        completedSync);
                    messageTracker.dumpToLog(leaderAddr.toString());
                }
            }
        }
    
        // org.apache.zookeeper.server.quorum.Learner#syncWithLeader
        /**
         * Finally, synchronize our history with the Leader (if Follower)
         * or the LearnerMaster (if Observer).
         * @param newLeaderZxid
         * @throws IOException
         * @throws InterruptedException
         */
        protected void syncWithLeader(long newLeaderZxid) throws Exception {
            QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
            QuorumPacket qp = new QuorumPacket();
            long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
    
            QuorumVerifier newLeaderQV = null;
    
            // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
            // For SNAP and TRUNC the snapshot is needed to save that history
            boolean snapshotNeeded = true;
            boolean syncSnapshot = false;
            readPacket(qp);
            Deque<Long> packetsCommitted = new ArrayDeque<>();
            Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
            synchronized (zk) {
                if (qp.getType() == Leader.DIFF) {
                    LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                    self.setSyncMode(QuorumPeer.SyncMode.DIFF);
                    snapshotNeeded = false;
                } else if (qp.getType() == Leader.SNAP) {
                    self.setSyncMode(QuorumPeer.SyncMode.SNAP);
                    LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
                    // The leader is going to dump the database
                    // db is clear as part of deserializeSnapshot()
                    zk.getZKDatabase().deserializeSnapshot(leaderIs);
                    // ZOOKEEPER-2819: overwrite config node content extracted
                    // from leader snapshot with local config, to avoid potential
                    // inconsistency of config node content during rolling restart.
                    if (!QuorumPeerConfig.isReconfigEnabled()) {
                        LOG.debug("Reset config node content from local config after deserialization of snapshot.");
                        zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
                    }
                    String signature = leaderIs.readString("signature");
                    if (!signature.equals("BenWasHere")) {
                        LOG.error("Missing signature. Got {}", signature);
                        throw new IOException("Missing signature");
                    }
                    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    
                    // immediately persist the latest snapshot when there is txn log gap
                    syncSnapshot = true;
                } else if (qp.getType() == Leader.TRUNC) {
                    //we need to truncate the log to the lastzxid of the leader
                    self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
                    LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
                    boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
                    if (!truncated) {
                        // not able to truncate the log
                        LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
                        System.exit(ExitCode.QUORUM_PACKET_ERROR.getValue());
                    }
                    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    
                } else {
                    LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
                    System.exit(ExitCode.QUORUM_PACKET_ERROR.getValue());
    
                }
                zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
                zk.createSessionTracker();
    
                long lastQueued = 0;
    
                // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
                // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
                // we need to make sure that we don't take the snapshot twice.
                boolean isPreZAB1_0 = true;
                //If we are not going to take the snapshot be sure the transactions are not applied in memory
                // but written out to the transaction log
                boolean writeToTxnLog = !snapshotNeeded;
                // we are now going to start getting transactions to apply followed by an UPTODATE
                outerLoop:
                while (self.isRunning()) {
                    readPacket(qp);
                    switch (qp.getType()) {
                    case Leader.PROPOSAL:
                        PacketInFlight pif = new PacketInFlight();
                        pif.hdr = new TxnHeader();
                        pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                        if (pif.hdr.getZxid() != lastQueued + 1) {
                            LOG.warn(
                                "Got zxid 0x{} expected 0x{}",
                                Long.toHexString(pif.hdr.getZxid()),
                                Long.toHexString(lastQueued + 1));
                        }
                        lastQueued = pif.hdr.getZxid();
    
                        if (pif.hdr.getType() == OpCode.reconfig) {
                            SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
                            QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
                            self.setLastSeenQuorumVerifier(qv, true);
                        }
    
                        packetsNotCommitted.add(pif);
                        break;
                    case Leader.COMMIT:
                    case Leader.COMMITANDACTIVATE:
                        pif = packetsNotCommitted.peekFirst();
                        if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
                            QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
                            boolean majorChange = self.processReconfig(
                                qv,
                                ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
                                true);
                            if (majorChange) {
                                throw new Exception("changes proposed in reconfig");
                            }
                        }
                        if (!writeToTxnLog) {
                            if (pif.hdr.getZxid() != qp.getZxid()) {
                                LOG.warn(
                                    "Committing 0x{}, but next proposal is 0x{}",
                                    Long.toHexString(qp.getZxid()),
                                    Long.toHexString(pif.hdr.getZxid()));
                            } else {
                                zk.processTxn(pif.hdr, pif.rec);
                                packetsNotCommitted.remove();
                            }
                        } else {
                            packetsCommitted.add(qp.getZxid());
                        }
                        break;
                    case Leader.INFORM:
                    case Leader.INFORMANDACTIVATE:
                        PacketInFlight packet = new PacketInFlight();
                        packet.hdr = new TxnHeader();
    
                        if (qp.getType() == Leader.INFORMANDACTIVATE) {
                            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
                            long suggestedLeaderId = buffer.getLong();
                            byte[] remainingdata = new byte[buffer.remaining()];
                            buffer.get(remainingdata);
                            packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr);
                            QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData()));
                            boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
                            if (majorChange) {
                                throw new Exception("changes proposed in reconfig");
                            }
                        } else {
                            packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                            // Log warning message if txn comes out-of-order
                            if (packet.hdr.getZxid() != lastQueued + 1) {
                                LOG.warn(
                                    "Got zxid 0x{} expected 0x{}",
                                    Long.toHexString(packet.hdr.getZxid()),
                                    Long.toHexString(lastQueued + 1));
                            }
                            lastQueued = packet.hdr.getZxid();
                        }
                        if (!writeToTxnLog) {
                            // Apply to db directly if we haven't taken the snapshot
                            zk.processTxn(packet.hdr, packet.rec);
                        } else {
                            packetsNotCommitted.add(packet);
                            packetsCommitted.add(qp.getZxid());
                        }
    
                        break;
                    // UPTODATE means there is nothing more to sync: leave the while loop
                    case Leader.UPTODATE:
                        LOG.info("Learner received UPTODATE message");
                        if (newLeaderQV != null) {
                            boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
                            if (majorChange) {
                                throw new Exception("changes proposed in reconfig");
                            }
                        }
                        if (isPreZAB1_0) {
                            zk.takeSnapshot(syncSnapshot);
                            self.setCurrentEpoch(newEpoch);
                        }
                        self.setZooKeeperServer(zk);
                        self.adminServer.setZooKeeperServer(zk);
                        break outerLoop;
                    case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
                        // means this is Zab 1.0
                        LOG.info("Learner received NEWLEADER message");
                        if (qp.getData() != null && qp.getData().length > 1) {
                            try {
                                QuorumVerifier qv = self.configFromString(new String(qp.getData()));
                                self.setLastSeenQuorumVerifier(qv, true);
                                newLeaderQV = qv;
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
    
                        if (snapshotNeeded) {
                            zk.takeSnapshot(syncSnapshot);
                        }
    
                        self.setCurrentEpoch(newEpoch);
                        writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                        isPreZAB1_0 = false;
                        writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                        break;
                    }
                }
            }
            ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
            writePacket(ack, true);
            sock.setSoTimeout(self.tickTime * self.syncLimit);
            self.setSyncMode(QuorumPeer.SyncMode.NONE);
            // Bring the ZooKeeper server up
            zk.startup();
            /*
             * Update the election vote here to ensure that all members of the
             * ensemble report the same vote to new servers that start up and
             * send leader election notifications to the ensemble.
             *
             * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
             */
            self.updateElectionVote(newEpoch);
    
            // We need to log the stuff that came in between the snapshot and the uptodate
            if (zk instanceof FollowerZooKeeperServer) {
                FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
                for (PacketInFlight p : packetsNotCommitted) {
                    fzk.logRequest(p.hdr, p.rec);
                }
                for (Long zxid : packetsCommitted) {
                    fzk.commit(zxid);
                }
            } else if (zk instanceof ObserverZooKeeperServer) {
                // Similar to follower, we need to log requests between the snapshot
                // and UPTODATE
                ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
                for (PacketInFlight p : packetsNotCommitted) {
                    Long zxid = packetsCommitted.peekFirst();
                    if (p.hdr.getZxid() != zxid) {
                        // log warning message if there is no matching commit
                        // old leader send outstanding proposal to observer
                        LOG.warn(
                            "Committing 0x{}, but next proposal is 0x{}",
                            Long.toHexString(zxid),
                            Long.toHexString(p.hdr.getZxid()));
                        continue;
                    }
                    packetsCommitted.remove();
                    Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
                    request.setTxn(p.rec);
                    request.setHdr(p.hdr);
                    ozk.commitRequest(request);
                }
            } else {
                // New server type need to handle in-flight packets
                throw new UnsupportedOperationException("Unknown server type");
            }
        }
        // org.apache.zookeeper.server.ZooKeeperServer#startup
        public synchronized void startup() {
            if (sessionTracker == null) {
                createSessionTracker();
            }
            startSessionTracker();
            // Implemented by subclasses: this is where the processing chain is created
            setupRequestProcessors();
    
            startRequestThrottler();
    
            registerJMX();
    
            startJvmPauseMonitor();
    
            registerMetrics();
    
            setState(State.RUNNING);
    
            requestPathMetricsCollector.start();
    
            localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
            notifyAll();
        }
        
        // org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#setupRequestProcessors
        @Override
        protected void setupRequestProcessors() {
            // The last processor is FinalRequestProcessor
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            // The second-to-last processor is CommitProcessor
            commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
            commitProcessor.start();
            // The third-to-last processor, and therefore the first in the chain, is FollowerRequestProcessor
            // So the resulting call chain is FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
            firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
            ((FollowerRequestProcessor) firstProcessor).start();
            // In addition, a SyncRequestProcessor (feeding SendAckRequestProcessor) is started separately
            syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
            syncProcessor.start();
        }
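
      The sync code earlier in this section calls ZxidUtils.getEpochFromZxid(...) and ZxidUtils.makeZxid(newEpoch, 0). A zxid is a 64-bit value whose high 32 bits hold the epoch and whose low 32 bits hold a per-epoch counter. The sketch below reproduces that layout with a hypothetical class ZxidSketch; the method names epochOf and counterOf are made up for illustration and are not the ZxidUtils API.

```java
// Hypothetical helper illustrating the zxid bit layout; not the ZxidUtils class itself.
final class ZxidSketch {
    /** Packs an epoch (high 32 bits) and a counter (low 32 bits) into one long. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    /** Extracts the epoch from the high 32 bits. */
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    /** Extracts the counter from the low 32 bits. */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 0);
        System.out.println("zxid    = 0x" + Long.toHexString(zxid));
        System.out.println("epoch   = " + epochOf(zxid));
        System.out.println("counter = " + counterOf(zxid));
    }
}
```

      Starting a new epoch with a counter of 0 is why the follower acknowledges with makeZxid(newEpoch, 0): every transaction of the new epoch then compares greater than any zxid from an earlier epoch.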

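    3. How the Leader's processing chain is built

        // When LEADING, create a Leader instance and call lead(); lead() loops internally, and once it exits (typically via an exception) the state is evaluated again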
        case LEADING:
            LOG.info("LEADING");
            try {
                setLeader(makeLeader(logFactory));
                leader.lead();
                setLeader(null);
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                updateServerState();
            }
            break;
        }
        // org.apache.zookeeper.server.quorum.QuorumPeer#makeLeader
        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
            // LeaderZooKeeperServer handles the communication and data operations
            // Leader itself only does the coordination
            return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
        }
        
        // org.apache.zookeeper.server.quorum.Leader#lead
        /**
         * This method is main function that is called to lead
         *
         * @throws IOException
         * @throws InterruptedException
         */
        void lead() throws IOException, InterruptedException {
            self.end_fle = Time.currentElapsedTime();
            long electionTimeTaken = self.end_fle - self.start_fle;
            self.setElectionTimeTaken(electionTimeTaken);
            ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
            LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
            self.start_fle = 0;
            self.end_fle = 0;
    
            // Environment setup: register the leader's JMX bean
            zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
    
            try {
                self.setZabState(QuorumPeer.ZabState.DISCOVERY);
                self.tick.set(0);
                zk.loadData();
    
                leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
    
                // Start thread that waits for connection requests from
                // new followers.
                // Start the thread that accepts learner connections; the server socket was already set up in the constructor
                cnxAcceptor = new LearnerCnxAcceptor();
                cnxAcceptor.start();
    
                long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
    
                zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
    
                synchronized (this) {
                    lastProposed = zk.getZxid();
                }
    
                newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
    
                if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                    LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid()));
                }
    
                QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
                QuorumVerifier curQV = self.getQuorumVerifier();
                if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
                    // This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly
                    // specified by the user; the lack of version in a config file is interpreted as version=0).
                    // As soon as a config is established we would like to increase its version so that it
                    // takes precedence over other initial configs that were not established (such as a config
                    // of a server trying to join the ensemble, which may be a partial view of the system, not the full config).
                    // We chose to set the new version to the one of the NEWLEADER message. However, before we can do that
                    // there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE,
                    // not when sending/receiving NEWLEADER. In other words, we can't change curQV here since its the committed quorum verifier,
                    // and there's still no agreement on the new version that we'd like to use. Instead, we use
                    // lastSeenQuorumVerifier which is being sent with NEWLEADER message
                    // so its a good way to let followers know about the new version. (The original reason for sending
                    // lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs
                    // that it finds before starting to propose operations. Here we're reusing the same code path for
                    // reaching consensus on the new version number.)
    
                    // It is important that this is done before the leader executes waitForEpochAck,
                    // so before LearnerHandlers return from their waitForEpochAck
                    // hence before they construct the NEWLEADER message containing
                    // the last-seen-quorumverifier of the leader, which we change below
                    try {
                        QuorumVerifier newQV = self.configFromString(curQV.toString());
                        newQV.setVersion(zk.getZxid());
                        self.setLastSeenQuorumVerifier(newQV, true);
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                }
    
                newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
                if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                    newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                }
    
                // We have to get at least a majority of servers in sync with
                // us. We do this by waiting for the NEWLEADER packet to get
                // acknowledged
    
                waitForEpochAck(self.getId(), leaderStateSummary);
                self.setCurrentEpoch(epoch);
                self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
                self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
    
                try {
                    waitForNewLeaderAck(self.getId(), zk.getZxid());
                } catch (InterruptedException e) {
                    shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                             + newLeaderProposal.ackSetsToString()
                             + " ]");
                    HashSet<Long> followerSet = new HashSet<Long>();
    
                    for (LearnerHandler f : getLearners()) {
                        if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
                            followerSet.add(f.getSid());
                        }
                    }
                    boolean initTicksShouldBeIncreased = true;
                    for (Proposal.QuorumVerifierAcksetPair qvAckset : newLeaderProposal.qvAcksetPairs) {
                        if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                            initTicksShouldBeIncreased = false;
                            break;
                        }
                    }
                    if (initTicksShouldBeIncreased) {
                        LOG.warn("Enough followers present. Perhaps the initTicks need to be increased.");
                    }
                    return;
                }
    
                // Start the server; this is where the processing chain gets built
                startZkServer();
    
                /**
                 * WARNING: do not use this for anything other than QA testing
                 * on a real cluster. Specifically to enable verification that quorum
                 * can handle the lower 32bit roll-over issue identified in
                 * ZOOKEEPER-1277. Without this option it would take a very long
                 * time (on order of a month say) to see the 4 billion writes
                 * necessary to cause the roll-over to occur.
                 *
                 * This field allows you to override the zxid of the server. Typically
                 * you'll want to set it to something like 0xfffffff0 and then
                 * start the quorum, run some operations and see the re-election.
                 */
                String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
                if (initialZxid != null) {
                    long zxid = Long.parseLong(initialZxid);
                    zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
                }
    
                if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                    self.setZooKeeperServer(zk);
                }
    
                self.setZabState(QuorumPeer.ZabState.BROADCAST);
                self.adminServer.setZooKeeperServer(zk);
    
                // Everything is a go, simply start counting the ticks
                // WARNING: I couldn't find any wait statement on a synchronized
                // block that would be notified by this notifyAll() call, so
                // I commented it out
                //synchronized (this) {
                //    notifyAll();
                //}
                // We ping twice a tick, so we only update the tick every other
                // iteration
                boolean tickSkip = true;
                // If not null then shutdown this leader
                String shutdownMessage = null;
    
                while (true) {
                    synchronized (this) {
                        long start = Time.currentElapsedTime();
                        long cur = start;
                        long end = start + self.tickTime / 2;
                        while (cur < end) {
                            wait(end - cur);
                            cur = Time.currentElapsedTime();
                        }
    
                        if (!tickSkip) {
                            self.tick.incrementAndGet();
                        }
    
                        // We use an instance of SyncedLearnerTracker to
                        // track synced learners to make sure we still have a
                        // quorum of current (and potentially next pending) view.
                        SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                        syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
                        if (self.getLastSeenQuorumVerifier() != null
                            && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                            syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                        }
    
                        syncedAckSet.addAck(self.getId());
    
                        for (LearnerHandler f : getLearners()) {
                            if (f.synced()) {
                                syncedAckSet.addAck(f.getSid());
                            }
                        }
    
                        // check leader running status
                        if (!this.isRunning()) {
                            // set shutdown flag
                            shutdownMessage = "Unexpected internal error";
                            break;
                        }
    
                        if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
                            // Lost quorum of last committed and/or last proposed
                            // config, set shutdown flag
                            shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
                                              + syncedAckSet.ackSetsToString()
                                              + " ]";
                            break;
                        }
                        tickSkip = !tickSkip;
                    }
                    for (LearnerHandler f : getLearners()) {
                        f.ping();
                    }
                }
                if (shutdownMessage != null) {
                    shutdown(shutdownMessage);
                    // leader goes in looking state
                }
            } finally {
                zk.unregisterJMX(this);
            }
        }
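
The `tickSkip` flag in the loop above is easy to misread. The loop wakes up every half `tickTime` and pings the learners on every iteration, but it only bumps the tick counter and checks the quorum on every other one. A stripped-down sketch of just the counting, using the hypothetical class `TickSkipSketch` with no waiting or networking:

```java
/** Toy model of the tickSkip toggle in Leader#lead; names here are illustrative only. */
final class TickSkipSketch {
    /** Returns how often the tick counter is incremented over the given number of half-tick iterations. */
    static int ticksAfter(int halfTickIterations) {
        int tick = 0;
        boolean tickSkip = true; // same initial value as in lead()
        for (int i = 0; i < halfTickIterations; i++) {
            if (!tickSkip) {
                tick++; // corresponds to self.tick.incrementAndGet()
            }
            tickSkip = !tickSkip; // flip on every iteration
        }
        return tick;
    }

    public static void main(String[] args) {
        System.out.println(ticksAfter(6));
    }
}
```

Six half-tick iterations advance the counter three times, so the tick still moves once per full `tickTime` even though the loop body runs twice as often.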
        
        // org.apache.zookeeper.server.quorum.Leader#startZkServer
        /**
         * Start up Leader ZooKeeper server and initialize zxid to the new epoch
         */
        private synchronized void startZkServer() {
            // Update lastCommitted and Db's zxid to a value representing the new epoch
            lastCommitted = zk.getZxid();
            LOG.info("Have quorum of supporters, sids: [{}]; starting up and setting last processed zxid: 0x{}",
                     newLeaderProposal.ackSetsToString(),
                     Long.toHexString(zk.getZxid()));
    
            /*
             * ZOOKEEPER-1324. the leader sends the new config it must complete
             *  to others inside a NEWLEADER message (see LearnerHandler where
             *  the NEWLEADER message is constructed), and once it has enough
             *  acks we must execute the following code so that it applies the
             *  config to itself.
             */
            QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
    
            Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());
    
            self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
            if (designatedLeader != self.getId()) {
                allowedToCommit = false;
            }
    
            leaderStartTime = Time.currentElapsedTime();
            // The actual server startup is still delegated to zk
            zk.startup();
            /*
             * Update the election vote here to ensure that all members of the
             * ensemble report the same vote to new servers that start up and
             * send leader election notifications to the ensemble.
             *
             * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
             */
            self.updateElectionVote(getEpoch());
    
            zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
        }
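
The zxid that `lead()` and `startZkServer()` pass around is a 64-bit value. The masks `0xffffffffL` and `0xffffffff00000000L` used in `lead()` treat the low 32 bits as a counter and the high 32 bits as the epoch. A minimal sketch of that packing follows; `ZxidSketch` is a hypothetical stand-in written for this post, not the real `ZxidUtils` class:

```java
/** Toy helper showing the epoch/counter split of a zxid; not ZooKeeper's ZxidUtils. */
final class ZxidSketch {
    /** Packs an epoch (high 32 bits) and a counter (low 32 bits) into one long. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Same bit trick as the testing-only override in lead(): keep the epoch, replace the low bits. */
    static long overrideCounter(long zxid, long lowBits) {
        return (zxid & 0xffffffff00000000L) | lowBits;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 0);
        System.out.println("0x" + Long.toHexString(zxid));
    }
}
```

With epoch 5 and counter 0 this prints `0x500000000`, which is the form you would see in the "setting last processed zxid" log line for a freshly started epoch.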
    
        // org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#startup
        @Override
        public synchronized void startup() {
            super.startup();
            if (containerManager != null) {
                containerManager.start();
            }
        }
        // org.apache.zookeeper.server.ZooKeeperServer#startup
        // Skeleton provided by the parent class
        public synchronized void startup() {
            if (sessionTracker == null) {
                createSessionTracker();
            }
            startSessionTracker();
            setupRequestProcessors();
    
            startRequestThrottler();
    
            registerJMX();
    
            startJvmPauseMonitor();
    
            registerMetrics();
    
            setState(State.RUNNING);
    
            requestPathMetricsCollector.start();
    
            localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
            notifyAll();
        }
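
The parent `startup()` is a template method: it fixes the order of the startup steps, and each role's server class supplies its own `setupRequestProcessors()`. A minimal sketch of that shape, assuming toy classes (`ToyServer`, `ToyLeaderServer` and `ToyFollowerServer` are hypothetical and only record processor names instead of creating real processors):

```java
import java.util.Arrays;
import java.util.List;

/** Toy base class: startup() fixes the order of steps, subclasses decide the chain. */
abstract class ToyServer {
    private List<String> chain;
    private boolean running;

    /** Template method, loosely modelled on the shape of the parent startup() above. */
    public synchronized void startup() {
        chain = setupRequestProcessors(); // hook implemented per role
        running = true;
    }

    /** Each role returns its own processor names, first to last. */
    protected abstract List<String> setupRequestProcessors();

    public synchronized List<String> chain() {
        return chain;
    }

    public synchronized boolean isRunning() {
        return running;
    }
}

final class ToyLeaderServer extends ToyServer {
    @Override
    protected List<String> setupRequestProcessors() {
        return Arrays.asList("LeaderRequest", "PrepRequest", "Proposal", "Commit", "ToBeApplied", "Final");
    }
}

final class ToyFollowerServer extends ToyServer {
    @Override
    protected List<String> setupRequestProcessors() {
        return Arrays.asList("FollowerRequest", "Commit", "Final");
    }
}

final class TemplateStartupSketch {
    public static void main(String[] args) {
        ToyServer leader = new ToyLeaderServer();
        leader.startup();
        System.out.println(leader.chain());
    }
}
```

The caller only ever invokes `startup()`; which chain comes out depends entirely on which subclass was instantiated once the role was decided.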
    
        // org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors
        @Override
        protected void setupRequestProcessors() {
            // The last processor is FinalRequestProcessor
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            // The second-to-last processor is ToBeAppliedRequestProcessor
            RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
            // The third-to-last processor is CommitProcessor
            commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
            commitProcessor.start();
            // The fourth-to-last processor is ProposalRequestProcessor
            ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
            proposalProcessor.initialize();
            // The second processor is PrepRequestProcessor
            prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
            prepRequestProcessor.start();
            // The first processor is LeaderRequestProcessor
            firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
            
            // So the final chain is LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
            setupContainerManager();
        }
        private synchronized void setupContainerManager() {
            containerManager = new ContainerManager(
                getZKDatabase(),
                prepRequestProcessor,
                Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                Integer.getInteger("znode.container.maxPerMinute", 10000));
        }
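
`setupContainerManager()` reads its two settings with `Integer.getInteger`, which looks up a JVM system property and falls back to the given default when the property is missing or is not a valid integer. A small sketch of that lookup (the class `ContainerSettingsSketch` is made up for illustration; the property names and defaults are the ones in the code above):

```java
import java.util.concurrent.TimeUnit;

/** Illustrative wrapper around the two system-property lookups; not a ZooKeeper class. */
final class ContainerSettingsSketch {
    /** Check interval in milliseconds; defaults to one minute. */
    static int checkIntervalMs() {
        return Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1));
    }

    /** Upper bound per minute; defaults to 10000. */
    static int maxPerMinute() {
        return Integer.getInteger("znode.container.maxPerMinute", 10000);
    }

    public static void main(String[] args) {
        System.out.println(checkIntervalMs() + " ms, max " + maxPerMinute() + " per minute");
    }
}
```

Starting the JVM with `-Dznode.container.checkIntervalMs=5000` would make the first value 5000; with no property set it stays at 60000.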

    4. How the Observer's chain is created

        // Create an Observer and call observeLeader()
        case OBSERVING:
            try {
                LOG.info("OBSERVING");
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                observer.shutdown();
                setObserver(null);
                updateServerState();
    
                // Add delay jitter before we switch to LOOKING
                // state to reduce the load of ObserverMaster
                if (isRunning()) {
                    Observer.waitForObserverElectionDelay();
                }
            }
            break;
    
        // org.apache.zookeeper.server.quorum.Observer#observeLeader
        /**
         * the main method called by the observer to observe the leader
         * @throws Exception
         */
        void observeLeader() throws Exception {
            zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
            long connectTime = 0;
            boolean completedSync = false;
            try {
                self.setZabState(QuorumPeer.ZabState.DISCOVERY);
                QuorumServer master = findLearnerMaster();
                try {
                    connectToLeader(master.addr, master.hostname);
                    connectTime = System.currentTimeMillis();
                    long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                    if (self.isReconfigStateChange()) {
                        throw new Exception("learned about role change");
                    }
    
                    self.setLeaderAddressAndId(master.addr, master.getId());
                    self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                    // The processing chain is built while syncing with the leader; this calls the method of the parent class Learner
                    syncWithLeader(newLeaderZxid);
                    self.setZabState(QuorumPeer.ZabState.BROADCAST);
                    completedSync = true;
                    QuorumPacket qp = new QuorumPacket();
                    // Keep reading and processing packets in a loop
                    while (this.isRunning() && nextLearnerMaster.get() == null) {
                        readPacket(qp);
                        processPacket(qp);
                    }
                } catch (Exception e) {
                    LOG.warn("Exception when observing the leader", e);
                    closeSocket();
    
                    // clear pending revalidations
                    pendingRevalidations.clear();
                }
            } finally {
                currentLearnerMaster = null;
                zk.unregisterJMX(this);
                if (connectTime != 0) {
                    long connectionDuration = System.currentTimeMillis() - connectTime;
    
                    LOG.info(
                        "Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
                        leaderAddr,
                        connectionDuration,
                        completedSync);
                    messageTracker.dumpToLog(leaderAddr.toString());
                }
            }
        }
    
        // org.apache.zookeeper.server.quorum.Learner#syncWithLeader
        /**
         * Finally, synchronize our history with the Leader (if Follower)
         * or the LearnerMaster (if Observer).
         * @param newLeaderZxid
         * @throws IOException
         * @throws InterruptedException
         */
        protected void syncWithLeader(long newLeaderZxid) throws Exception {
            QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
            QuorumPacket qp = new QuorumPacket();
            long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
    
            QuorumVerifier newLeaderQV = null;
    
            // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
            // For SNAP and TRUNC the snapshot is needed to save that history
            boolean snapshotNeeded = true;
            boolean syncSnapshot = false;
            readPacket(qp);
            Deque<Long> packetsCommitted = new ArrayDeque<>();
            Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
            synchronized (zk) {
                if (qp.getType() == Leader.DIFF) {
                    LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                    self.setSyncMode(QuorumPeer.SyncMode.DIFF);
                    snapshotNeeded = false;
                } else if (qp.getType() == Leader.SNAP) {
                    self.setSyncMode(QuorumPeer.SyncMode.SNAP);
                    LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
                    // The leader is going to dump the database
                    // db is clear as part of deserializeSnapshot()
                    zk.getZKDatabase().deserializeSnapshot(leaderIs);
                    // ZOOKEEPER-2819: overwrite config node content extracted
                    // from leader snapshot with local config, to avoid potential
                    // inconsistency of config node content during rolling restart.
                    if (!QuorumPeerConfig.isReconfigEnabled()) {
                        LOG.debug("Reset config node content from local config after deserialization of snapshot.");
                        zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
                    }
                    String signature = leaderIs.readString("signature");
                    if (!signature.equals("BenWasHere")) {
                        LOG.error("Missing signature. Got {}", signature);
                        throw new IOException("Missing signature");
                    }
                    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    
                    // immediately persist the latest snapshot when there is txn log gap
                    syncSnapshot = true;
                } else if (qp.getType() == Leader.TRUNC) {
                    //we need to truncate the log to the lastzxid of the leader
                    self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
                    LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
                    boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
                    if (!truncated) {
                        // not able to truncate the log
                        LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
                        System.exit(ExitCode.QUORUM_PACKET_ERROR.getValue());
                    }
                    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    
                } else {
                    LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
                    System.exit(ExitCode.QUORUM_PACKET_ERROR.getValue());
    
                }
                zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
                zk.createSessionTracker();
    
                long lastQueued = 0;
    
                // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
                // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
                // we need to make sure that we don't take the snapshot twice.
                boolean isPreZAB1_0 = true;
                //If we are not going to take the snapshot be sure the transactions are not applied in memory
                // but written out to the transaction log
                boolean writeToTxnLog = !snapshotNeeded;
                // we are now going to start getting transactions to apply followed by an UPTODATE
                outerLoop:
                while (self.isRunning()) {
                    readPacket(qp);
                    switch (qp.getType()) {
                    case Leader.PROPOSAL:
                        PacketInFlight pif = new PacketInFlight();
                        pif.hdr = new TxnHeader();
                        pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                        if (pif.hdr.getZxid() != lastQueued + 1) {
                            LOG.warn(
                                "Got zxid 0x{} expected 0x{}",
                                Long.toHexString(pif.hdr.getZxid()),
                                Long.toHexString(lastQueued + 1));
                        }
                        lastQueued = pif.hdr.getZxid();
    
                        if (pif.hdr.getType() == OpCode.reconfig) {
                            SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
                            QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
                            self.setLastSeenQuorumVerifier(qv, true);
                        }
    
                        packetsNotCommitted.add(pif);
                        break;
                    case Leader.COMMIT:
                    case Leader.COMMITANDACTIVATE:
                        pif = packetsNotCommitted.peekFirst();
                        if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
                            QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
                            boolean majorChange = self.processReconfig(
                                qv,
                                ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
                                true);
                            if (majorChange) {
                                throw new Exception("changes proposed in reconfig");
                            }
                        }
                        if (!writeToTxnLog) {
                            if (pif.hdr.getZxid() != qp.getZxid()) {
                                LOG.warn(
                                    "Committing 0x{}, but next proposal is 0x{}",
                                    Long.toHexString(qp.getZxid()),
                                    Long.toHexString(pif.hdr.getZxid()));
                            } else {
                                zk.processTxn(pif.hdr, pif.rec);
                                packetsNotCommitted.remove();
                            }
                        } else {
                            packetsCommitted.add(qp.getZxid());
                        }
                        break;
                    case Leader.INFORM:
                    case Leader.INFORMANDACTIVATE:
                        PacketInFlight packet = new PacketInFlight();
                        packet.hdr = new TxnHeader();
    
                        if (qp.getType() == Leader.INFORMANDACTIVATE) {
                            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
                            long suggestedLeaderId = buffer.getLong();
                            byte[] remainingdata = new byte[buffer.remaining()];
                            buffer.get(remainingdata);
                            packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr);
                            QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData()));
                            boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
                            if (majorChange) {
                                throw new Exception("changes proposed in reconfig");
                            }
                        } else {
                            packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                            // Log warning message if txn comes out-of-order
                            if (packet.hdr.getZxid() != lastQueued + 1) {
                                LOG.warn(
                                    "Got zxid 0x{} expected 0x{}",
                                    Long.toHexString(packet.hdr.getZxid()),
                                    Long.toHexString(lastQueued + 1));
                            }
                            lastQueued = packet.hdr.getZxid();
                        }
                        if (!writeToTxnLog) {
                            // Apply to db directly if we haven't taken the snapshot
                            zk.processTxn(packet.hdr, packet.rec);
                        } else {
                            packetsNotCommitted.add(packet);
                            packetsCommitted.add(qp.getZxid());
                        }
    
                        break;
                    case Leader.UPTODATE:
                        LOG.info("Learner received UPTODATE message");
                        if (newLeaderQV != null) {
                            boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
                            if (majorChange) {
                                throw new Exception("changes proposed in reconfig");
                            }
                        }
                        if (isPreZAB1_0) {
                            zk.takeSnapshot(syncSnapshot);
                            self.setCurrentEpoch(newEpoch);
                        }
                        self.setZooKeeperServer(zk);
                        self.adminServer.setZooKeeperServer(zk);
                        break outerLoop;
                    case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
                        // means this is Zab 1.0
                        LOG.info("Learner received NEWLEADER message");
                        if (qp.getData() != null && qp.getData().length > 1) {
                            try {
                                QuorumVerifier qv = self.configFromString(new String(qp.getData()));
                                self.setLastSeenQuorumVerifier(qv, true);
                                newLeaderQV = qv;
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
    
                        if (snapshotNeeded) {
                            zk.takeSnapshot(syncSnapshot);
                        }
    
                        self.setCurrentEpoch(newEpoch);
                        writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                        isPreZAB1_0 = false;
                        writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                        break;
                    }
                }
            }
            ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
            writePacket(ack, true);
            sock.setSoTimeout(self.tickTime * self.syncLimit);
            self.setSyncMode(QuorumPeer.SyncMode.NONE);
            zk.startup();
            /*
             * Update the election vote here to ensure that all members of the
             * ensemble report the same vote to new servers that start up and
             * send leader election notifications to the ensemble.
             *
             * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
             */
            self.updateElectionVote(newEpoch);
    
            // We need to log the stuff that came in between the snapshot and the uptodate
            if (zk instanceof FollowerZooKeeperServer) {
                FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
                for (PacketInFlight p : packetsNotCommitted) {
                    fzk.logRequest(p.hdr, p.rec);
                }
                for (Long zxid : packetsCommitted) {
                    fzk.commit(zxid);
                }
            } else if (zk instanceof ObserverZooKeeperServer) {
                // Similar to follower, we need to log requests between the snapshot
                // and UPTODATE
                ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
                for (PacketInFlight p : packetsNotCommitted) {
                    Long zxid = packetsCommitted.peekFirst();
                    if (p.hdr.getZxid() != zxid) {
                        // log warning message if there is no matching commit
                        // old leader send outstanding proposal to observer
                        LOG.warn(
                            "Committing 0x{}, but next proposal is 0x{}",
                            Long.toHexString(zxid),
                            Long.toHexString(p.hdr.getZxid()));
                        continue;
                    }
                    packetsCommitted.remove();
                    Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
                    request.setTxn(p.rec);
                    request.setHdr(p.hdr);
                    ozk.commitRequest(request);
                }
            } else {
                // New server type need to handle in-flight packets
                throw new UnsupportedOperationException("Unknown server type");
            }
        }
    
        // org.apache.zookeeper.server.ZooKeeperServer#startup
        public synchronized void startup() {
            if (sessionTracker == null) {
                createSessionTracker();
            }
            startSessionTracker();
            setupRequestProcessors();
    
            startRequestThrottler();
    
            registerJMX();
    
            startJvmPauseMonitor();
    
            registerMetrics();
    
            setState(State.RUNNING);
    
            requestPathMetricsCollector.start();
    
            localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
            notifyAll();
        }
    
        // org.apache.zookeeper.server.quorum.ObserverZooKeeperServer#setupRequestProcessors
        /**
         * Set up the request processors for an Observer:
         * firstProcesor-&gt;commitProcessor-&gt;finalProcessor
         */
        @Override
        protected void setupRequestProcessors() {
            // We might consider changing the processor behaviour of
            // Observers to, for example, remove the disk sync requirements.
            // Currently, they behave almost exactly the same as followers.
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
            commitProcessor.start();
            // The resulting chain is ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor
            firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
            ((ObserverRequestProcessor) firstProcessor).start();
    
            /*
             * Observer should write to disk, so that the it won't request
             * too old txn from the leader which may lead to getting an entire
             * snapshot.
             *
             * However, this may degrade performance as it has to write to disk
             * and do periodic snapshot which may double the memory requirements
             */
            if (syncRequestProcessorEnabled) {
                syncProcessor = new SyncRequestProcessor(this, null);
                syncProcessor.start();
            }
        }
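
      The two listings above follow the Template Method pattern: startup() fixes the order of the steps, and each server type supplies its own setupRequestProcessors(). Below is a minimal sketch of that shape. The class names (TemplateMethodSketch, SketchServer, and so on) are hypothetical stand-ins, and the processors are reduced to plain strings. In ZooKeeper itself setupRequestProcessors() has a default implementation in ZooKeeperServer; declaring it abstract here is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

class TemplateMethodSketch {

    /** Hypothetical stand-in for ZooKeeperServer; processors are just names here. */
    abstract static class SketchServer {
        protected final List<String> chain = new ArrayList<>();
        private boolean running;

        // Template method: the order of the startup steps is fixed in the base class.
        public final synchronized void startup() {
            setupRequestProcessors(); // the role-specific step
            running = true;
        }

        // Each role decides which processors make up its chain.
        protected abstract void setupRequestProcessors();

        public boolean isRunning() {
            return running;
        }

        public String describeChain() {
            return String.join(" -> ", chain);
        }
    }

    /** Chain taken from the ObserverZooKeeperServer listing above. */
    static class ObserverSketchServer extends SketchServer {
        @Override
        protected void setupRequestProcessors() {
            chain.add("ObserverRequestProcessor");
            chain.add("CommitProcessor");
            chain.add("FinalRequestProcessor");
        }
    }

    /** Main chain of a Follower; its separate sync/ack chain is omitted. */
    static class FollowerSketchServer extends SketchServer {
        @Override
        protected void setupRequestProcessors() {
            chain.add("FollowerRequestProcessor");
            chain.add("CommitProcessor");
            chain.add("FinalRequestProcessor");
        }
    }

    public static void main(String[] args) {
        SketchServer[] servers = { new ObserverSketchServer(), new FollowerSketchServer() };
        for (SketchServer s : servers) {
            s.startup();
            System.out.println(s.getClass().getSimpleName() + ": " + s.describeChain());
        }
    }
}
```

      The caller only ever invokes startup(); which chain comes out depends entirely on which subclass was instantiated once the role was known.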

    4. A brief overview of each processor

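      AckRequestProcessor: forwards a request from the previous stage to the Leader as an ACK.
      CommitProcessor: matches incoming committed requests against locally submitted requests. This is needed because a locally submitted request that changes system state comes back as an incoming committed request.
      FinalRequestProcessor: usually the last processor in the chain; it applies the transaction associated with a request and serves queries.
      FollowerRequestProcessor: forwards requests that modify system state to the Leader.
      ObserverRequestProcessor: like FollowerRequestProcessor, forwards requests that modify system state to the Leader.
      PrepRequestProcessor: usually the first processor in the chain; it sets up the transaction for requests that change system state.
      ProposalRequestProcessor: forwards requests to AckRequestProcessor and SyncRequestProcessor.
      ReadOnlyRequestProcessor: the first processor in the ReadOnlyZooKeeperServer chain; it passes read-only requests to the next processor and discards state-changing requests.
      SendAckRequestProcessor: sends an ACK for the request to the Leader.
      SyncRequestProcessor: logs requests to disk in batches; a request is passed to the next processor only after its log entry has been synced to disk.
      ToBeAppliedRequestProcessor: maintains the toBeApplied list; the next processor must be a FinalRequestProcessor, and FinalRequestProcessor must process the request synchronously.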
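
      What all of these processors share is that each one holds a reference to its successor and decides whether to pass a request on. Below is a minimal sketch of that Chain of Responsibility structure, loosely modeled on the ReadOnlyRequestProcessor description above. Req, Processor, ReadOnlySketch and FinalSketch are hypothetical names introduced for illustration; they are not ZooKeeper classes, and the real RequestProcessor interface and request type carry more than is shown here.

```java
import java.util.ArrayList;
import java.util.List;

class ChainSketch {

    /** Hypothetical, simplified request: an operation name and a write flag. */
    static final class Req {
        final String op;
        final boolean isWrite;

        Req(String op, boolean isWrite) {
            this.op = op;
            this.isWrite = isWrite;
        }
    }

    /** Simplified processor contract: a single method per stage. */
    interface Processor {
        void processRequest(Req request);
    }

    /** Last stage: "applies" the request, here by recording its name. */
    static final class FinalSketch implements Processor {
        final List<String> applied = new ArrayList<>();

        @Override
        public void processRequest(Req request) {
            applied.add(request.op);
        }
    }

    /** First stage: lets reads through and drops state-changing requests. */
    static final class ReadOnlySketch implements Processor {
        private final Processor next;
        int dropped;

        ReadOnlySketch(Processor next) {
            this.next = next;
        }

        @Override
        public void processRequest(Req request) {
            if (request.isWrite) {
                dropped++; // the request goes no further down the chain
                return;
            }
            next.processRequest(request);
        }
    }

    public static void main(String[] args) {
        // The chain is wired back to front, as in setupRequestProcessors():
        // the final stage is created first and handed to its predecessor.
        FinalSketch finalStage = new FinalSketch();
        ReadOnlySketch first = new ReadOnlySketch(finalStage);

        first.processRequest(new Req("getData", false));
        first.processRequest(new Req("create", true));
        first.processRequest(new Req("exists", false));

        System.out.println(finalStage.applied + " dropped=" + first.dropped);
        // prints: [getData, exists] dropped=1
    }
}
```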

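      In this creation process we can see several design patterns at work: Chain of Responsibility, Template Method, and State. We can also see how a node promptly handles requests arriving from the server side.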
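
      The role-driven part can be sketched as a simple dispatch on the peer's state. This is an illustration only: the constant names mirror QuorumPeer.ServerState, but RoleDispatchSketch, PeerState and serverTypeFor are hypothetical, and the sketch returns the name of the server type rather than constructing one.

```java
import java.util.Optional;

class RoleDispatchSketch {

    /** Same constant names as QuorumPeer.ServerState, redeclared for the sketch. */
    enum PeerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    /**
     * Returns the server type whose setupRequestProcessors() will build the
     * chain, or empty while the node is still electing and has no role yet.
     */
    static Optional<String> serverTypeFor(PeerState state) {
        switch (state) {
            case FOLLOWING:
                return Optional.of("FollowerZooKeeperServer");
            case LEADING:
                return Optional.of("LeaderZooKeeperServer");
            case OBSERVING:
                return Optional.of("ObserverZooKeeperServer");
            case LOOKING:
            default:
                return Optional.empty(); // no role yet, so no chain yet
        }
    }

    public static void main(String[] args) {
        for (PeerState state : PeerState.values()) {
            System.out.println(state + " -> " + serverTypeFor(state).orElse("(no chain yet)"));
        }
    }
}
```

      The point of the sketch is the ordering: a chain exists only after the state has left LOOKING, which is why the processors are created after the election rather than at process start or per request.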

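      With the overall architecture of the processors covered, later articles will look at each of them in detail, which should make the big picture easier to understand.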

  • Original article: https://www.cnblogs.com/yougewe/p/11739729.html