zoukankan      html  css  js  c++  java
  • Leader

    Leader中重要的数据结构:

      1.ConcurrentMap<Long, Proposal> outstandingProposals: 维护了待follower同步的proposal. 由  leader.propose负责插入, 由  leader.processAck负责移除.

      2.ConcurrentLinkedQueue<Proposal> toBeApplied: 已经在多数的follower上写入成功的proposal,但是尚未提交. 由 leader.ToBeAppliedRequestProcessor请求处理器负责移除;由leader.processAck负责插入; 这就是为什么新的follower连接到leader后需要将toBeApplied也要进行同步的原因,此时这个proposal已经在多数的follower上写入成功了,因此新的follower也需要和其他的follower保持一致,同步后即可接收leader的下一步提交命令.

      3.HashSet<LearnerHandler> forwardingFollowers: 和leader保持同步的follower集合

    Leader主要功能有两个:

      1.启动接收follower连接的服务并维持心跳. 处理逻辑在 leader.lead

        1).启动服务接收来自follower的连接

        2).统计epoch并日志同步

        3).启动服务接收client连接

        4).维护与follower、observer之间的心跳

     void lead() throws IOException, InterruptedException {
            self.end_fle = System.currentTimeMillis();
            LOG.info("LEADING - LEADER ELECTION TOOK - " +
                  (self.end_fle - self.start_fle));
            self.start_fle = 0;
            self.end_fle = 0;
    
            zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
    
            try {
                self.tick = 0;
                zk.loadData();
                
                leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
            
          //启动服务,每个follower的连接对应一个 LeanerHandler
    // Start thread that waits for connection requests from // new followers. cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); readyToStart = true; long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()); zk.setZxid(ZxidUtils.makeZxid(epoch, 0)); synchronized(this){ lastProposed = zk.getZxid(); } newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null); if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) { LOG.info("NEWLEADER proposal has Zxid of " + Long.toHexString(newLeaderProposal.packet.getZxid())); }
          //阻塞直到收到1/2以上follower回复 Leader.epochAck waitForEpochAck(self.getId(), leaderStateSummary); self.setCurrentEpoch(epoch);         
            //阻塞直到大多数的follower已经完成了同步 = 接收1/2以上follower的Leader.ack
    // We have to get at least a majority of servers in sync with // us. We do this by waiting for the NEWLEADER packet to get // acknowledged try { waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT); } catch (InterruptedException e) { shutdown("Waiting for a quorum of followers, only synced with sids: [ " + getSidSetString(newLeaderProposal.ackSet) + " ]"); HashSet<Long> followerSet = new HashSet<Long>(); for (LearnerHandler f : learners) followerSet.add(f.getSid()); if (self.getQuorumVerifier().containsQuorum(followerSet)) { LOG.warn("Enough followers present. " + "Perhaps the initTicks need to be increased."); } Thread.sleep(self.tickTime); self.tick++; return; }
          //启动服务以处理来自client的连接 startZkServer();
    /** * WARNING: do not use this for anything other than QA testing * on a real cluster. Specifically to enable verification that quorum * can handle the lower 32bit roll-over issue identified in * ZOOKEEPER-1277. Without this option it would take a very long * time (on order of a month say) to see the 4 billion writes * necessary to cause the roll-over to occur. * * This field allows you to override the zxid of the server. Typically * you'll want to set it to something like 0xfffffff0 and then * start the quorum, run some operations and see the re-election. */ String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid"); if (initialZxid != null) { long zxid = Long.parseLong(initialZxid); zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid); } if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) { self.cnxnFactory.setZooKeeperServer(zk); } // Everything is a go, simply start counting the ticks // WARNING: I couldn't find any wait statement on a synchronized // block that would be notified by this notifyAll() call, so // I commented it out //synchronized (this) { // notifyAll(); //} // We ping twice a tick, so we only update the tick every other // iteration boolean tickSkip = true;     
          //维护心跳
    while (true) { Thread.sleep(self.tickTime / 2); if (!tickSkip) { self.tick++; } HashSet<Long> syncedSet = new HashSet<Long>(); // lock on the followers when we use it. syncedSet.add(self.getId()); for (LearnerHandler f : getLearners()) { // Synced set is used to check we have a supporting quorum, so only // PARTICIPANT, not OBSERVER, learners should be used if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) { syncedSet.add(f.getSid()); } f.ping(); } // check leader running status if (!this.isRunning()) { shutdown("Unexpected internal error"); return; } if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) { //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) { // Lost quorum, shutdown shutdown("Not sufficient followers synced, only synced with sids: [ " + getSidSetString(syncedSet) + " ]"); // make sure the order is the same! // the leader goes to looking return; } tickSkip = !tickSkip; } } finally { zk.unregisterJMX(this); } }

      2.与follower、observer做日志同步

        1).接收客户端的request,并将proposal发往follower.  处理逻辑位于 org.apache.zookeeper.server.quorum.Leader.propose(Request)

        

     public Proposal propose(Request request) throws XidRolloverException {
            /**
             * Address the rollover issue. All lower 32bits set indicate a new leader
             * election. Force a re-election instead. See ZOOKEEPER-1277
             */
        //消息一旦超过zxid的低32位就会触发重新选举
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) { String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start"; shutdown(msg); throw new XidRolloverException(msg); }      ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); try { request.hdr.serialize(boa, "hdr"); if (request.txn != null) { request.txn.serialize(boa, "txn"); } baos.close(); } catch (IOException e) { LOG.warn("This really should be impossible", e); } QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos.toByteArray(), null); Proposal p = new Proposal(); p.packet = pp; p.request = request; synchronized (this) { if (LOG.isDebugEnabled()) { LOG.debug("Proposing:: " + request); } lastProposed = p.packet.getZxid(); outstandingProposals.put(lastProposed, p);
          //发送给所有leaner,通过调用LearnerHandler.queuePacket, leanerhandler会异步的发送给leaner sendPacket(pp); }
    return p; }

        2).提交proposal. 处理逻辑位于 org.apache.zookeeper.server.quorum.Leader.processAck(long, long, SocketAddress)

     synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
                for (Proposal p : outstandingProposals.values()) {
                    long packetZxid = p.packet.getZxid();
                    LOG.trace("outstanding proposal: 0x{}",
                            Long.toHexString(packetZxid));
                }
                LOG.trace("outstanding proposals all");
            }
          
          //忽略 follower收到Leader.UPTODATE回复的ACK
    if ((zxid & 0xffffffffL) == 0) { /* * We no longer process NEWLEADER ack by this method. However, * the learner sends ack back to the leader after it gets UPTODATE * so we just ignore the message. */ return; }   //outstandingProposals 保存了待follower同步的proposal if (outstandingProposals.size() == 0) { if (LOG.isDebugEnabled()) { LOG.debug("outstanding is 0"); } return; }
        //已经提交的就忽略,毕竟只需要超过一半follower提交就可以了
    if (lastCommitted >= zxid) { if (LOG.isDebugEnabled()) { LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid)); } // The proposal has already been committed return; } Proposal p = outstandingProposals.get(zxid); if (p == null) { LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr); return; } p.ackSet.add(sid); if (LOG.isDebugEnabled()) { LOG.debug("Count for zxid: 0x{} is {}", Long.toHexString(zxid), p.ackSet.size()); }

        //超过一半follower已经同步了这个proposal
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)){ if (zxid != lastCommitted+1) { LOG.warn("Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr); LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1)); } outstandingProposals.remove(zxid); if (p.request != null) { toBeApplied.add(p); } if (p.request == null) { LOG.warn("Going to commmit null request for proposal: {}", p); }
            //告知follower提交 commit(zxid);
            //告知observer提交 inform(p);
            //leader对提议做提交并且回复client结果 zk.commitProcessor.commit(p.request);
    if(pendingSyncs.containsKey(zxid)){ for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) { sendSync(r); } } } }

    CommitProcessor : 是请求处理器执行链(装饰模式)中的一环, 整条执行链的初始化如下:
    /**
     * Wires up the request-processor chain from its tail to its head:
     * PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor
     * -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor.
     * The commit and prep processors are started, the proposal processor is
     * initialized.
     */
    @Override
        protected void setupRequestProcessors() {
            // Tail of the chain.
            RequestProcessor terminal = new FinalRequestProcessor(this);
            RequestProcessor pendingApply =
                    new Leader.ToBeAppliedRequestProcessor(terminal, getLeader().toBeApplied);

            String serverIdText = Long.toString(getServerId());
            commitProcessor = new CommitProcessor(
                    pendingApply, serverIdText, false, getZooKeeperServerListener());
            commitProcessor.start();

            ProposalRequestProcessor proposer =
                    new ProposalRequestProcessor(this, commitProcessor);
            proposer.initialize();

            // Head of the chain; keep a typed reference so no cast is needed to start it.
            PrepRequestProcessor prep = new PrepRequestProcessor(this, proposer);
            firstProcessor = prep;
            prep.start();
        }

     

    
    
    ToBeAppliedProcessor:  处理待提交的proposal. 此时proposal已经被大多数follower同步, 但是尚未提交; 提交成功后会将该proposal从 leader.toBeApplied队列中移除. 
    FinalRequestProcessor : 执行链的终端,负责对proposal做持久化提交和回复client请求结果,处理逻辑如下:

     /**
      * Abridged excerpt: the runs of dots mark code the article's author left
      * out, so the braces shown here do not balance.
      *
      * The visible part drains zks.outstandingChanges up to the request's
      * zxid, adds quorum requests to the committed-proposal log, builds a
      * ReplyHeader, updates statistics and sends the response to the client
      * connection.
      *
      * @param request the request to finish processing
      */
     public void processRequest(Request request) {
            //processing result
            ProcessTxnResult rc = null;
            synchronized (zks.outstandingChanges) {
                // Drop every pending change record whose zxid is not newer than this request.
                while (!zks.outstandingChanges.isEmpty()
                        && zks.outstandingChanges.get(0).zxid <= request.zxid) {
                    ChangeRecord cr = zks.outstandingChanges.remove(0);
                    ......// persist the proposal
                if (Request.isQuorum(request.type)) {
                    zks.getZKDatabase().addCommittedProposal(request);
                }
            }
    
         
    
            //client connection
            ServerCnxn cnxn = request.cnxn;
    
            String lastOp = "NA";
            zks.decInProcess();
            Code err = Code.OK;
            Record rsp = null;
            boolean closeSession = false;
            try {
                 .....
            //build the reply
    } } catch (SessionMovedException e) { ......
          //failure handling
    } long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue()); zks.serverStats().updateLatency(request.createTime); cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, System.currentTimeMillis());     
        //send the result of the request back to the client
        // An IOException while sending is logged and not rethrown.
    try { cnxn.sendResponse(hdr, rsp, "response"); if (closeSession) { cnxn.sendCloseSession(); } } catch (IOException e) { LOG.error("FIXMSG",e); } }
     
     
  • 相关阅读:
    洛谷 P1194 飞扬的小鸟 题解
    洛谷 P1197 星球大战 题解
    洛谷 P1879 玉米田Corn Fields 题解
    洛谷 P2796 Facer的程序 题解
    洛谷 P2398 GCD SUM 题解
    洛谷 P2051 中国象棋 题解
    洛谷 P1472 奶牛家谱 Cow Pedigrees 题解
    洛谷 P1004 方格取数 题解
    洛谷 P2331 最大子矩阵 题解
    洛谷 P1073 最优贸易 题解
  • 原文地址:https://www.cnblogs.com/ironroot/p/7403787.html
Copyright © 2011-2022 走看看