  • ZooKeeper source code analysis, part 5: how the server (cluster leader) processes requests

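    The leader is implemented by LeaderZooKeeperServer, which indirectly extends the standard ZooKeeperServer. It defines the path a request follows once it reaches the leader:

    PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor

    The code shows the details. Note that it also installs a LeaderRequestProcessor in front of PrepRequestProcessor as firstProcessor: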

        @Override
        protected void setupRequestProcessors() {
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
            commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                    Long.toString(getServerId()), false,
                    getZooKeeperServerListener());
            commitProcessor.start();
            ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                    commitProcessor);
            proposalProcessor.initialize();
            prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
            prepRequestProcessor.start();
            firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    
            setupContainerManager();
        }
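
    The listing above wires the processors back to front: each stage is constructed with a reference to the stage that follows it. The sketch below shows only that wiring pattern. StageSketch and PipelineSketch are hypothetical names, not ZooKeeper classes, and the stages run synchronously, whereas the real PrepRequestProcessor and CommitProcessor are threads with their own queues.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ZooKeeper's RequestProcessor; not the real interface. */
interface StageSketch {
    void process(String request, List<String> trace);
}

final class PipelineSketch {
    /** Creates a stage that records its own name and then forwards to `next`. */
    static StageSketch stage(String name, StageSketch next) {
        return (request, trace) -> {
            trace.add(name);
            if (next != null) {
                next.process(request, trace);
            }
        };
    }

    /** Builds the chain from the last stage to the first, like setupRequestProcessors(). */
    static StageSketch buildLeaderChain() {
        StageSketch fin = stage("Final", null);
        StageSketch toBeApplied = stage("ToBeApplied", fin);
        StageSketch commit = stage("Commit", toBeApplied);
        StageSketch proposal = stage("Proposal", commit);
        StageSketch prep = stage("Prep", proposal);
        return stage("Leader", prep); // corresponds to firstProcessor
    }

    /** Sends one request through the chain and returns the stages it visited. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        buildLeaderChain().process(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create /a"));
    }
}
```

    Building from the tail means every constructor already has its successor available, which is why FinalRequestProcessor is created first.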

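    Let's go through what each of these request processors (RPs) does, step by step. PrepRequestProcessor and FinalRequestProcessor were already analyzed in the previous article:

    ZooKeeper source code analysis, part 4: how the server (standalone) processes requests

    So let's turn to the remaining RPs.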

    1. ProposalRequestProcessor

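    This RP forwards every ordinary request to the next processor (CommitProcessor). If the request carries a transaction header, it also asks the leader to propose it and hands it to SyncRequestProcessor, which in turn is followed by AckRequestProcessor. A LearnerSyncRequest is handled separately through Leader.processSync. The code: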

    public void processRequest(Request request) throws RequestProcessorException {
            // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
            // request.type + " id = " + request.sessionId);
            // request.addRQRec(">prop");
    
    
            /* In the following IF-THEN-ELSE block, we process syncs on the leader.
             * If the sync is coming from a follower, then the follower
             * handler adds it to syncHandler. Otherwise, if it is a client of
             * the leader that issued the sync command, then syncHandler won't
             * contain the handler. In this case, we add it to syncHandler, and
             * call processRequest on the next processor.
             */
    
            if (request instanceof LearnerSyncRequest){
                zks.getLeader().processSync((LearnerSyncRequest)request);
            } else {
                nextProcessor.processRequest(request);
                if (request.getHdr() != null) {
                    // We need to sync and get consensus on any transactions
                    try {
                        zks.getLeader().propose(request);
                    } catch (XidRolloverException e) {
                        throw new RequestProcessorException(e.getMessage(), e);
                    }
                    syncProcessor.processRequest(request);
                }
            }
        }
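
    The branching in processRequest can be summarised with a small model. This is a sketch under stated assumptions: ProposalFanOutSketch and Req are hypothetical names, the boolean hasTxnHeader stands in for request.getHdr() != null, and the strings recorded in events merely label which path the real code would take.

```java
import java.util.ArrayList;
import java.util.List;

final class ProposalFanOutSketch {
    /** Hypothetical request model; hasTxnHeader mimics request.getHdr() != null. */
    static final class Req {
        final String name;
        final boolean learnerSync;
        final boolean hasTxnHeader;

        Req(String name, boolean learnerSync, boolean hasTxnHeader) {
            this.name = name;
            this.learnerSync = learnerSync;
            this.hasTxnHeader = hasTxnHeader;
        }
    }

    /** Records, in order, which downstream component each request reaches. */
    final List<String> events = new ArrayList<>();

    void processRequest(Req r) {
        if (r.learnerSync) {
            // A sync forwarded by a learner is handled by the leader directly.
            events.add("processSync:" + r.name);
            return;
        }
        // Every other request goes to the next processor first.
        events.add("next:" + r.name);
        if (r.hasTxnHeader) {
            // Only transactions are proposed to the quorum and logged locally.
            events.add("propose:" + r.name);
            events.add("log:" + r.name);
        }
    }
}
```

    A read such as getData therefore produces a single event, while a write such as create produces three.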

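    SyncRequestProcessor was already analyzed in the previous article, so it is not repeated here. What does AckRequestProcessor do, then?

    AckRequestProcessor simply forwards the incoming request to the leader as an ACK, using the leader's own server id. The code: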

        /**
         * Forward the request as an ACK to the leader
         */
        public void processRequest(Request request) {
            QuorumPeer self = leader.self;
            if(self != null)
                leader.processAck(self.getId(), request.zxid, null);
            else
                LOG.error("Null QuorumPeer");
        }

    The leader handles the ACK as follows:

        /**
         * Keep a count of acks that are received by the leader for a particular
         * proposal
         *
         * @param zxid, the zxid of the proposal sent out
         * @param sid, the id of the server that sent the ack
         * @param followerAddr
         */
        synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {        
            if (!allowedToCommit) return; // last op committed was a leader change - from now on 
                                         // the new leader should commit        
            if (LOG.isTraceEnabled()) {
                LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
                for (Proposal p : outstandingProposals.values()) {
                    long packetZxid = p.packet.getZxid();
                    LOG.trace("outstanding proposal: 0x{}",
                            Long.toHexString(packetZxid));
                }
                LOG.trace("outstanding proposals all");
            }
            
            if ((zxid & 0xffffffffL) == 0) {
                /*
                 * We no longer process NEWLEADER ack with this method. However,
                 * the learner sends an ack back to the leader after it gets
                 * UPTODATE, so we just ignore the message.
                 */
                return;
            }
                
                
            if (outstandingProposals.size() == 0) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("outstanding is 0");
                }
                return;
            }
            if (lastCommitted >= zxid) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                            Long.toHexString(lastCommitted), Long.toHexString(zxid));
                }
                // The proposal has already been committed
                return;
            }
            Proposal p = outstandingProposals.get(zxid);
            if (p == null) {
                LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                        Long.toHexString(zxid), followerAddr);
                return;
            }
            
            p.addAck(sid);        
            /*if (LOG.isDebugEnabled()) {
                LOG.debug("Count for zxid: 0x{} is {}",
                        Long.toHexString(zxid), p.ackSet.size());
            }*/
            
            boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
    
            // If p is a reconfiguration, multiple other operations may be ready to be committed,
            // since operations wait for different sets of acks.
           // Currently we only permit one outstanding reconfiguration at a time
           // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
           // pending all wait for a quorum of old and new config, so its not possible to get enough acks
           // for an operation without getting enough acks for preceding ops. But in the future if multiple
           // concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
            // ops may already have enough acks and can be committed, which is what this code does.
    
            if (hasCommitted && p.request!=null && p.request.getHdr().getType() == OpCode.reconfig){
                   long curZxid = zxid;
               while (allowedToCommit && hasCommitted && p!=null){
                   curZxid++;
                   p = outstandingProposals.get(curZxid);
                   if (p !=null) hasCommitted = tryToCommit(p, curZxid, null);             
               }
            }
        }
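
    The check (zxid & 0xffffffffL) == 0 in processAck relies on the layout of a zxid: the high 32 bits hold the epoch and the low 32 bits hold a counter within that epoch, so a counter of 0 marks the first zxid of an epoch (the NEWLEADER case mentioned in the comment). A minimal sketch of that layout follows. The class and method names are hypothetical helpers written for this illustration.

```java
final class ZxidSketch {
    /** High 32 bits: the epoch. */
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    /** Low 32 bits: the counter within the epoch. */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Combines an epoch and a counter into one zxid. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    /** Same test as in processAck: true when the counter part is zero. */
    static boolean isEpochStart(long zxid) {
        return counterOf(zxid) == 0;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5L, 7L);
        System.out.println("0x" + Long.toHexString(zxid));
    }
}
```

    With this layout, incrementing the last counter value of an epoch would carry into the epoch bits. As far as I can tell, that is the situation the XidRolloverException seen in ProposalRequestProcessor guards against.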
        

    tryToCommit does the actual work, and CommitProcessor eventually takes over the request:

        /**
         * @return True if committed, otherwise false.
         * @param a proposal p
         **/
        synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {       
           // make sure that ops are committed in order. With reconfigurations it is now possible
           // that different operations wait for different sets of acks, and we still want to enforce
           // that they are committed in order. Currently we only permit one outstanding reconfiguration
           // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
           // pending all wait for a quorum of old and new config, so its not possible to get enough acks
           // for an operation without getting enough acks for preceding ops. But in the future if multiple
           // concurrent reconfigs are allowed, this can happen.
           if (outstandingProposals.containsKey(zxid - 1)) return false;
           
           // getting a quorum from all necessary configurations
            if (!p.hasAllQuorums()) {
               return false;                 
            }
            
            // commit proposals in order
            if (zxid != lastCommitted+1) {    
               LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
                        + " from " + followerAddr + " not first!");
                LOG.warn("First is "
                        + (lastCommitted+1));
            }     
            
            // in order to be committed, a proposal must be accepted by a quorum              
            
            outstandingProposals.remove(zxid);
            
            if (p.request != null) {
                 toBeApplied.add(p);
            }
    
            if (p.request == null) {
                LOG.warn("Going to commmit null: " + p);
            } else if (p.request.getHdr().getType() == OpCode.reconfig) {                                   
                LOG.debug("Committing a reconfiguration! " + outstandingProposals.size()); 
                     
                //if this server is voter in new config with the same quorum address, 
                //then it will remain the leader
                //otherwise an up-to-date follower will be designated as leader. This saves
                //leader election time, unless the designated leader fails                             
                Long designatedLeader = getDesignatedLeader(p, zxid);
                //LOG.warn("designated leader is: " + designatedLeader);
    
                QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
           
                self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
           
                if (designatedLeader != self.getId()) {
                    allowedToCommit = false;
                }
                       
                // we're sending the designated leader, and if the leader is changing the followers are 
                // responsible for closing the connection - this way we are sure that at least a majority of them 
                // receive the commit message.
                commitAndActivate(zxid, designatedLeader);
                informAndActivate(p, designatedLeader);
                //turnOffFollowers();
            } else {
                commit(zxid);
                inform(p);
            }
            zk.commitProcessor.commit(p.request);
            if(pendingSyncs.containsKey(zxid)){
                for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                    sendSync(r);
                }               
            } 
            
            return  true;   
        }
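
    The interplay of processAck and tryToCommit boils down to two rules: a proposal needs acknowledgements from a quorum, and it may only be committed once its predecessor is no longer outstanding. The sketch below models just these two rules. It assumes a plain majority quorum, whereas the real code asks the Proposal's quorum verifiers via hasAllQuorums(). AckTrackerSketch and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class AckTrackerSketch {
    private final int ensembleSize;
    private final Map<Long, Set<Long>> outstanding = new HashMap<>();
    private long lastCommitted;

    /** Zxids in the order they were committed. */
    final List<Long> committed = new ArrayList<>();

    AckTrackerSketch(int ensembleSize, long lastCommitted) {
        this.ensembleSize = ensembleSize;
        this.lastCommitted = lastCommitted;
    }

    void propose(long zxid) {
        outstanding.put(zxid, new HashSet<Long>());
    }

    /** Records an ACK from server `sid`; returns true if it caused a commit. */
    boolean ack(long sid, long zxid) {
        if (lastCommitted >= zxid) {
            return false; // already committed
        }
        Set<Long> acks = outstanding.get(zxid);
        if (acks == null) {
            return false; // unknown proposal
        }
        acks.add(sid);
        return tryToCommit(zxid, acks);
    }

    private boolean tryToCommit(long zxid, Set<Long> acks) {
        if (outstanding.containsKey(zxid - 1)) {
            return false; // predecessor still pending: keep commits in order
        }
        if (acks.size() <= ensembleSize / 2) {
            return false; // no strict majority yet (assumed majority quorum)
        }
        outstanding.remove(zxid);
        lastCommitted = zxid;
        committed.add(zxid);
        return true;
    }
}
```

    In a three-server ensemble, two ACKs for zxid 2 do not commit it while zxid 1 is still outstanding. As in the listing above, only the proposal named in the incoming ACK is examined, so zxid 2 commits on the next ACK that arrives for it after zxid 1 has been committed.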

    For an ordinary (non-reconfig) proposal, the first step is to send a COMMIT packet to all members of the quorum:

        /**
         * Create a commit packet and send it to all the members of the quorum
         *
         * @param zxid
         */
        public void commit(long zxid) {
            synchronized(this){
                lastCommitted = zxid;
            }
            QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
            sendPacket(qp);
        }

    The packet is sent as follows:

        /**
         * send a packet to all the followers ready to follow
         *
         * @param qp
         *                the packet to be sent
         */
        void sendPacket(QuorumPacket qp) {
            synchronized (forwardingFollowers) {
                for (LearnerHandler f : forwardingFollowers) {
                    f.queuePacket(qp);
                }
            }
        }

    The second step is to notify the observers:

        /**
         * Create an inform packet and send it to all observers.
         * @param zxid
         * @param proposal
         */
        public void inform(Proposal proposal) {
            QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
                                                proposal.packet.getData(), null);
            sendObserverPacket(qp);
        }

    The packet is sent to the observers as follows:

        /**
         * send a packet to all observers
         */
        void sendObserverPacket(QuorumPacket qp) {
            for (LearnerHandler f : getObservingLearners()) {
                f.queuePacket(qp);
            }
        }

    The third step hands the request to the CommitProcessor:

        zk.commitProcessor.commit(p.request);

    2. CommitProcessor

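    CommitProcessor is multi-threaded. Its threads communicate through queues, atomics, and wait/notifyAll synchronization. CommitProcessor acts as a gateway that lets requests through to the rest of the processing pipeline. At any moment it allows multiple read requests but only a single write request in flight, which keeps write requests in transaction order.

      One main commit processor thread, which watches the request queues and dispatches requests to the worker threads based on sessionId, so that the read and write requests of a given session always go to the same thread and therefore run in order.

      0 to N worker threads, which run the rest of the request processing pipeline on the requests. If 0 worker threads are configured, the main commit thread runs the pipeline directly.

      A typical (default) thread count: on a 32-core machine, 1 commit processor thread and 32 worker threads.

    Constraints on multi-threading:

      The requests of each session must be processed in order.

      Write requests must be processed in zxid order.

      There must be no race between a write in one session and a read request in another session that sets a watch which that write would trigger.

    The current implementation satisfies the third constraint simply by not allowing any read request to be processed in parallel with a write request.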
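
    The first constraint, per-session ordering, follows from pinning each session to one single-threaded worker. Here is a minimal sketch of that idea using standard java.util.concurrent executors. SessionDispatchSketch and its members are hypothetical names, and the real implementation lives in ZooKeeper's own worker pool, not in this class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SessionDispatchSketch {
    private final List<ExecutorService> workers = new ArrayList<>();

    /** For each session, the sequence numbers in the order they were processed. */
    final Map<Long, List<Integer>> seenBySession = new ConcurrentHashMap<>();

    SessionDispatchSketch(int numWorkers) {
        for (int i = 0; i < numWorkers; i++) {
            // One thread per worker, so tasks of one worker run strictly in order.
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Routes the task to the worker chosen by the session id. */
    void schedule(final long sessionId, final int seq) {
        int idx = (int) Math.floorMod(sessionId, (long) workers.size());
        workers.get(idx).execute(() ->
            seenBySession
                .computeIfAbsent(sessionId, k -> new CopyOnWriteArrayList<>())
                .add(seq));
    }

    /** Stops accepting work and waits for queued tasks; false on timeout or interrupt. */
    boolean shutdownAndWait() {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        try {
            for (ExecutorService w : workers) {
                if (!w.awaitTermination(5, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

    Different sessions may run in parallel on different workers, but two requests of the same session can never overtake each other because they share one thread.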

        @Override
        public void run() {
            Request request;
            try {
                while (!stopped) {
                    synchronized(this) {
                        while (
                            !stopped &&
                            ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
                             (committedRequests.isEmpty() || isProcessingRequest()))) {
                            wait();
                        }
                    }
    
                    /*
                     * Processing queuedRequests: Process the next requests until we
                     * find one for which we need to wait for a commit. We cannot
                     * process a read request while we are processing write request.
                     */
                    while (!stopped && !isWaitingForCommit() &&
                           !isProcessingCommit() &&
                           (request = queuedRequests.poll()) != null) {
                        if (needCommit(request)) {
                            nextPending.set(request);
                        } else {
                            sendToNextProcessor(request);
                        }
                    }
    
                    /*
                     * Processing committedRequests: check and see if the commit
                     * came in for the pending request. We can only commit a
                     * request when there is no other request being processed.
                     */
                    processCommitted();
                }
            } catch (Throwable e) {
                handleException(this.getName(), e);
            }
            LOG.info("CommitProcessor exited loop!");
        }

    The core logic is in processCommitted:

        /*
         * Separated this method from the main run loop
         * for test purposes (ZOOKEEPER-1863)
         */
        protected void processCommitted() {
            Request request;
    
            if (!stopped && !isProcessingRequest() &&
                    (committedRequests.peek() != null)) {
    
                /*
                 * ZOOKEEPER-1863: continue only if there is no new request
                 * waiting in queuedRequests or it is waiting for a
                 * commit. 
                 */
                if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
                    return;
                }
                request = committedRequests.poll();
    
                /*
                 * We match with nextPending so that we can move to the
                 * next request when it is committed. We also want to
                 * use nextPending because it has the cnxn member set
                 * properly.
                 */
                Request pending = nextPending.get();
                if (pending != null &&
                    pending.sessionId == request.sessionId &&
                    pending.cxid == request.cxid) {
                    // we want to send our version of the request.
                    // the pointer to the connection in the request
                    pending.setHdr(request.getHdr());
                    pending.setTxn(request.getTxn());
                    pending.zxid = request.zxid;
                    // Set currentlyCommitting so we will block until this
                    // completes. Cleared by CommitWorkRequest after
                    // nextProcessor returns.
                    currentlyCommitting.set(pending);
                    nextPending.set(null);
                    sendToNextProcessor(pending);
                } else {
                    // this request came from someone else so just
                    // send the commit packet
                    currentlyCommitting.set(request);
                    sendToNextProcessor(request);
                }
            }      
        }
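
    Stripped of threading, the run loop and processCommitted behave like a gate: reads pass straight through, the first write parks as the pending request, and nothing behind it moves until the matching commit arrives. The sketch below is a single-threaded model of that behaviour only. CommitGateSketch is a hypothetical name, requests are plain strings, and a request counts as a write if its name starts with "write".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class CommitGateSketch {
    private final Deque<String> queued = new ArrayDeque<>();
    private String pendingWrite; // plays the role of nextPending

    /** Requests in the order they were passed to the next processor. */
    final List<String> forwarded = new ArrayList<>();

    /** Assumption for this sketch: a write is any request named "write...". */
    static boolean needsCommit(String request) {
        return request.startsWith("write");
    }

    /** A client request arrives. */
    void submit(String request) {
        queued.add(request);
        drain();
    }

    /** The leader reports that `request` has been committed. */
    void commit(String request) {
        if (request.equals(pendingWrite)) {
            pendingWrite = null;
            forwarded.add(request);
            drain(); // requests queued behind the write may now proceed
        }
    }

    /** Forwards queued requests until a write has to wait for its commit. */
    private void drain() {
        while (pendingWrite == null && !queued.isEmpty()) {
            String request = queued.poll();
            if (needsCommit(request)) {
                pendingWrite = request;
            } else {
                forwarded.add(request);
            }
        }
    }
}
```

    Submitting read-1, write-2, read-3 forwards only read-1. After commit("write-2") the gate forwards write-2 and then read-3, so the read cannot observe state from before the write that preceded it.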

    The request is then handed to the worker threads:

        /**
         * Schedule final request processing; if a worker thread pool is not being
         * used, processing is done directly by this thread.
         */
        private void sendToNextProcessor(Request request) {
            numRequestsProcessing.incrementAndGet();
            workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
        }

    The real work happens in the worker pool's schedule method (WorkerService):

        /**
         * Schedule work to be done by the thread assigned to this id. Thread
         * assignment is a single mod operation on the number of threads.  If a
         * worker thread pool is not being used, work is done directly by
         * this thread.
         */
        public void schedule(WorkRequest workRequest, long id) {
            if (stopped) {
                workRequest.cleanup();
                return;
            }
    
            ScheduledWorkRequest scheduledWorkRequest =
                new ScheduledWorkRequest(workRequest);
    
            // If we have a worker thread pool, use that; otherwise, do the work
            // directly.
            int size = workers.size();
            if (size > 0) {
                try {
                    // make sure to map negative ids as well to [0, size-1]
                    int workerNum = ((int) (id % size) + size) % size;
                    ExecutorService worker = workers.get(workerNum);
                    worker.execute(scheduledWorkRequest);
                } catch (RejectedExecutionException e) {
                    LOG.warn("ExecutorService rejected execution", e);
                    workRequest.cleanup();
                }
            } else {
                // When there is no worker thread pool, do the work directly
                // and wait for its completion
                scheduledWorkRequest.start();
                try {
                    scheduledWorkRequest.join();
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected exception", e);
                    Thread.currentThread().interrupt();
                }
            }
        }
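
    The expression ((int) (id % size) + size) % size deserves a closer look: in Java the % operator keeps the sign of the dividend, so a negative session id would otherwise yield a negative index. The sketch below isolates that expression and compares it with Math.floorMod from the standard library. WorkerIndexSketch is a hypothetical name.

```java
final class WorkerIndexSketch {
    /** Same arithmetic as in schedule(): maps any id to an index in [0, size - 1]. */
    static int workerFor(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    public static void main(String[] args) {
        // -1 % 4 is -1 in Java; adding size and taking % again gives 3.
        System.out.println(workerFor(-1L, 4));
        System.out.println(Math.floorMod(-1L, 4L));
    }
}
```

    The cast to int is safe because the absolute value of id % size is always smaller than size.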

    The run method of the scheduled work request:

            @Override
            public void run() {
                try {
                    // Check if stopped while request was on queue
                    if (stopped) {
                        workRequest.cleanup();
                        return;
                    }
                    workRequest.doWork();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    workRequest.cleanup();
                }
            }

    This calls the doWork method of CommitProcessor's CommitWorkRequest:

            public void doWork() throws RequestProcessorException {
                try {
                    nextProcessor.processRequest(request);
                } finally {
                    // If this request is the commit request that was blocking
                    // the processor, clear.
                    currentlyCommitting.compareAndSet(request, null);
    
                    /*
                     * Decrement outstanding request count. The processor may be
                     * blocked at the moment because it is waiting for the pipeline
                     * to drain. In that case, wake it up if there are pending
                     * requests.
                     */
                    if (numRequestsProcessing.decrementAndGet() == 0) {
                        if (!queuedRequests.isEmpty() ||
                            !committedRequests.isEmpty()) {
                            wakeup();
                        }
                    }
                }
            }

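    The request is then passed on to the next RP, Leader.ToBeAppliedRequestProcessor.

    3. Leader.ToBeAppliedRequestProcessor

    Leader.ToBeAppliedRequestProcessor simply maintains the toBeApplied list: once FinalRequestProcessor has processed a write request, the matching proposal is removed from the head of the list.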

            /**
             * This request processor simply maintains the toBeApplied list. For
             * this to work next must be a FinalRequestProcessor and
             * FinalRequestProcessor.processRequest MUST process the request
             * synchronously!
             *
             * @param next
             *                a reference to the FinalRequestProcessor
             */
            ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
                if (!(next instanceof FinalRequestProcessor)) {
                    throw new RuntimeException(ToBeAppliedRequestProcessor.class
                            .getName()
                            + " must be connected to "
                            + FinalRequestProcessor.class.getName()
                            + " not "
                            + next.getClass().getName());
                }
                this.leader = leader;
                this.next = next;
            }
    
            /*
             * (non-Javadoc)
             *
             * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
             */
            public void processRequest(Request request) throws RequestProcessorException {
                next.processRequest(request);
    
                // The only requests that should be on toBeApplied are write
                // requests, for which we will have a hdr. We can't simply use
                // request.zxid here because that is set on read requests to equal
                // the zxid of the last write op.
                if (request.getHdr() != null) {
                    long zxid = request.getHdr().getZxid();
                    Iterator<Proposal> iter = leader.toBeApplied.iterator();
                    if (iter.hasNext()) {
                        Proposal p = iter.next();
                        if (p.request != null && p.request.zxid == zxid) {
                            iter.remove();
                            return;
                        }
                    }
                    LOG.error("Committed request not found on toBeApplied: "
                              + request);
                }
            }
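
    The bookkeeping can be pictured as a queue of committed but not yet applied proposals: tryToCommit appends to it and this processor removes the head after the request has been applied. A minimal sketch, reducing proposals to their zxid. ToBeAppliedSketch and its members are hypothetical names, and the apply step merely stands in for FinalRequestProcessor.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

final class ToBeAppliedSketch {
    /** Zxids that are committed but not yet applied, in commit order. */
    final ConcurrentLinkedQueue<Long> toBeApplied = new ConcurrentLinkedQueue<>();
    long lastApplied;

    /** Called when a proposal is committed (compare toBeApplied.add(p) in tryToCommit). */
    void commit(long zxid) {
        toBeApplied.add(zxid);
    }

    /** Applies the request, then drops the head of the list if it matches. */
    boolean apply(long zxid) {
        lastApplied = zxid; // stands in for next.processRequest(request)
        Iterator<Long> iter = toBeApplied.iterator();
        if (iter.hasNext() && iter.next() == zxid) {
            iter.remove();
            return true;
        }
        return false; // the real code logs an error in this case
    }
}
```

    Only the head is inspected, so this works only if requests are applied in commit order, which is why the Javadoc above insists that FinalRequestProcessor process requests synchronously.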

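    4. FinalRequestProcessor was covered in the previous article and is not repeated here.

    Summary: the analysis above shows that the leader processes requests in this order: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor.

    PrepRequestProcessor receives the request first, wraps it, and sets up the shared data according to the request type. ProposalRequestProcessor, together with the Leader, proposes transactions to the followers and, once a quorum has acknowledged them, notifies all followers and observers. CommitProcessor dispatches requests to multiple worker threads. Leader.ToBeAppliedRequestProcessor simply maintains the toBeApplied list.

    FinalRequestProcessor terminates the processor chain: it sends the response and triggers the watchers.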

  • Original article: https://www.cnblogs.com/davidwang456/p/5004599.html