  ZooKeeper (5): Transaction Processing and the Data-Update Logic

      Through the previous articles we have covered ZooKeeper's groundwork, from its overall architecture to how requests enter the system. Now let's watch ZK do its real work.

      This article takes the command  setData /a data  as its starting point, and observes how ZK handles a data-update operation coming from a client.
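
      For a concrete reference, here is roughly the same operation issued from the standard Java client instead of the CLI (a minimal sketch; the connect string, session timeout, and the assumption that /a already exists are illustrative):

        import org.apache.zookeeper.ZooKeeper;
        import org.apache.zookeeper.data.Stat;

        public class SetDataDemo {
            public static void main(String[] args) throws Exception {
                // Assumed local standalone server; no watcher needed for this demo
                ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, null);
                // version -1 means "any version": skip the optimistic version check
                Stat stat = zk.setData("/a", "data".getBytes(), -1);
                System.out.println("new version = " + stat.getVersion());
                zk.close();
            }
        }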

      First we need to be clear about what each role is responsible for; only then can we follow the workflow properly. We will take the Leader as the server the client is connected to, and lift the veil on its data-update path. Its chain of responsibility looks like this:

      LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor

      LeaderRequestProcessor only forwards the request along the chain, so we can ignore it here. The real starting point is PrepRequestProcessor.

    1. PrepRequestProcessor: first-pass processing of the request

      One thing is clear up front: every independently running RequestProcessor communicates with the next RequestProcessor through a queue. PrepRequestProcessor is no different; once LeaderRequestProcessor submits a request to it, it starts its own work.
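
      (The queue-plus-worker-thread pattern is easy to see in miniature before reading the real code. The sketch below is illustrative only; StageProcessor is a hypothetical name, not an actual ZooKeeper type.)

        import java.util.concurrent.LinkedBlockingQueue;

        // Illustrative sketch of the queue-per-processor pattern used by the chain above
        abstract class StageProcessor extends Thread {
            private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
            private final StageProcessor next;

            StageProcessor(StageProcessor next) {
                this.next = next;
            }

            // Upstream stages call this: it only enqueues, never does the work itself
            void processRequest(Object request) {
                queue.add(request);
            }

            @Override
            public void run() {
                try {
                    while (true) {
                        Object request = queue.take(); // block until work arrives
                        handle(request);
                        if (next != null) {
                            next.processRequest(request); // hand off to the next stage
                        }
                    }
                } catch (InterruptedException ignored) {
                    // shut down quietly
                }
            }

            abstract void handle(Object request);
        }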

        // Submitting a request simply appends it to the queue
        // org.apache.zookeeper.server.PrepRequestProcessor#processRequest
        public void processRequest(Request request) {
            request.prepQueueStartTime = Time.currentElapsedTime();
            submittedRequests.add(request);
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
        }
        // org.apache.zookeeper.server.PrepRequestProcessor#run
        @Override
        public void run() {
            try {
                // Loop forever, taking requests off the queue
                while (true) {
                    ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
                    // Blocks on the submittedRequests queue until a request is available
                    Request request = submittedRequests.take();
                    ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                        .add(Time.currentElapsedTime() - request.prepQueueStartTime);
                    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                    if (request.type == OpCode.ping) {
                        traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                    }
                    if (Request.requestOfDeath == request) {
                        break;
                    }
    
                    request.prepStartTime = Time.currentElapsedTime();
                    // The main processing logic is delegated to pRequest
                    pRequest(request);
                }
            } catch (Exception e) {
                handleException(this.getName(), e);
            }
            LOG.info("PrepRequestProcessor exited loop!");
        }
        
        /**
         * This method will be called inside the ProcessRequestThread, which is a
         * singleton, so there will be a single thread calling this code.
         *
         * @param request
         */
        protected void pRequest(Request request) throws RequestProcessorException {
            // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
            // request.type + " id = 0x" + Long.toHexString(request.sessionId));
            request.setHdr(null);
            request.setTxn(null);
    
            try {
                switch (request.type) {
                case OpCode.createContainer:
                case OpCode.create:
                case OpCode.create2:
                    CreateRequest create2Request = new CreateRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                    break;
                case OpCode.createTTL:
                    CreateTTLRequest createTtlRequest = new CreateTTLRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
                    break;
                case OpCode.deleteContainer:
                case OpCode.delete:
                    DeleteRequest deleteRequest = new DeleteRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
                    break;
                // setData has OpCode type = 5
                case OpCode.setData:
                    SetDataRequest setDataRequest = new SetDataRequest();
                    // A write request: hand it off to pRequest2Txn
                    pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                    break;
                case OpCode.reconfig:
                    ReconfigRequest reconfigRequest = new ReconfigRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
                    pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
                    break;
                case OpCode.setACL:
                    SetACLRequest setAclRequest = new SetACLRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
                    break;
                case OpCode.check:
                    CheckVersionRequest checkRequest = new CheckVersionRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
                    break;
                case OpCode.multi:
                    MultiOperationRecord multiRequest = new MultiOperationRecord();
                    try {
                        ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                    } catch (IOException e) {
                        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
                        throw e;
                    }
                    List<Txn> txns = new ArrayList<Txn>();
                    //Each op in a multi-op must have the same zxid!
                    long zxid = zks.getNextZxid();
                    KeeperException ke = null;
    
                    //Store off current pending change records in case we need to rollback
                    Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
    
                    for (Op op : multiRequest) {
                        Record subrequest = op.toRequestRecord();
                        int type;
                        Record txn;
    
                        /* If we've already failed one of the ops, don't bother
                         * trying the rest as we know it's going to fail and it
                         * would be confusing in the logfiles.
                         */
                        if (ke != null) {
                            type = OpCode.error;
                            txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                        } else {
                            /* Prep the request and convert to a Txn */
                            try {
                                pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                                type = request.getHdr().getType();
                                txn = request.getTxn();
                            } catch (KeeperException e) {
                                ke = e;
                                type = OpCode.error;
                                txn = new ErrorTxn(e.code().intValue());
    
                                if (e.code().intValue() > Code.APIERROR.intValue()) {
                                    LOG.info("Got user-level KeeperException when processing {} aborting"
                                             + " remaining multi ops. Error Path:{} Error:{}",
                                             request.toString(),
                                             e.getPath(),
                                             e.getMessage());
                                }
    
                                request.setException(e);
    
                                /* Rollback change records from failed multi-op */
                                rollbackPendingChanges(zxid, pendingChanges);
                            }
                        }
    
                        // TODO: I don't want to have to serialize it here and then
                        //       immediately deserialize in next processor. But I'm
                        //       not sure how else to get the txn stored into our list.
                        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                            txn.serialize(boa, "request");
                            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
                            txns.add(new Txn(type, bb.array()));
                        }
                    }
    
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), request.type));
                    request.setTxn(new MultiTxn(txns));
    
                    break;
    
                //create/close session don't require request record
                case OpCode.createSession:
                case OpCode.closeSession:
                    if (!request.isLocalSession()) {
                        pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                    }
                    break;
    
                //All the rest don't need to create a Txn - just verify session
                case OpCode.sync:
                case OpCode.exists:
                case OpCode.getData:
                case OpCode.getACL:
                case OpCode.getChildren:
                case OpCode.getAllChildrenNumber:
                case OpCode.getChildren2:
                case OpCode.ping:
                case OpCode.setWatches:
                case OpCode.checkWatches:
                case OpCode.removeWatches:
                case OpCode.getEphemerals:
                case OpCode.multiRead:
                    zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                    break;
                default:
                    LOG.warn("unknown type {}", request.type);
                    break;
                }
            } catch (KeeperException e) {
                if (request.getHdr() != null) {
                    request.getHdr().setType(OpCode.error);
                    request.setTxn(new ErrorTxn(e.code().intValue()));
                }
    
                if (e.code().intValue() > Code.APIERROR.intValue()) {
                    LOG.info(
                        "Got user-level KeeperException when processing {} Error Path:{} Error:{}",
                        request.toString(),
                        e.getPath(),
                        e.getMessage());
                }
                request.setException(e);
            } catch (Exception e) {
                // log at error level as we are returning a marshalling
                // error to the user
                LOG.error("Failed to process {}", request, e);
    
                StringBuilder sb = new StringBuilder();
                ByteBuffer bb = request.request;
                if (bb != null) {
                    bb.rewind();
                    while (bb.hasRemaining()) {
                        sb.append(Integer.toHexString(bb.get() & 0xff));
                    }
                } else {
                    sb.append("request buffer is null");
                }
    
                LOG.error("Dumping request buffer: 0x{}", sb.toString());
                if (request.getHdr() != null) {
                    request.getHdr().setType(OpCode.error);
                    request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
                }
            }
            // Record the latest transaction id, then pass the request to the next RequestProcessor
            request.zxid = zks.getZxid();
            ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
            // nextProcessor here is ProposalRequestProcessor
            nextProcessor.processRequest(request);
        }
        
        /**
         * This method will be called inside the ProcessRequestThread, which is a
         * singleton, so there will be a single thread calling this code.
         *
         * @param type
         * @param zxid
         * @param request
         * @param record
         */
        protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
            request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
    
            // Switch on the type once more to pick the handling logic
            switch (type) {
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer: {
                pRequest2TxnCreate(type, request, record, deserialize);
                break;
            }
            case OpCode.deleteContainer: {
                // ... 
                break;
            }
            case OpCode.delete:
                //...
                break;
            // Again, we only follow the setData case
            case OpCode.setData:
                // This proceeds in several steps:
                // 1. check the session;
                // 2. deserialize the request;
                // 3. validate the path;
                // 4. check ACL permissions;
                // 5. check (and increment) the version;
                // 6. add a change record to the outstanding-changes queue for later use
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                SetDataRequest setDataRequest = (SetDataRequest) record;
                if (deserialize) {
                    ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
                }
                String path = setDataRequest.getPath();
                validatePath(path, request.sessionId);
                ChangeRecord nodeRecord = getRecordForPath(path);
                zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
                int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
                request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
                nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
                nodeRecord.stat.setVersion(newVersion);
                addChangeRecord(nodeRecord);
                break;
            case OpCode.reconfig:
                //...
                addChangeRecord(nodeRecord);
                break;
            case OpCode.setACL:
                // ...
                addChangeRecord(nodeRecord);
                break;
            case OpCode.createSession:
                // ...
                break;
            case OpCode.closeSession:
                // ...
                break;
            case OpCode.check:
                // ...
                break;
            default:
                LOG.warn("unknown type {}", type);
                break;
            }
        }
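
      The version check in step 5 is classic optimistic concurrency control: the version the client passed in must match the node's current version, with -1 acting as a wildcard that skips the check. In sketch form (an assumption: condensed to match the behavior of checkAndIncVersion as called above, not copied from the source):

        // Sketch of the optimistic version check performed for setData
        private static int checkAndIncVersion(int currentVersion, int expectedVersion, String path)
                throws KeeperException.BadVersionException {
            // -1 is the wildcard: accept whatever the current version is
            if (expectedVersion != -1 && expectedVersion != currentVersion) {
                throw new KeeperException.BadVersionException(path);
            }
            return currentVersion + 1;
        }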

    2. ProposalRequestProcessor: handling write transactions and initiating the proposal (vote)

      ProposalRequestProcessor does not run as a standalone thread; it is invoked inline while a write transaction is being processed. Its only entry point is the processRequest method, and its main job is to set the proposal (voting) logic in motion.

        // org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
        public void processRequest(Request request) throws RequestProcessorException {
            // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
            // request.type + " id = " + request.sessionId);
            // request.addRQRec(">prop");
    
    
            /* In the following IF-THEN-ELSE block, we process syncs on the leader.
             * If the sync is coming from a follower, then the follower
             * handler adds it to syncHandler. Otherwise, if it is a client of
             * the leader that issued the sync command, then syncHandler won't
             * contain the handler. In this case, we add it to syncHandler, and
             * call processRequest on the next processor.
             */
    
            if (request instanceof LearnerSyncRequest) {
                zks.getLeader().processSync((LearnerSyncRequest) request);
            } else {
                // nextProcessor here is CommitProcessor
                // Hand the request to it first, then decide whether the data must be persisted locally
                nextProcessor.processRequest(request);
                // For a write transaction, hdr is never null
                if (request.getHdr() != null) {
                    // We need to sync and get consensus on any transactions
                    // A quorum vote decides whether the update goes through
                    try {
                        // The proposal logic itself is handled by the Leader class
                        zks.getLeader().propose(request);
                    } catch (XidRolloverException e) {
                        throw new RequestProcessorException(e.getMessage(), e);
                    }
                    // Hand off to SyncRequestProcessor to persist the transaction
                    syncProcessor.processRequest(request);
                }
            }
        }
    
        // Submitting the request into CommitProcessor's queue
        // org.apache.zookeeper.server.quorum.CommitProcessor#processRequest
        @Override
        public void processRequest(Request request) {
            if (stopped) {
                return;
            }
            LOG.debug("Processing request:: {}", request);
            request.commitProcQueueStartTime = Time.currentElapsedTime();
            queuedRequests.add(request);
            // If the request will block, add it to the queue of blocking requests
            if (needCommit(request)) {
                queuedWriteRequests.add(request);
                numWriteQueuedRequests.incrementAndGet();
            } else {
                numReadQueuedRequests.incrementAndGet();
            }
            wakeup();
        }
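
      A quick aside on needCommit, used above: it is what separates writes, which must wait for a quorum commit, from reads, which can be served directly. A simplified sketch (an assumption: condensed from the spirit of CommitProcessor#needCommit; sync and session ops carry extra conditions in the real code):

        // Sketch: write ops need a quorum commit, plain reads do not
        private boolean needCommit(Request request) {
            switch (request.type) {
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.setACL:
            case OpCode.reconfig:
            case OpCode.multi:
                return true;
            default:
                // sync, createSession and closeSession are conditional in the real code
                return false;
            }
        }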
        
        // A quick look at how a transaction proposal is sent out
        // org.apache.zookeeper.server.quorum.Leader#propose 
        /**
         * create a proposal and send it out to all the members
         *
         * @param request
         * @return the proposal that is queued to send to all the members
         */
        public Proposal propose(Request request) throws XidRolloverException {
            /**
             * Address the rollover issue. All lower 32bits set indicate a new leader
             * election. Force a re-election instead. See ZOOKEEPER-1277
             */
            if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
                String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
                shutdown(msg);
                throw new XidRolloverException(msg);
            }
    
            // Wrap the request in a QuorumPacket for inter-node communication
            byte[] data = SerializeUtils.serializeRequest(request);
            proposalStats.setLastBufferSize(data.length);
            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
    
            Proposal p = new Proposal();
            p.packet = pp;
            p.request = request;
    
            synchronized (this) {
                // Attach the current quorum verifier to the proposal
                p.addQuorumVerifier(self.getQuorumVerifier());
    
                if (request.getHdr().getType() == OpCode.reconfig) {
                    self.setLastSeenQuorumVerifier(request.qv, true);
                }
    
                if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
                    p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                }
    
                LOG.debug("Proposing:: {}", request);
    
                lastProposed = p.packet.getZxid();
                outstandingProposals.put(lastProposed, p);
                // Send the QuorumPacket out to every participating node
                sendPacket(pp);
            }
            ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
            return p;
        }
    
        // SyncRequestProcessor's submission likewise just enqueues the request; its own thread polls the queue and does the work
        // org.apache.zookeeper.server.SyncRequestProcessor#processRequest
        public void processRequest(final Request request) {
            Objects.requireNonNull(request, "Request cannot be null");
    
            request.syncQueueStartTime = Time.currentElapsedTime();
            queuedRequests.add(request);
            ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
        }
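
      One detail from propose worth unpacking: a zxid packs the leader epoch into its high 32 bits and a per-epoch counter into its low 32 bits, which is exactly what the rollover guard at the top of propose is checking. A small illustration:

        // Illustration of the zxid layout behind the rollover guard in propose
        long zxid = 0x0000000500000003L;               // epoch 5, 3rd txn in that epoch
        long epoch = zxid >>> 32;                      // 5
        long counter = zxid & 0xffffffffL;             // 3
        boolean mustReelect = counter == 0xffffffffL;  // low 32 bits exhausted: force re-election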

    3. CommitProcessor: handling the commit phase (driven by quorum ACKs)

      Requests arrive here from ProposalRequestProcessor through the processRequest method shown above, which only enqueues them; CommitProcessor then works through the queue independently on its own thread.

        // org.apache.zookeeper.server.quorum.CommitProcessor#run
        @Override
        public void run() {
            try {
                /*
                 * In each iteration of the following loop we process at most
                 * requestsToProcess requests of queuedRequests. We have to limit
                 * the number of request we poll from queuedRequests, since it is
                 * possible to endlessly poll read requests from queuedRequests, and
                 * that will lead to a starvation of non-local committed requests.
                 */
                int requestsToProcess = 0;
                boolean commitIsWaiting = false;
                do {
                    /*
                     * Since requests are placed in the queue before being sent to
                     * the leader, if commitIsWaiting = true, the commit belongs to
                     * the first update operation in the queuedRequests or to a
                     * request from a client on another server (i.e., the order of
                     * the following two lines is important!).
                     */
                    commitIsWaiting = !committedRequests.isEmpty();
                    requestsToProcess = queuedRequests.size();
                    // Avoid sync if we have something to do
                    // wait/notify: sleep until there is work to do
                    if (requestsToProcess == 0 && !commitIsWaiting) {
                        // Waiting for requests to process
                        synchronized (this) {
                            while (!stopped && requestsToProcess == 0 && !commitIsWaiting) {
                                wait();
                                commitIsWaiting = !committedRequests.isEmpty();
                                requestsToProcess = queuedRequests.size();
                            }
                        }
                    }
    
                    ServerMetrics.getMetrics().READS_QUEUED_IN_COMMIT_PROCESSOR.add(numReadQueuedRequests.get());
                    ServerMetrics.getMetrics().WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(numWriteQueuedRequests.get());
                    ServerMetrics.getMetrics().COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(committedRequests.size());
    
                    long time = Time.currentElapsedTime();
    
                    /*
                     * Processing up to requestsToProcess requests from the incoming
                     * queue (queuedRequests). If maxReadBatchSize is set then no
                     * commits will be processed until maxReadBatchSize number of
                     * reads are processed (or no more reads remain in the queue).
                     * After the loop a single committed request is processed if
                     * one is waiting (or a batch of commits if maxCommitBatchSize
                     * is set).
                     */
                    Request request;
                    int readsProcessed = 0;
                    // Process one batch, bounded by the requestsToProcess snapshot taken above
                    while (!stopped
                           && requestsToProcess > 0
                           && (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
                           && (request = queuedRequests.poll()) != null) {
                        // Once polled off the queue, the request counts as handled
                        requestsToProcess--;
                        // Update-type operations take the first branch
                        if (needCommit(request) || pendingRequests.containsKey(request.sessionId)) {
                            // Add request to pending
                            Deque<Request> requests = pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>());
                            requests.addLast(request);
                            ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
                        } else {
                            readsProcessed++;
                            numReadQueuedRequests.decrementAndGet();
                            sendToNextProcessor(request);
                        }
                        /*
                         * Stop feeding the pool if there is a local pending update
                         * and a committed request that is ready. Once we have a
                         * pending request with a waiting committed request, we know
                         * we can process the committed one. This is because commits
                         * for local requests arrive in the order they appeared in
                         * the queue, so if we have a pending request and a
                         * committed request, the committed request must be for that
                         * pending write or for a write originating at a different
                         * server. We skip this if maxReadBatchSize is set.
                         */
                        if (maxReadBatchSize < 0 && !pendingRequests.isEmpty() && !committedRequests.isEmpty()) {
                            /*
                             * We set commitIsWaiting so that we won't check
                             * committedRequests again.
                             */
                            commitIsWaiting = true;
                            break;
                        }
                    }
                    ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed);
    
                    if (!commitIsWaiting) {
                        commitIsWaiting = !committedRequests.isEmpty();
                    }
    
                    /*
                     * Handle commits, if any.
                     */
                    if (commitIsWaiting && !stopped) {
                        /*
                         * Drain outstanding reads
                         */
                        waitForEmptyPool();
    
                        if (stopped) {
                            return;
                        }
    
                        int commitsToProcess = maxCommitBatchSize;
    
                        /*
                         * Loop through all the commits, and try to drain them.
                         */
                        Set<Long> queuesToDrain = new HashSet<>();
                        long startWriteTime = Time.currentElapsedTime();
                        int commitsProcessed = 0;
                        while (commitIsWaiting && !stopped && commitsToProcess > 0) {
                            // Process committed head
                            request = committedRequests.peek();
    
                            /*
                             * Check if this is a local write request is pending,
                             * if so, update it with the committed info. If the commit matches
                             * the first write queued in the blockedRequestQueue, we know this is
                             * a commit for a local write, as commits are received in order. Else
                             * it must be a commit for a remote write.
                             */
                            if (!queuedWriteRequests.isEmpty()
                                && queuedWriteRequests.peek().sessionId == request.sessionId
                                && queuedWriteRequests.peek().cxid == request.cxid) {
                                /*
                                 * Commit matches the earliest write in our write queue.
                                 */
                                Deque<Request> sessionQueue = pendingRequests.get(request.sessionId);
                                ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
                                if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) {
                                    /*
                                     * Can't process this write yet.
                                     * Either there are reads pending in this session, or we
                                     * haven't gotten to this write yet.
                                     */
                                    break;
                                } else {
                                    ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
                                    // If session queue != null, then it is also not empty.
                                    Request topPending = sessionQueue.poll();
                                    /*
                                     * Generally, we want to send to the next processor our version of the request,
                                     * since it contains the session information that is needed for post update processing.
                                     * In more details, when a request is in the local queue, there is (or could be) a client
                                     * attached to this server waiting for a response, and there is other bookkeeping of
                                     * requests that are outstanding and have originated from this server
                                     * (e.g., for setting the max outstanding requests) - we need to update this info when an
                                     * outstanding request completes. Note that in the other case, the operation
                                     * originated from a different server and there is no local bookkeeping or a local client
                                     * session that needs to be notified.
                                     */
                                    topPending.setHdr(request.getHdr());
                                    topPending.setTxn(request.getTxn());
                                    topPending.zxid = request.zxid;
                                    topPending.commitRecvTime = request.commitRecvTime;
                                    request = topPending;
                                    // Only decrement if we take a request off the queue.
                                    numWriteQueuedRequests.decrementAndGet();
                                    queuedWriteRequests.poll();
                                    queuesToDrain.add(request.sessionId);
                                }
                            }
                            /*
                             * Pull the request off the commit queue, now that we are going
                             * to process it.
                             */
                            committedRequests.remove();
                            commitsToProcess--;
                            commitsProcessed++;
    
                            // Process the write inline.
                        // Handle the commit: hand it to ToBeAppliedRequestProcessor
                            processWrite(request);
    
                            commitIsWaiting = !committedRequests.isEmpty();
                        }
                        ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR
                            .add(Time.currentElapsedTime() - startWriteTime);
                        ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed);
    
                        /*
                         * Process following reads if any, remove session queue(s) if
                         * empty.
                         */
                        readsProcessed = 0;
                        for (Long sessionId : queuesToDrain) {
                            Deque<Request> sessionQueue = pendingRequests.get(sessionId);
                            int readsAfterWrite = 0;
                            while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) {
                                numReadQueuedRequests.decrementAndGet();
                            // Wraps the request in a org.apache.zookeeper.server.quorum.CommitProcessor.CommitWorkRequest and submits it to the worker pool
                                sendToNextProcessor(sessionQueue.poll());
                                readsAfterWrite++;
                            }
                            ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
                            readsProcessed += readsAfterWrite;
    
                            // Remove empty queues
                            if (sessionQueue.isEmpty()) {
                                pendingRequests.remove(sessionId);
                            }
                        }
                        ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size());
                        ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed);
                    }
    
                    ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
                    endOfIteration();
                } while (!stoppedMainLoop);
            } catch (Throwable e) {
                handleException(this.getName(), e);
            }
            LOG.info("CommitProcessor exited loop!");
        }
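
      The key matching rule in the loop above: because commits arrive in order, a commit at the head of committedRequests belongs to a local write exactly when it carries the same (sessionId, cxid) pair as the head of queuedWriteRequests; otherwise it is a commit for a write that originated on another server. Condensed into a sketch (illustrative helper, not a real method):

        // Sketch of the local-write matching test used in the run() loop above
        private boolean isCommitForLocalWrite(Request committed) {
            Request head = queuedWriteRequests.peek();
            return head != null
                && head.sessionId == committed.sessionId
                && head.cxid == committed.cxid;
        }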
        
        // When CommitProcessor sees that a transaction can be committed (which only happens once the vote has concluded), it performs the commit
        // org.apache.zookeeper.server.quorum.CommitProcessor#processWrite
        private void processWrite(Request request) throws RequestProcessorException {
            processCommitMetrics(request, true);
    
            long timeBeforeFinalProc = Time.currentElapsedTime();
            // ToBeAppliedRequestProcessor
            nextProcessor.processRequest(request);
            ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(Time.currentElapsedTime() - timeBeforeFinalProc);
        }
            // org.apache.zookeeper.server.quorum.Leader.ToBeAppliedRequestProcessor#processRequest
            public void processRequest(Request request) throws RequestProcessorException {
                // next is FinalRequestProcessor, which applies the txn to the database and replies to the client
                next.processRequest(request);
    
                // The only requests that should be on toBeApplied are write
                // requests, for which we will have a hdr. We can't simply use
                // request.zxid here because that is set on read requests to equal
                // the zxid of the last write op.
                if (request.getHdr() != null) {
                    long zxid = request.getHdr().getZxid();
                    Iterator<Proposal> iter = leader.toBeApplied.iterator();
                    if (iter.hasNext()) {
                        Proposal p = iter.next();
                        if (p.request != null && p.request.zxid == zxid) {
                            iter.remove();
                            return;
                        }
                    }
                    LOG.error("Committed request not found on toBeApplied: {}", request);
                }
            }
        // org.apache.zookeeper.server.FinalRequestProcessor#processRequest
        public void processRequest(Request request) {
            LOG.debug("Processing request:: {}", request);
    
            // request.addRQRec(">final");
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
            }
    
            // Apply the transaction to the ZK database
            ProcessTxnResult rc = zks.processTxn(request);
    
            // ZOOKEEPER-558:
            // In some cases the server does not close the connection (e.g., closeconn buffer
            // was not being queued — ZOOKEEPER-558) properly. This happens, for example,
            // when the client closes the connection. The server should still close the session, though.
            // Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
            if (request.type == OpCode.closeSession && connClosedByClient(request)) {
                // We need to check if we can close the session id.
                // Sometimes the corresponding ServerCnxnFactory could be null because
                // we are just playing diffs from the leader.
                if (closeSession(zks.serverCnxnFactory, request.sessionId)
                    || closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
                    return;
                }
            }
    
            if (request.getHdr() != null) {
                /*
                 * Request header is created only by the leader, so this must be
                 * a quorum request. Since we're comparing timestamps across hosts,
                 * this metric may be incorrect. However, it's still a very useful
                 * metric to track in the happy case. If there is clock drift,
                 * the latency can go negative. Note: headers use wall time, not
                 * CLOCK_MONOTONIC.
                 */
                long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
                if (propagationLatency >= 0) {
                    ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency);
                }
            }
    
            if (request.cnxn == null) {
                return;
            }
            ServerCnxn cnxn = request.cnxn;
    
            long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
    
            String lastOp = "NA";
            // Notify ZooKeeperServer that the request has finished so that it can
            // update any request accounting/throttling limits
            zks.decInProcess();
            zks.requestFinished(request);
            Code err = Code.OK;
            Record rsp = null;
            String path = null;
            try {
                if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
                    /*
                     * When local session upgrading is disabled, leader will
                     * reject the ephemeral node creation due to session expire.
                     * However, if this is the follower that issue the request,
                     * it will have the correct error code, so we should use that
                     * and report to user
                     */
                    if (request.getException() != null) {
                        throw request.getException();
                    } else {
                        throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
                    }
                }
    
                KeeperException ke = request.getException();
                if (ke instanceof SessionMovedException) {
                    throw ke;
                }
                if (ke != null && request.type != OpCode.multi) {
                    throw ke;
                }
    
                LOG.debug("{}", request);
    
                if (request.isStale()) {
                    ServerMetrics.getMetrics().STALE_REPLIES.add(1);
                }
    
                switch (request.type) {
                case OpCode.ping: {
                    lastOp = "PING";
                    updateStats(request, lastOp, lastZxid);
    
                    cnxn.sendResponse(new ReplyHeader(-2, lastZxid, 0), null, "response");
                    return;
                }
                case OpCode.createSession: {
                    lastOp = "SESS";
                    updateStats(request, lastOp, lastZxid);
    
                    zks.finishSessionInit(request.cnxn, true);
                    return;
                }
                case OpCode.multi: {
                    lastOp = "MULT";
                    rsp = new MultiResponse();
    
                    for (ProcessTxnResult subTxnResult : rc.multiResult) {
    
                        OpResult subResult;
    
                        switch (subTxnResult.type) {
                        case OpCode.check:
                            subResult = new CheckResult();
                            break;
                        case OpCode.create:
                            subResult = new CreateResult(subTxnResult.path);
                            break;
                        case OpCode.create2:
                        case OpCode.createTTL:
                        case OpCode.createContainer:
                            subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
                            break;
                        case OpCode.delete:
                        case OpCode.deleteContainer:
                            subResult = new DeleteResult();
                            break;
                        case OpCode.setData:
                            subResult = new SetDataResult(subTxnResult.stat);
                            break;
                        case OpCode.error:
                            subResult = new ErrorResult(subTxnResult.err);
                            if (subTxnResult.err == Code.SESSIONMOVED.intValue()) {
                                throw new SessionMovedException();
                            }
                            break;
                        default:
                            throw new IOException("Invalid type of op");
                        }
    
                        ((MultiResponse) rsp).add(subResult);
                    }
    
                    break;
                }
                case OpCode.multiRead: {
                    lastOp = "MLTR";
                    MultiOperationRecord multiReadRecord = new MultiOperationRecord();
                    ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
                    rsp = new MultiResponse();
                    OpResult subResult;
                    for (Op readOp : multiReadRecord) {
                        try {
                            Record rec;
                            switch (readOp.getType()) {
                            case OpCode.getChildren:
                                rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                                subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
                                break;
                            case OpCode.getData:
                                rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                                GetDataResponse gdr = (GetDataResponse) rec;
                                subResult = new GetDataResult(gdr.getData(), gdr.getStat());
                                break;
                            default:
                                throw new IOException("Invalid type of readOp");
                            }
                        } catch (KeeperException e) {
                            subResult = new ErrorResult(e.code().intValue());
                        }
                        ((MultiResponse) rsp).add(subResult);
                    }
                    break;
                }
                case OpCode.create: {
                    lastOp = "CREA";
                    rsp = new CreateResponse(rc.path);
                    err = Code.get(rc.err);
                    requestPathMetricsCollector.registerRequest(request.type, rc.path);
                    break;
                }
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer: {
                    lastOp = "CREA";
                    rsp = new Create2Response(rc.path, rc.stat);
                    err = Code.get(rc.err);
                    requestPathMetricsCollector.registerRequest(request.type, rc.path);
                    break;
                }
                case OpCode.delete:
                case OpCode.deleteContainer: {
                    lastOp = "DELE";
                    err = Code.get(rc.err);
                    requestPathMetricsCollector.registerRequest(request.type, rc.path);
                    break;
                }
                case OpCode.setData: {
                    // The write path: build the setData response
                    lastOp = "SETD";
                    rsp = new SetDataResponse(rc.stat);
                    err = Code.get(rc.err);
                    requestPathMetricsCollector.registerRequest(request.type, rc.path);
                    break;
                }
                case OpCode.reconfig: {
                    lastOp = "RECO";
                    rsp = new GetDataResponse(
                        ((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(),
                        rc.stat);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.setACL: {
                    lastOp = "SETA";
                    rsp = new SetACLResponse(rc.stat);
                    err = Code.get(rc.err);
                    requestPathMetricsCollector.registerRequest(request.type, rc.path);
                    break;
                }
                case OpCode.closeSession: {
                    lastOp = "CLOS";
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.sync: {
                    lastOp = "SYNC";
                    SyncRequest syncRequest = new SyncRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
                    rsp = new SyncResponse(syncRequest.getPath());
                    requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
                    break;
                }
                case OpCode.check: {
                    lastOp = "CHEC";
                    rsp = new SetDataResponse(rc.stat);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.exists: {
                    lastOp = "EXIS";
                    // TODO we need to figure out the security requirement for this!
                    ExistsRequest existsRequest = new ExistsRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
                    path = existsRequest.getPath();
                    if (path.indexOf('\0') != -1) {
                        throw new KeeperException.BadArgumentsException();
                    }
                    Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
                    rsp = new ExistsResponse(stat);
                    requestPathMetricsCollector.registerRequest(request.type, path);
                    break;
                }
                case OpCode.getData: {
                    lastOp = "GETD";
                    GetDataRequest getDataRequest = new GetDataRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
                    path = getDataRequest.getPath();
                    rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
                    requestPathMetricsCollector.registerRequest(request.type, path);
                    break;
                }
                case OpCode.setWatches: {
                    lastOp = "SETW";
                    SetWatches setWatches = new SetWatches();
                    // TODO We really should NOT need this!!!!
                    request.request.rewind();
                    ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
                    long relativeZxid = setWatches.getRelativeZxid();
                    zks.getZKDatabase()
                       .setWatches(
                           relativeZxid,
                           setWatches.getDataWatches(),
                           setWatches.getExistWatches(),
                           setWatches.getChildWatches(),
                           cnxn);
                    break;
                }
                case OpCode.getACL: {
                    lastOp = "GETA";
                    GetACLRequest getACLRequest = new GetACLRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
                    path = getACLRequest.getPath();
                    DataNode n = zks.getZKDatabase().getNode(path);
                    if (n == null) {
                        throw new KeeperException.NoNodeException();
                    }
                    zks.checkACL(
                        request.cnxn,
                        zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path,
                        null);
    
                    Stat stat = new Stat();
                    List<ACL> acl = zks.getZKDatabase().getACL(path, stat);
                    requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath());
    
                    try {
                        zks.checkACL(
                            request.cnxn,
                            zks.getZKDatabase().aclForNode(n),
                            ZooDefs.Perms.ADMIN,
                            request.authInfo,
                            path,
                            null);
                        rsp = new GetACLResponse(acl, stat);
                    } catch (KeeperException.NoAuthException e) {
                        List<ACL> acl1 = new ArrayList<ACL>(acl.size());
                        for (ACL a : acl) {
                            if ("digest".equals(a.getId().getScheme())) {
                                Id id = a.getId();
                                Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x"));
                                acl1.add(new ACL(a.getPerms(), id1));
                            } else {
                                acl1.add(a);
                            }
                        }
                        rsp = new GetACLResponse(acl1, stat);
                    }
                    break;
                }
                case OpCode.getChildren: {
                    lastOp = "GETC";
                    GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
                    path = getChildrenRequest.getPath();
                    rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
                    requestPathMetricsCollector.registerRequest(request.type, path);
                    break;
                }
                case OpCode.getAllChildrenNumber: {
                    lastOp = "GETACN";
                    GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
                    path = getAllChildrenNumberRequest.getPath();
                    DataNode n = zks.getZKDatabase().getNode(path);
                    if (n == null) {
                        throw new KeeperException.NoNodeException();
                    }
                    zks.checkACL(
                        request.cnxn,
                        zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ,
                        request.authInfo,
                        path,
                        null);
                    int number = zks.getZKDatabase().getAllChildrenNumber(path);
                    rsp = new GetAllChildrenNumberResponse(number);
                    break;
                }
                case OpCode.getChildren2: {
                    lastOp = "GETC";
                    GetChildren2Request getChildren2Request = new GetChildren2Request();
                    ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
                    Stat stat = new Stat();
                    path = getChildren2Request.getPath();
                    DataNode n = zks.getZKDatabase().getNode(path);
                    if (n == null) {
                        throw new KeeperException.NoNodeException();
                    }
                    zks.checkACL(
                        request.cnxn,
                        zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ,
                        request.authInfo, path,
                        null);
                    List<String> children = zks.getZKDatabase()
                                               .getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
                    rsp = new GetChildren2Response(children, stat);
                    requestPathMetricsCollector.registerRequest(request.type, path);
                    break;
                }
                case OpCode.checkWatches: {
                    lastOp = "CHKW";
                    CheckWatchesRequest checkWatches = new CheckWatchesRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
                    WatcherType type = WatcherType.fromInt(checkWatches.getType());
                    path = checkWatches.getPath();
                    boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
                    if (!containsWatcher) {
                        String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
                        throw new KeeperException.NoWatcherException(msg);
                    }
                    requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
                    break;
                }
                case OpCode.removeWatches: {
                    lastOp = "REMW";
                    RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
                    WatcherType type = WatcherType.fromInt(removeWatches.getType());
                    path = removeWatches.getPath();
                    boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
                    if (!removed) {
                        String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
                        throw new KeeperException.NoWatcherException(msg);
                    }
                    requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
                    break;
                }
                case OpCode.getEphemerals: {
                    lastOp = "GETE";
                    GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
                    String prefixPath = getEphemerals.getPrefixPath();
                    Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
                    List<String> ephemerals = new ArrayList<>();
                    if (StringUtils.isBlank(prefixPath) || "/".equals(prefixPath.trim())) {
                        ephemerals.addAll(allEphems);
                    } else {
                        for (String p : allEphems) {
                            if (p.startsWith(prefixPath)) {
                                ephemerals.add(p);
                            }
                        }
                    }
                    rsp = new GetEphemeralsResponse(ephemerals);
                    break;
                }
                }
            } catch (SessionMovedException e) {
                // session moved is a connection level error, we need to tear
                // down the connection otw ZOOKEEPER-710 might happen
                // ie client on slow follower starts to renew session, fails
                // before this completes, then tries the fast follower (leader)
                // and is successful, however the initial renew is then
                // successfully fwd/processed by the leader and as a result
                // the client and leader disagree on where the client is most
                // recently attached (and therefore invalid SESSION MOVED generated)
                cnxn.sendCloseSession();
                return;
            } catch (KeeperException e) {
                err = e.code();
            } catch (Exception e) {
                // log at error level as we are returning a marshalling
                // error to the user
                LOG.error("Failed to process {}", request, e);
                StringBuilder sb = new StringBuilder();
                ByteBuffer bb = request.request;
                bb.rewind();
                while (bb.hasRemaining()) {
                    sb.append(Integer.toHexString(bb.get() & 0xff));
                }
                LOG.error("Dumping request buffer: 0x{}", sb.toString());
                err = Code.MARSHALLINGERROR;
            }
    
            ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
            // update request statistics
            updateStats(request, lastOp, lastZxid);
    
            try {
                if (request.type == OpCode.getData && path != null && rsp != null) {
                    // Serialized read responses could be cached by the connection object.
                    // Cache entries are identified by their path and last modified zxid,
                    // so these values are passed along with the response.
                    GetDataResponse getDataResponse = (GetDataResponse) rsp;
                    Stat stat = null;
                    if (getDataResponse.getStat() != null) {
                        stat = getDataResponse.getStat();
                    }
                    cnxn.sendResponse(hdr, rsp, "response", path, stat);
                } else {
                    // send the response back to the client
                    cnxn.sendResponse(hdr, rsp, "response");
                }
                if (request.type == OpCode.closeSession) {
                    cnxn.sendCloseSession();
                }
            } catch (IOException e) {
                LOG.error("FIXMSG", e);
            }
        }
    
        // org.apache.zookeeper.server.ZooKeeperServer#processTxn
        // entry point for FinalRequestProcessor.java
        public ProcessTxnResult processTxn(Request request) {
            TxnHeader hdr = request.getHdr();
            processTxnForSessionEvents(request, hdr, request.getTxn());
    
            final boolean writeRequest = (hdr != null);
            final boolean quorumRequest = request.isQuorum();
    
            // return fast w/o synchronization when we get a read
            if (!writeRequest && !quorumRequest) {
                return new ProcessTxnResult();
            }
            synchronized (outstandingChanges) {
                ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn());
    
                // request.hdr is set for write requests, which are the only ones
                // that add to outstandingChanges.
                if (writeRequest) {
                    long zxid = hdr.getZxid();
                    while (!outstandingChanges.isEmpty()
                            && outstandingChanges.peek().zxid <= zxid) {
                        ChangeRecord cr = outstandingChanges.remove();
                        ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                        if (cr.zxid < zxid) {
                            LOG.warn(
                                "Zxid outstanding 0x{} is less than current 0x{}",
                                Long.toHexString(cr.zxid),
                                Long.toHexString(zxid));
                        }
                        if (outstandingChangesForPath.get(cr.path) == cr) {
                            outstandingChangesForPath.remove(cr.path);
                        }
                    }
                }
    
                // do not add non quorum packets to the queue.
            // this queue exists only so that followers can replicate (catch up on) data quickly
                if (quorumRequest) {
                    getZKDatabase().addCommittedProposal(request);
                }
                return rc;
            }
        }

    4. Leader#propose: putting the transaction to a quorum vote to keep data highly available

        // majority-quorum principle: essentially build a QuorumPacket and send it to every voting node
        // org.apache.zookeeper.server.quorum.Leader#propose
        /**
         * create a proposal and send it out to all the members
         *
         * @param request
         * @return the proposal that is queued to send to all the members
         */
        public Proposal propose(Request request) throws XidRolloverException {
            /**
             * Address the rollover issue. All lower 32bits set indicate a new leader
             * election. Force a re-election instead. See ZOOKEEPER-1277
             */
            if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
                String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
                shutdown(msg);
                throw new XidRolloverException(msg);
            }
    
            byte[] data = SerializeUtils.serializeRequest(request);
            proposalStats.setLastBufferSize(data.length);
            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
    
            Proposal p = new Proposal();
            p.packet = pp;
            p.request = request;
    
            synchronized (this) {
                p.addQuorumVerifier(self.getQuorumVerifier());
    
                if (request.getHdr().getType() == OpCode.reconfig) {
                    self.setLastSeenQuorumVerifier(request.qv, true);
                }
    
                if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
                    p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                }
    
                LOG.debug("Proposing:: {}", request);
    
                lastProposed = p.packet.getZxid();
                outstandingProposals.put(lastProposed, p);
                // send the packet to every voting node
                sendPacket(pp);
            }
            ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
            return p;
        }
        
        /**
         * send a packet to all the followers ready to follow
         *
         * @param qp
         *                the packet to be sent
         */
        void sendPacket(QuorumPacket qp) {
            synchronized (forwardingFollowers) {
                for (LearnerHandler f : forwardingFollowers) {
                    // just add to the LearnerHandler's queue and return; clearly the LearnerHandler must be another asynchronous worker
                    f.queuePacket(qp);
                }
            }
        }
        
        // org.apache.zookeeper.server.quorum.LearnerHandler#queuePacket
        void queuePacket(QuorumPacket p) {
            queuedPackets.add(p);
            // Add a MarkerQuorumPacket at regular intervals.
            // the marker enqueued periodically is consumed later purely for queue-latency metrics
            if (shouldSendMarkerPacketForLogging() && packetCounter.getAndIncrement() % markerPacketInterval == 0) {
                queuedPackets.add(new MarkerQuorumPacket(System.nanoTime()));
            }
            queuedPacketsSize.addAndGet(packetSize(p));
        }
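
      One detail in propose worth unpacking is the rollover guard at its top: a zxid packs the leader epoch into its high 32 bits and a per-epoch counter into the low 32 bits, so an all-ones counter means the epoch is exhausted and a re-election must mint a new one. Here is a minimal, self-contained sketch of that layout (our own demo class, not the ZxidUtils source):

        public final class ZxidLayoutDemo {
            // pack the epoch into the high 32 bits and the counter into the low 32 bits
            static long makeZxid(long epoch, long counter) {
                return (epoch << 32) | (counter & 0xffffffffL);
            }
            static long epochOf(long zxid) {
                return zxid >>> 32;
            }
            static long counterOf(long zxid) {
                return zxid & 0xffffffffL;
            }
            public static void main(String[] args) {
                long zxid = makeZxid(5, 7);
                System.out.println(epochOf(zxid));   // 5
                System.out.println(counterOf(zxid)); // 7
                // the exact condition propose() checks before forcing re-election:
                long exhausted = makeZxid(5, 0xffffffffL);
                System.out.println((exhausted & 0xffffffffL) == 0xffffffffL); // true: rollover
            }
        }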

    5. LearnerHandler: the quorum communication thread that drains the submitted packet queue

      In the previous section we saw that the Leader merely adds the proposal packet to the LearnerHandler's queue and returns. So the LearnerHandler must have processing logic of its own; how else would it communicate with the other nodes?
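
      Before reading the real code, here is a minimal sketch of the pattern in play (our own toy class, not ZooKeeper's): a BlockingQueue plus a dedicated sender thread, so that enqueueing returns immediately while the thread drains the queue and writes to the peer.

        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;

        public class AsyncSenderSketch {
            private final BlockingQueue<byte[]> queued = new LinkedBlockingQueue<>();
            private static final byte[] PACKET_OF_DEATH = new byte[0]; // shutdown sentinel

            // producer side: non-blocking hand-off, mirroring LearnerHandler#queuePacket
            void queuePacket(byte[] p) {
                queued.add(p);
            }

            // consumer side: one thread owns the writes to the peer
            void startSending() {
                new Thread(() -> {
                    try {
                        while (true) {
                            byte[] p = queued.take();   // blocks until work arrives
                            if (p == PACKET_OF_DEATH) {
                                break;                  // sentinel ends the loop
                            }
                            write(p);                   // stands in for oa.writeRecord(...)
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, "sender-sketch").start();
            }

            private void write(byte[] p) {
                // a real implementation would serialize to the follower's socket here
            }
        }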

        // org.apache.zookeeper.server.quorum.LearnerHandler#sendPackets
        /**
         * This method will use the thread to send packets added to the
         * queuedPackets list
         *
         * @throws InterruptedException
         */
        private void sendPackets() throws InterruptedException {
            long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
            // an infinite loop draining the send queue
            while (true) {
                try {
                    QuorumPacket p;
                    p = queuedPackets.poll();
                    if (p == null) {
                        bufferedOutput.flush();
                        p = queuedPackets.take();
                    }
    
                    ServerMetrics.getMetrics().LEARNER_HANDLER_QP_SIZE.add(Long.toString(this.sid), queuedPackets.size());
    
                    // the MarkerQuorumPackets enqueued periodically (see above) are skipped here; they only feed latency metrics
                    if (p instanceof MarkerQuorumPacket) {
                        MarkerQuorumPacket m = (MarkerQuorumPacket) p;
                        ServerMetrics.getMetrics().LEARNER_HANDLER_QP_TIME
                            .add(Long.toString(this.sid), (System.nanoTime() - m.time) / 1000000L);
                        continue;
                    }
    
                    queuedPacketsSize.addAndGet(-packetSize(p));
                    if (p == proposalOfDeath) {
                        // Packet of death!
                        break;
                    }
                    // ping packet
                    if (p.getType() == Leader.PING) {
                        traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                    }
                    // proposal packet: the one we are tracking
                    if (p.getType() == Leader.PROPOSAL) {
                        syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                    }
    
                    // Log the zxid of the last request, if it is a valid zxid.
                    if (p.getZxid() > 0) {
                        lastZxid = p.getZxid();
                    }
                    // write the proposal request to the socket
                    oa.writeRecord(p, "packet");
                    packetsSent.incrementAndGet();
                    messageTracker.trackSent(p.getType());
                } catch (IOException e) {
                    if (!sock.isClosed()) {
                        LOG.warn("Unexpected exception at {}", this, e);
                        try {
                            // this will cause everything to shutdown on
                            // this learner handler and will help notify
                            // the learner/observer instantaneously
                            sock.close();
                        } catch (IOException ie) {
                            LOG.warn("Error closing socket for handler {}", this, ie);
                        }
                    }
                    break;
                }
            }
        }
        // the sending thread above is started when the LearnerHandler is set up
        /**
         * Start thread that will forward any packet in the queue to the follower
         */
        protected void startSendingPackets() {
            if (!sendingThreadStarted) {
                // Start sending packets
                new Thread() {
                    public void run() {
                        Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
                        try {
                            sendPackets();
                        } catch (InterruptedException e) {
                            LOG.warn("Unexpected interruption", e);
                        }
                    }
                }.start();
                sendingThreadStarted = true;
            } else {
                LOG.error("Attempting to start sending thread after it already started");
            }
        }
        
        /**
         * This thread will receive packets from the peer and process them and
         * also listen to new connections from new peers.
         */
        @Override
        public void run() {
            try {
                learnerMaster.addLearnerHandler(this);
                tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();
    
                ia = BinaryInputArchive.getArchive(bufferedInput);
                bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
                oa = BinaryOutputArchive.getArchive(bufferedOutput);
    
                QuorumPacket qp = new QuorumPacket();
                ia.readRecord(qp, "packet");
    
                messageTracker.trackReceived(qp.getType());
                if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
                    LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());
    
                    return;
                }
    
                if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
                    throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
                }
                byte[] learnerInfoData = qp.getData();
                if (learnerInfoData != null) {
                    ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                    if (learnerInfoData.length >= 8) {
                        this.sid = bbsid.getLong();
                    }
                    if (learnerInfoData.length >= 12) {
                        this.version = bbsid.getInt(); // protocolVersion
                    }
                    if (learnerInfoData.length >= 20) {
                        long configVersion = bbsid.getLong();
                        if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
                            throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                        }
                    }
                } else {
                    this.sid = learnerMaster.getAndDecrementFollowerCounter();
                }
    
                String followerInfo = learnerMaster.getPeerInfo(this.sid);
                if (followerInfo.isEmpty()) {
                    LOG.info(
                        "Follower sid: {} not in the current config {}",
                        this.sid,
                        Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
                } else {
                    LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
                }
    
                if (qp.getType() == Leader.OBSERVERINFO) {
                    learnerType = LearnerType.OBSERVER;
                }
    
                learnerMaster.registerLearnerHandlerBean(this, sock);
    
                long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    
                long peerLastZxid;
                StateSummary ss = null;
                long zxid = qp.getZxid();
                long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
                long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
    
                if (this.getVersion() < 0x10000) {
                    // we are going to have to extrapolate the epoch information
                    long epoch = ZxidUtils.getEpochFromZxid(zxid);
                    ss = new StateSummary(epoch, zxid);
                    // fake the message
                    learnerMaster.waitForEpochAck(this.getSid(), ss);
                } else {
                    byte[] ver = new byte[4];
                    ByteBuffer.wrap(ver).putInt(0x10000);
                    QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
                    oa.writeRecord(newEpochPacket, "packet");
                    messageTracker.trackSent(Leader.LEADERINFO);
                    bufferedOutput.flush();
                    QuorumPacket ackEpochPacket = new QuorumPacket();
                    ia.readRecord(ackEpochPacket, "packet");
                    messageTracker.trackReceived(ackEpochPacket.getType());
                    if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                        LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
                        return;
                    }
                    ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                    ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                    learnerMaster.waitForEpochAck(this.getSid(), ss);
                }
                peerLastZxid = ss.getLastZxid();
    
                // Take any necessary action if we need to send TRUNC or DIFF
                // startForwarding() will be called in all cases
                boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
    
                // syncs between followers and the leader are exempt from throttling because it
                // is important to keep the state of quorum servers up-to-date. The exempted syncs
                // are counted as concurrent syncs though
                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                /* if we are not truncating or sending a diff just send a snapshot */
                if (needSnap) {
                    syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
                    syncThrottler.beginSync(exemptFromThrottle);
                    try {
                        long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                        oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                        messageTracker.trackSent(Leader.SNAP);
                        bufferedOutput.flush();
    
                        LOG.info(
                            "Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                                + "send zxid of db as 0x{}, {} concurrent snapshot sync, "
                                + "snapshot sync was {} from throttle",
                            Long.toHexString(peerLastZxid),
                            Long.toHexString(leaderLastZxid),
                            Long.toHexString(zxidToSend),
                            syncThrottler.getSyncInProgress(),
                            exemptFromThrottle ? "exempt" : "not exempt");
                        // Dump data to peer
                        learnerMaster.getZKDatabase().serializeSnapshot(oa);
                        oa.writeString("BenWasHere", "signature");
                        bufferedOutput.flush();
                    } finally {
                        ServerMetrics.getMetrics().SNAP_COUNT.add(1);
                    }
                } else {
                    syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
                    syncThrottler.beginSync(exemptFromThrottle);
                    ServerMetrics.getMetrics().DIFF_COUNT.add(1);
                }
    
                LOG.debug("Sending NEWLEADER message to {}", sid);
                // the version of this quorumVerifier will be set by leader.lead() in case
                // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
                // we got here, so the version was set
                if (getVersion() < 0x10000) {
                    QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
                    oa.writeRecord(newLeaderQP, "packet");
                } else {
                    QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
                    queuedPackets.add(newLeaderQP);
                }
                bufferedOutput.flush();
    
                // Start thread that blast packets in the queue to learner
                startSendingPackets();
    
                /*
                 * Have to wait for the first ACK, wait until
                 * the learnerMaster is ready, and only then we can
                 * start processing messages.
                 */
                qp = new QuorumPacket();
                ia.readRecord(qp, "packet");
    
                messageTracker.trackReceived(qp.getType());
                if (qp.getType() != Leader.ACK) {
                    LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
                    return;
                }
    
                LOG.debug("Received NEWLEADER-ACK message from {}", sid);
    
                learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
    
                syncLimitCheck.start();
                // sync ends when NEWLEADER-ACK is received
                syncThrottler.endSync();
                syncThrottler = null;
    
                // now that the ack has been processed expect the syncLimit
                sock.setSoTimeout(learnerMaster.syncTimeout());
    
                /*
                 * Wait until learnerMaster starts up
                 */
                learnerMaster.waitForStartup();
    
                // Mutation packets will be queued during the serialize,
                // so we need to mark when the peer can actually start
                // using the data
                //
                LOG.debug("Sending UPTODATE message to {}", sid);
                queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
    
                while (true) {
                    qp = new QuorumPacket();
                    ia.readRecord(qp, "packet");
                    messageTracker.trackReceived(qp.getType());
    
                    long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                    if (qp.getType() == Leader.PING) {
                        traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
                    }
                    tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();
    
                    packetsReceived.incrementAndGet();
    
                    ByteBuffer bb;
                    long sessionId;
                    int cxid;
                    int type;
    
                    switch (qp.getType()) {
                    case Leader.ACK:
                        if (this.learnerType == LearnerType.OBSERVER) {
                            LOG.debug("Received ACK from Observer {}", this.sid);
                        }
                        syncLimitCheck.updateAck(qp.getZxid());
                        // an ACK for a proposal arrived; hand it to the Leader (learnerMaster) to process
                        learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                        break;
                    case Leader.PING:
                        // Process the touches
                        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                        DataInputStream dis = new DataInputStream(bis);
                        while (dis.available() > 0) {
                            long sess = dis.readLong();
                            int to = dis.readInt();
                            learnerMaster.touch(sess, to);
                        }
                        break;
                    case Leader.REVALIDATE:
                        ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
                        learnerMaster.revalidateSession(qp, this);
                        break;
                    case Leader.REQUEST:
                        bb = ByteBuffer.wrap(qp.getData());
                        sessionId = bb.getLong();
                        cxid = bb.getInt();
                        type = bb.getInt();
                        bb = bb.slice();
                        Request si;
                        if (type == OpCode.sync) {
                            si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                        } else {
                            si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                        }
                        si.setOwner(this);
                        learnerMaster.submitLearnerRequest(si);
                        requestsReceived.incrementAndGet();
                        break;
                    default:
                        LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                        break;
                    }
                }
            } catch (IOException e) {
                if (sock != null && !sock.isClosed()) {
                    LOG.error("Unexpected exception causing shutdown while sock still open", e);
                    //close the socket to make sure the
                    //other side can see it being close
                    try {
                        sock.close();
                    } catch (IOException ie) {
                        // do nothing
                    }
                }
            } catch (InterruptedException e) {
                LOG.error("Unexpected exception in LearnerHandler.", e);
            } catch (SyncThrottleException e) {
                LOG.error("too many concurrent sync.", e);
                syncThrottler = null;
            } catch (Exception e) {
                LOG.error("Unexpected exception in LearnerHandler.", e);
                throw e;
            } finally {
                if (syncThrottler != null) {
                    syncThrottler.endSync();
                    syncThrottler = null;
                }
                String remoteAddr = getRemoteAddress();
                LOG.warn("******* GOODBYE {} ********", remoteAddr);
                messageTracker.dumpToLog(remoteAddr);
                shutdown();
            }
        }
    
        // org.apache.zookeeper.server.quorum.Leader#processAck
        /**
         * Keep a count of acks that are received by the leader for a particular
         * proposal
         *
         * @param zxid, the zxid of the proposal sent out
         * @param sid, the id of the server that sent the ack
         * @param followerAddr
         */
        @Override
        public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
            if (!allowedToCommit) {
                return; // last op committed was a leader change - from now on
            }
            // the new leader should commit
            if (LOG.isTraceEnabled()) {
                LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
                for (Proposal p : outstandingProposals.values()) {
                    long packetZxid = p.packet.getZxid();
                    LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
                }
                LOG.trace("outstanding proposals all");
            }
    
            if ((zxid & 0xffffffffL) == 0) {
                /*
                 * We no longer process NEWLEADER ack with this method. However,
                 * the learner sends an ack back to the leader after it gets
                 * UPTODATE, so we just ignore the message.
                 */
                return;
            }
    
            if (outstandingProposals.size() == 0) {
                LOG.debug("outstanding is 0");
                return;
            }
            if (lastCommitted >= zxid) {
                LOG.debug(
                    "proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                    Long.toHexString(lastCommitted),
                    Long.toHexString(zxid));
                // The proposal has already been committed
                return;
            }
            Proposal p = outstandingProposals.get(zxid);
            if (p == null) {
                LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
                return;
            }
    
            if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) {
                p.request.logLatency(ServerMetrics.getMetrics().ACK_LATENCY, Long.toString(sid));
            }
    
            p.addAck(sid);
    
            // try to commit
            boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
    
            // If p is a reconfiguration, multiple other operations may be ready to be committed,
            // since operations wait for different sets of acks.
            // Currently we only permit one outstanding reconfiguration at a time
            // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
            // pending all wait for a quorum of old and new config, so its not possible to get enough acks
            // for an operation without getting enough acks for preceding ops. But in the future if multiple
            // concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
            // ops may already have enough acks and can be committed, which is what this code does.
    
            if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
                long curZxid = zxid;
                while (allowedToCommit && hasCommitted && p != null) {
                    curZxid++;
                    p = outstandingProposals.get(curZxid);
                    if (p != null) {
                        hasCommitted = tryToCommit(p, curZxid, null);
                    }
                }
            }
        }
        
        // org.apache.zookeeper.server.quorum.Leader#tryToCommit
        /**
         * @return True if committed, otherwise false.
         **/
        public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
            // make sure that ops are committed in order. With reconfigurations it is now possible
            // that different operations wait for different sets of acks, and we still want to enforce
            // that they are committed in order. Currently we only permit one outstanding reconfiguration
            // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
            // pending all wait for a quorum of old and new config, so it's not possible to get enough acks
            // for an operation without getting enough acks for preceding ops. But in the future if multiple
            // concurrent reconfigs are allowed, this can happen.
            if (outstandingProposals.containsKey(zxid - 1)) {
                return false;
            }
    
            // in order to be committed, a proposal must be accepted by a quorum.
            //
            // getting a quorum from all necessary configurations.
            if (!p.hasAllQuorums()) {
                return false;
            }
    
            // commit proposals in order
            if (zxid != lastCommitted + 1) {
                LOG.warn(
                    "Committing zxid 0x{} from {} not first!",
                    Long.toHexString(zxid),
                    followerAddr);
                LOG.warn("First is {}", (lastCommitted + 1));
            }
    
            outstandingProposals.remove(zxid);
    
            if (p.request != null) {
                toBeApplied.add(p);
            }
    
            if (p.request == null) {
                LOG.warn("Going to commit null: {}", p);
            } else if (p.request.getHdr().getType() == OpCode.reconfig) {
                LOG.debug("Committing a reconfiguration! {}", outstandingProposals.size());
    
                //if this server is voter in new config with the same quorum address,
                //then it will remain the leader
                //otherwise an up-to-date follower will be designated as leader. This saves
                //leader election time, unless the designated leader fails
                Long designatedLeader = getDesignatedLeader(p, zxid);
                //LOG.warn("designated leader is: " + designatedLeader);
    
                QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();
    
                self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
    
                if (designatedLeader != self.getId()) {
                    allowedToCommit = false;
                }
    
                // we're sending the designated leader, and if the leader is changing the followers are
                // responsible for closing the connection - this way we are sure that at least a majority of them
                // receive the commit message.
                commitAndActivate(zxid, designatedLeader);
                informAndActivate(p, designatedLeader);
                //turnOffFollowers();
            } else {
                p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
                commit(zxid);
                inform(p);
            }
            // commit the request in order; effectively this just puts it on the CommitProcessor's commit queue
            zk.commitProcessor.commit(p.request);
            if (pendingSyncs.containsKey(zxid)) {
                // send SYNC notifications to waiting Learners
                for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
                    sendSync(r);
                }
            }
    
            return true;
        }

    6. SyncRequestProcessor: the data persistence processor

        // org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
        public void processRequest(Request request) throws RequestProcessorException {
            // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
            // request.type + " id = " + request.sessionId);
            // request.addRQRec(">prop");
    
    
            /* In the following IF-THEN-ELSE block, we process syncs on the leader.
             * If the sync is coming from a follower, then the follower
             * handler adds it to syncHandler. Otherwise, if it is a client of
             * the leader that issued the sync command, then syncHandler won't
             * contain the handler. In this case, we add it to syncHandler, and
             * call processRequest on the next processor.
             */
    
            if (request instanceof LearnerSyncRequest) {
                zks.getLeader().processSync((LearnerSyncRequest) request);
            } else {
                nextProcessor.processRequest(request);
                if (request.getHdr() != null) {
                    // We need to sync and get consensus on any transactions
                    try {
                        zks.getLeader().propose(request);
                    } catch (XidRolloverException e) {
                        throw new RequestProcessorException(e.getMessage(), e);
                    }
                    // only write transactions need to be persisted
                    syncProcessor.processRequest(request);
                }
            }
        }
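
      The fork above is the crux: every request flows on to the next processor (ultimately the CommitProcessor), but only requests carrying a TxnHeader, i.e. writes, are proposed to the quorum and appended to the durable log. A condensed restatement with hypothetical stand-in types, just to make the shape obvious:

        // illustrative sketch only; the real wiring uses RequestProcessor instances
        class ProposalStageSketch {
            interface Stage { void process(String request); }

            private final Stage commitStage;                             // CommitProcessor role
            private final Stage syncStage;                               // SyncRequestProcessor role
            private final java.util.function.Predicate<String> isWrite;  // "hdr != null" in ZK

            ProposalStageSketch(Stage commit, Stage sync,
                                java.util.function.Predicate<String> isWrite) {
                this.commitStage = commit;
                this.syncStage = sync;
                this.isWrite = isWrite;
            }

            void process(String request) {
                commitStage.process(request);     // everything goes downstream
                if (isWrite.test(request)) {      // only writes need consensus + disk
                    propose(request);             // quorum round (Leader#propose role)
                    syncStage.process(request);   // durable transaction-log append
                }
                // reads fall through: no proposal, no disk write
            }

            private void propose(String request) {
                // broadcast of the PROPOSAL packet elided
            }
        }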
    
        // org.apache.zookeeper.server.SyncRequestProcessor#processRequest
        public void processRequest(final Request request) {
            Objects.requireNonNull(request, "Request cannot be null");
    
            request.syncQueueStartTime = Time.currentElapsedTime();
            // enqueue the request and return
            queuedRequests.add(request);
            ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
        }
        
        // org.apache.zookeeper.server.SyncRequestProcessor#run
        @Override
        public void run() {
            try {
                // we do this in an attempt to ensure that not all of the servers
                // in the ensemble take a snapshot at the same time
                resetSnapshotStats();
                lastFlushTime = Time.currentElapsedTime();
                while (true) {
                    ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
    
                    long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                    // take a request from the queuedRequests queue
                    Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                    if (si == null) {
                        /* We timed out looking for more writes to batch, go ahead and flush immediately */
                        flush();
                        si = queuedRequests.take();
                    }
    
                    if (si == REQUEST_OF_DEATH) {
                        break;
                    }
    
                    long startProcessTime = Time.currentElapsedTime();
                    ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
    
                    // track the number of records written to the log
                    // append to the transaction log
                    if (zks.getZKDatabase().append(si)) {
                        if (shouldSnapshot()) {
                            resetSnapshotStats();
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (!snapThreadMutex.tryAcquire()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                new ZooKeeperThread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            zks.takeSnapshot();
                                        } catch (Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        } finally {
                                            snapThreadMutex.release();
                                        }
                                    }
                                }.start();
                            }
                        }
                    } else if (toFlush.isEmpty()) {
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable) nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    toFlush.add(si);
                    // flush data to disk, completing the sync
                    if (shouldFlush()) {
                        flush();
                    }
                    ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
                }
            } catch (Throwable t) {
                handleException(this.getName(), t);
            }
            LOG.info("SyncRequestProcessor exited!");
        }
    
        /** If both flushDelay and maxBatchSize are set (bigger than 0), flush
         * whenever either condition is hit. If only one or the other is
         * set, flush only when the relevant condition is hit.
         */
        private boolean shouldFlush() {
            long flushDelay = zks.getFlushDelay();
            long maxBatchSize = zks.getMaxBatchSize();
            if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
                return true;
            }
            return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
        }
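
      To make the flush policy concrete, here is a self-contained rendering with hypothetical threshold values (the real ones come from zks.getFlushDelay() and zks.getMaxBatchSize()):

        public class FlushPolicyDemo {
            static boolean shouldFlush(long flushDelayMs, int maxBatchSize,
                                       long msSinceLastFlush, int pendingWrites) {
                if (flushDelayMs > 0 && msSinceLastFlush >= flushDelayMs) {
                    return true;                                          // time trigger fired
                }
                return maxBatchSize > 0 && pendingWrites >= maxBatchSize; // size trigger fired
            }

            public static void main(String[] args) {
                System.out.println(shouldFlush(100, 1000, 120, 10));  // true: delay elapsed
                System.out.println(shouldFlush(100, 1000, 10, 1000)); // true: batch is full
                System.out.println(shouldFlush(0, 1000, 9999, 10));   // false: only the size trigger is active
            }
        }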
        

      At this point, a full data-update operation is complete. It does look a bit involved.

      The main steps, condensed into the runnable sketch right after this list, are:

        1. Identify the request as a write transaction;
        2. Launch the proposal vote;
        3. Followers send back ACK responses;
        4. Once committable, hand the transaction to the CommitProcessor;
        5. Flush the data to disk.
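
      A toy, runnable condensation of these five steps (the names and the three-voter ensemble here are illustrative, not the real API):

        import java.util.HashSet;
        import java.util.Set;

        public class WritePipelineSketch {
            public static void main(String[] args) {
                final int voters = 3;
                final int majority = voters / 2 + 1;   // 2 of 3
                long zxid = 1L;

                System.out.println("1. write txn prepared for zxid " + zxid);
                System.out.println("2. PROPOSAL broadcast + local txn-log append");

                Set<Long> acks = new HashSet<>();
                acks.add(0L);                          // the leader acks its own proposal
                for (long sid = 1; sid < voters; sid++) {
                    acks.add(sid);                     // 3. follower ACKs trickle in
                    if (acks.size() >= majority) {
                        System.out.println("4. quorum reached, COMMIT sent, CommitProcessor releases zxid " + zxid);
                        break;
                    }
                }
                System.out.println("5. txn flushed to disk and applied, reply sent to the client");
            }
        }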

    7. Wait, how are the votes on each transaction actually tallied?

      Right: so far we have only seen request packets handed, via queues, to the LearnerHandler's two threads. At most we watched the sending side; nothing resembling a majority-vote decision has appeared yet. Where is it?

      In fact, the check happens inside tryToCommit.

        // org.apache.zookeeper.server.quorum.Leader#tryToCommit
        /**
         * @return True if committed, otherwise false.
         **/
        public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
            // make sure that ops are committed in order. With reconfigurations it is now possible
            // that different operations wait for different sets of acks, and we still want to enforce
            // that they are committed in order. Currently we only permit one outstanding reconfiguration
            // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
            // pending all wait for a quorum of old and new config, so it's not possible to get enough acks
            // for an operation without getting enough acks for preceding ops. But in the future if multiple
            // concurrent reconfigs are allowed, this can happen.
            if (outstandingProposals.containsKey(zxid - 1)) {
                return false;
            }
    
            // in order to be committed, a proposal must be accepted by a quorum.
            //
            // getting a quorum from all necessary configurations.
            // check whether enough ACKs have arrived to pass; if not, do not commit yet.
            // once more than half have acked, move on to the commit path (with concurrency control around it)
            if (!p.hasAllQuorums()) {
                return false;
            }
    
            // commit proposals in order
            if (zxid != lastCommitted + 1) {
                LOG.warn(
                    "Committing zxid 0x{} from {} not first!",
                    Long.toHexString(zxid),
                    followerAddr);
                LOG.warn("First is {}", (lastCommitted + 1));
            }
    
            outstandingProposals.remove(zxid);
    
            if (p.request != null) {
                toBeApplied.add(p);
            }
    
            if (p.request == null) {
                LOG.warn("Going to commit null: {}", p);
            } else if (p.request.getHdr().getType() == OpCode.reconfig) {
                LOG.debug("Committing a reconfiguration! {}", outstandingProposals.size());
    
                //if this server is voter in new config with the same quorum address,
                //then it will remain the leader
                //otherwise an up-to-date follower will be designated as leader. This saves
                //leader election time, unless the designated leader fails
                Long designatedLeader = getDesignatedLeader(p, zxid);
                //LOG.warn("designated leader is: " + designatedLeader);
    
                QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();
    
                self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
    
                if (designatedLeader != self.getId()) {
                    allowedToCommit = false;
                }
    
                // we're sending the designated leader, and if the leader is changing the followers are
                // responsible for closing the connection - this way we are sure that at least a majority of them
                // receive the commit message.
                commitAndActivate(zxid, designatedLeader);
                informAndActivate(p, designatedLeader);
                //turnOffFollowers();
            } else {
                p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
                // after committing, lastCommitted is set to the current zxid
                // and a COMMIT packet is sent to every node
                commit(zxid);
                inform(p);
            }
            zk.commitProcessor.commit(p.request);
            if (pendingSyncs.containsKey(zxid)) {
                for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
                    sendSync(r);
                }
            }
    
            return true;
        }
        // org.apache.zookeeper.server.quorum.SyncedLearnerTracker#hasAllQuorums
        public boolean hasAllQuorums() {
            for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
                // check whether more than half of the voters have acked this proposal
                if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())) {
                    return false;
                }
            }
            // a majority in every tracked configuration means we can go ahead and commit
            return true;
        }
        // org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum
        /**
         * Verifies if a set is a majority. Assumes that ackSet contains acks only
         * from votingMembers
         */
        public boolean containsQuorum(Set<Long> ackSet) {
            return (ackSet.size() > half);
        }
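
      As a quick sanity check of the arithmetic (our own demo; it assumes, as in QuorumMaj, that half is votingMembers / 2 by integer division):

        import java.util.Set;

        public class MajorityDemo {
            public static void main(String[] args) {
                int votingMembers = 5;
                int half = votingMembers / 2;                // 2
                Set<Long> threeAcks = Set.of(1L, 2L, 3L);
                Set<Long> twoAcks = Set.of(1L, 2L);
                System.out.println(threeAcks.size() > half); // true: 3 of 5 is a quorum
                System.out.println(twoAcks.size() > half);   // false: 2 of 5 is not
            }
        }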
        // notify the quorum to commit the transaction
        // org.apache.zookeeper.server.quorum.Leader#commit
        /**
         * Create a commit packet and send it to all the members of the quorum
         *
         * @param zxid
         */
        public void commit(long zxid) {
            synchronized (this) {
                lastCommitted = zxid;
            }
            QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
            sendPacket(qp);
            ServerMetrics.getMetrics().COMMIT_COUNT.add(1);
        }
        // As the code above shows, every ACK from a node re-runs the commit check, and any ack
        // arriving after the majority threshold would commit again if nothing stopped it. processAck
        // prevents such duplicate commits by comparing the ack's zxid against lastCommitted.
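
      A tiny runnable demonstration of that guard (our own toy code, not the Leader class): the first ack that completes a majority commits, and every later ack for the same zxid returns early because lastCommitted has already advanced.

        public class DuplicateCommitGuardDemo {
            static long lastCommitted = 0;
            static final int MAJORITY = 2;             // e.g. 2 of 3 voters

            static boolean tryCommit(long zxid, int acksSoFar) {
                if (lastCommitted >= zxid) {
                    return false;                      // already committed: ignore
                }
                if (acksSoFar < MAJORITY) {
                    return false;                      // not enough support yet
                }
                lastCommitted = zxid;                  // commit exactly once
                return true;
            }

            public static void main(String[] args) {
                System.out.println(tryCommit(1, 1));   // false: below majority
                System.out.println(tryCommit(1, 2));   // true: second ack commits
                System.out.println(tryCommit(1, 3));   // false: third ack is ignored
            }
        }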
        
        /**
         * Keep a count of acks that are received by the leader for a particular
         * proposal
         *
         * @param zxid, the zxid of the proposal sent out
         * @param sid, the id of the server that sent the ack
         * @param followerAddr
         */
        @Override
        public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
            if (!allowedToCommit) {
                return; // last op committed was a leader change - from now on
            }
            // the new leader should commit
            if (LOG.isTraceEnabled()) {
                LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
                for (Proposal p : outstandingProposals.values()) {
                    long packetZxid = p.packet.getZxid();
                    LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
                }
                LOG.trace("outstanding proposals all");
            }
    
            if ((zxid & 0xffffffffL) == 0) {
                /*
                 * We no longer process NEWLEADER ack with this method. However,
                 * the learner sends an ack back to the leader after it gets
                 * UPTODATE, so we just ignore the message.
                 */
                return;
            }
    
            if (outstandingProposals.size() == 0) {
                LOG.debug("outstanding is 0");
                return;
            }
            // if it was already committed, lastCommitted has advanced past this zxid, so this ack is a no-op
            if (lastCommitted >= zxid) {
                LOG.debug(
                    "proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                    Long.toHexString(lastCommitted),
                    Long.toHexString(zxid));
                // The proposal has already been committed
                return;
            }
            Proposal p = outstandingProposals.get(zxid);
            if (p == null) {
                LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
                return;
            }
    
            if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) {
                p.request.logLatency(ServerMetrics.getMetrics().ACK_LATENCY, Long.toString(sid));
            }
    
            p.addAck(sid);
    
            boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
    
            // If p is a reconfiguration, multiple other operations may be ready to be committed,
            // since operations wait for different sets of acks.
            // Currently we only permit one outstanding reconfiguration at a time
            // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
            // pending all wait for a quorum of old and new config, so its not possible to get enough acks
            // for an operation without getting enough acks for preceding ops. But in the future if multiple
            // concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
            // ops may already have enough acks and can be committed, which is what this code does.
    
            if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
                long curZxid = zxid;
                while (allowedToCommit && hasCommitted && p != null) {
                    curZxid++;
                    p = outstandingProposals.get(curZxid);
                    if (p != null) {
                        hasCommitted = tryToCommit(p, curZxid, null);
                    }
                }
            }
        }

      That completes the whole flow, and the Leader's job is done. The Follower's flow differs slightly, but it is certainly less complex than the Leader's; interested readers can trace it themselves.

      A seemingly simple update turns out to be this involved, but in the name of correctness and high availability the internal complexity is necessary. On the client side a single set call is all it takes, yet it carries the strong guarantees of the API; that is what good infrastructure buys us.

    Original article: https://www.cnblogs.com/yougewe/p/11807390.html