zoukankan      html  css  js  c++  java
  • zookeeper(四)

    这篇文章分析集群模式下 leader 与 follower(follower 相对 leader 而言是客户端)的初始化、数据同步和启动流程

    依旧从 zkServer.sh 所启动的入口类 QuorumPeerMain 入手:
    初始化、启动

    // QuorumPeerMain.main (excerpt): parse the config file given as args[0], then run in
    // cluster mode when exactly one argument was passed and servers are configured;
    // otherwise fall back to standalone mode.
    main{
          main.initializeAndRun(args){...
                config.parse(args[0]);// parse the zoo.cfg config file, e.g. dataDir, dataLogDir
                ...if (args.length == 1 && config.servers.size() > 0) {
                      runFromConfig(config);// cluster (quorum) mode
                   } else {
                      LOG.warn("Either no config or no quorum defined in config, running "
                        + " in standalone mode");
                      // there is only one server in the quorum -- run as standalone
                      ZooKeeperServerMain.main(args);// standalone mode
                  }
          ...}
    }
    // QuorumPeerMain.runFromConfig (excerpt): obtain this server's QuorumPeer, then
    // call initialize(), start() and join() on it.
    runFromConfig{...
       try {...
          quorumPeer = getQuorumPeer();// each server in the cluster ...
          quorumPeer.initialize();
          quorumPeer.start();
          quorumPeer.join();
          ...}   
    ...}
    // QuorumPeer.start (excerpt): load the on-disk data, start cnxnFactory (presumably
    // the client connection factory), start leader election, then call super.start();
    // the resulting run() loop is excerpted at @1.
    start{
            loadDataBase();// load data from the log files into memory (analyzed in an earlier article)
            cnxnFactory.start();        
            startLeaderElection();// leader election
            super.start();@1
        }
    // QuorumPeer.run (excerpt): main loop that dispatches on the current peer state
    // (LOOKING / FOLLOWING / LEADING).
    // NOTE(review): catch/finally clauses, break statements and closing braces are
    // elided in this excerpt.
    @1 run{
          try {//Main loop
                while (running) {
                    switch (getPeerState()) {
                    case LOOKING:...
                    case FOLLOWING:
                        try {
                            LOG.info("FOLLOWING");
                            setFollower(makeFollower(logFactory));
                            follower.followLeader();
                    case LEADING: // leader election has already completed at this point
                        LOG.info("LEADING");
                        try {
                            setLeader(makeLeader(logFactory));
                            leader.lead();
                            setLeader(null);
    }
    
    // Leader-side code
    // Leader.lead (excerpt): reset the tick counter, load data, and start the
    // LearnerCnxAcceptor thread that accepts follower connections (its run() is at @2).
    leader.lead{...
          try {
                self.tick.set(0);
                zk.loadData(); 
                cnxAcceptor = new LearnerCnxAcceptor();
                cnxAcceptor.start();// cnxAcceptor:the follower acceptor thread @2
                
                ...}
    ...}
    // LearnerCnxAcceptor.run (excerpt): accept a learner connection on ss and hand the
    // socket to a new LearnerHandler thread (its run() is at @3).
    @2 run{...
          try{
                Socket s = ss.accept();
                LearnerHandler fh = new LearnerHandler(s, is, Leader.this);// one handler thread is started per accepted socket
                fh.start(); @3
    ...}
    // LearnerHandler.run (excerpt): the leader's side of the handshake and sync with
    // one learner:
    //   1. read the learner's first packet and take the epoch from its zxid;
    //   2. get the epoch to propose (@4), send LEADERINFO, read the reply and wait for
    //      the epoch ACK (@5);
    //   3. queue proposals + COMMITs to sync, or serialize a full snapshot for SNAP;
    //   4. start a "Sender-" thread that writes the packets out;
    //   5. handle packets from the learner, e.g. ACK -> leader.processAck (@9 -> @10).
    // NOTE(review): loops, the switch over qp's type and the DIFF/TRUNC decision are
    // elided with "..." in this excerpt.
    @3 run{...
          QuorumPacket qp = new QuorumPacket();
          ia.readRecord(qp, "packet");// read the packet sent by the follower into qp ...
          byte learnerInfoData[] = qp.getData();
          if (learnerInfoData != null) {...
          long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());... // epoch of the zxid sent by the follower (its accepted epoch, see registerWithLeader)
          long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); // this.getSid() is the follower's server id; get the new epoch ("term" number) to propose... @4
          // having obtained the new epoch, tell the follower to use it
          QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
          oa.writeRecord(newEpochPacket, "packet");// write it to the socket
          bufferedOutput.flush();
          QuorumPacket ackEpochPacket = new QuorumPacket();
          ia.readRecord(ackEpochPacket, "packet");
          leader.waitForEpochAck(this.getSid(), ss);...// wait for the follower's epoch ACK... @5
          int packetToSend = Leader.SNAP;
          long zxidToSend = 0;
          long leaderLastZxid = 0;
          LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();...
          queuePacket(propose.packet); // committedLog is the list that holds proposals; each proposal to sync is added to the queuedPackets queue (the loop defining `propose` is elided)
          QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),null, null);
          queuePacket(qcommit);...
          //if we are not truncating or sending a diff just send a snapshot (what to sync is chosen from the follower's latest zxid; this is the snapshot-only case)
          if (packetToSend == Leader.SNAP) {
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
          }
          // Start sending packets: a separate thread sends the diff or trunc data
                new Thread() {
                    public void run() {
                        Thread.currentThread().setName(
                                "Sender-" + sock.getRemoteSocketAddress());
                        try {
                            sendPackets{...
                                  oa.writeRecord(p, "packet");                        
                            ...}
                        } catch (InterruptedException e) {
                            LOG.warn("Unexpected interruption",e);
                        }
                    }
                }.start();...
          @9  while (true) {
                    qp = new QuorumPacket();
                     case Leader.ACK:
                        // ACKs from observers are only logged at debug level here
                        if (this.learnerType == LearnerType.OBSERVER) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Received ACK from Observer  " + this.sid);
                            }
                        }
                        syncLimitCheck.updateAck(qp.getZxid());
                        leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); @10
                        break;...
    ...}
    // Leader.processAck (excerpt): record the ACK from sid for proposal p; once the
    // acking servers form a quorum, send COMMIT to the followers and inform observers.
    // NOTE(review): "commit{...}" presumably stands for the inlined commit() call;
    // braces are unbalanced in this excerpt.
    @10 processAck{...
          p.ackSet.add(sid);...
          if (self.getQuorumVerifier().containsQuorum(p.ackSet)){   // majority (quorum) check
                commit{
                      QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null); // send COMMIT to the followers, which then apply the change to their in-memory data
                  sendPacket(qp);
          }
          inform(p); // observers are informed in the same way
    }
    // Leader.getEpochToPropose (excerpt): bump epoch to lastAcceptedEpoch+1 when the
    // caller's epoch is not smaller, register participating sids in
    // connectingFollowers, and wait (at most initLimit * tickTime) until a quorum that
    // includes the leader itself has connected; then the epoch is accepted and all
    // waiters are woken.
    // NOTE(review): the update of `cur` inside the wait loop is elided in this excerpt.
    @4 getEpochToPropose{...
           if (lastAcceptedEpoch >= epoch) {
                    epoch = lastAcceptedEpoch+1;
           }
          if (isParticipant(sid)) {// only participants are counted (followers, not observers)
                    connectingFollowers.add(sid);
           }
          QuorumVerifier verifier = self.getQuorumVerifier();
                if (connectingFollowers.contains(self.getId()) && 
                    verifier.containsQuorum(connectingFollowers)) {// the larger epoch chosen above becomes the new one; containsQuorum is the majority check, and until it holds callers take the else branch and wait()
                    waitingForNewEpoch = false;
                    self.setAcceptedEpoch(epoch);
                    connectingFollowers.notifyAll();// wake up all waiting threads
                } else {
                    long start = Time.currentElapsedTime();
                    long cur = start;
                    long end = start + self.getInitLimit()*self.getTickTime();
                    while(waitingForNewEpoch && cur < end) {
                        connectingFollowers.wait(end - cur); 
    ...}
    
    // LearnerHandler.queuePacket (body elided in this excerpt). Per the comments in
    // LearnerHandler.run (@3), it adds a packet to the queuedPackets queue, presumably
    // drained by the "Sender-" thread started there.
    @6 queuePacket{...
    
    ...}
    
    // Follower-side code
    // Follower.followLeader (excerpt): find the leader, connect to it, register with
    // FOLLOWERINFO to obtain the new epoch's zxid, then synchronize data with the leader.
    follower.followLeader{...
          QuorumServer leaderServer = findLeader();            
          try {
               connectToLeader(leaderServer.addr, leaderServer.hostname);
               long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);...
                syncWithLeader(newEpochZxid);
    ...}
    // Learner.connectToLeader (excerpt): open the TCP connection to the leader.
    connectToLeader{...// connect to the leader
    sock = new Socket();        
            sock.setSoTimeout(self.tickTime * self.initLimit);// read timeout: tickTime * initLimit
            for (int tries = 0; tries < 5; tries++) {// up to 5 attempts; the catch/retry handling is elided
                try {
                    sock.connect(addr, self.tickTime * self.syncLimit);// connect timeout: tickTime * syncLimit
                    sock.setTcpNoDelay(nodelay);
    ...}
    // Learner.registerWithLeader (excerpt): send pktType with our accepted epoch
    // (encoded as a zxid) and the bsid payload, read the leader's LEADERINFO reply that
    // carries the new epoch, answer with ACKEPOCH, and return the zxid (newEpoch, 0).
    registerWithLeader{....
    QuorumPacket qp = new QuorumPacket();                
            qp.setType(pktType);
            qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));...
            qp.setData(bsid.toByteArray());
            writePacket(qp, true);// send the packet to the leader
            readPacket(qp); // this reads the leader's reply, which includes the new epoch
            final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    	if (qp.getType() == Leader.LEADERINFO) {...
            QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
            writePacket(ackNewEpoch, true); // reply with the ACK @5
            return ZxidUtils.makeZxid(newEpoch, 0);
    ...}
    // Learner.syncWithLeader (excerpt): read the first sync packet (DIFF, SNAP or TRUNC)
    // and prepare the local database accordingly, then consume PROPOSAL/COMMIT packets
    // until the leader's UPTODATE, and finally log/commit what arrived in between.
    syncWithLeader{...//Finally, synchronize our history with the Leader. Sync data from the leader (server initialization phase)
            readPacket(qp);
            LinkedList<Long> packetsCommitted = new LinkedList<Long>();
            LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
            synchronized (zk) {
                if (qp.getType() == Leader.DIFF) {
                    LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                    snapshotNeeded = false;
                }
                else if (qp.getType() == Leader.SNAP) { // clear the local database first, then deserialize the leader's snapshot into the DataTree
                    // The leader is going to dump the database clear our own database and read
                    zk.getZKDatabase().clear();
                    zk.getZKDatabase().deserializeSnapshot(leaderIs);
                    String signature = leaderIs.readString("signature");
                    if (!signature.equals("BenWasHere")) {
                        LOG.error("Missing signature. Got " + signature);
                        throw new IOException("Missing signature");                   
                    }
                    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
                } else if (qp.getType() == Leader.TRUNC) {...
                while (self.isRunning()) { // the loop stops once UPTODATE is read from the leader (that case is elided here)
                    readPacket(qp);
                    switch(qp.getType()) {
                    case Leader.PROPOSAL:...
                    case Leader.COMMIT:
                        // Without writeToTxnLog, a COMMIT matching the oldest uncommitted proposal is
                        // applied immediately (a mismatch is only logged); otherwise the zxid is
                        // remembered and committed in the final loop below.
                        if (!writeToTxnLog) {
                            pif = packetsNotCommitted.peekFirst();
                            if (pif.hdr.getZxid() != qp.getZxid()) {
                                LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                            } else {
                                zk.processTxn(pif.hdr, pif.rec);
                                packetsNotCommitted.remove();
                            }
                        } else {
                            packetsCommitted.add(qp.getZxid());
                        }
                        break;...
            // We need to log the stuff that came in between the snapshot and the uptodate
            if (zk instanceof FollowerZooKeeperServer) {
                FollowerZooKeeperServer fzk = (FollowerZooKeeperServer)zk;
                for(PacketInFlight p: packetsNotCommitted) {
                    fzk.logRequest(p.hdr, p.rec);
                }
                for(Long zxid: packetsCommitted) {
                    fzk.commit(zxid); // data synchronization complete
                }
    ...}
    

    follower处理请求
    org.apache.zookeeper.server.quorum.Learner#syncWithLeader

    // Follower.followLeader (excerpt, continued): after syncing with the leader, read
    // packets from the leader in a loop while running and process each one (@7).
    followLeader{...
          syncWithLeader(newEpochZxid);                
           QuorumPacket qp = new QuorumPacket();
           while (this.isRunning()) {
               readPacket(qp);
               processPacket(qp);@7
            }
    ...}
    
    // Learner.syncWithLeader (excerpt): the part that starts the local server via
    // zk.startup().
    syncWithLeader{...
          zk.startup();
    ...}
    // ZooKeeperServer.startup (excerpt): create the session tracker if needed and start
    // it, build the request-processor chain, call registerJMX(), set the state to
    // RUNNING and notifyAll() threads waiting on this object.
    startup{
            if (sessionTracker == null) {
                createSessionTracker();
            }
            startSessionTracker();
            setupRequestProcessors();// followers and the leader use different implementation classes, so they build different processor chains
            registerJMX();
            setState(State.RUNNING);
            notifyAll();
        }
    
    // Follower.processPacket (excerpt): handle one packet from the leader. PING goes to
    // ping(qp); for PROPOSAL the txn is deserialized and logged via fzk.logRequest.
    @7 processPacket{...
          switch (qp.getType()) {
            case Leader.PING:            
                ping(qp);            
                break;
            case Leader.PROPOSAL:            
                TxnHeader hdr = new TxnHeader();
                Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
                fzk.logRequest(hdr, txn){...
                      syncProcessor.processRequest(request);// persist and send back an ACK @8: the processor after syncProcessor is SendAckRequestProcessor (sends the ACK), which the leader's LearnerHandler receives and handles @9
                ...}
    ...}
    // FollowerZooKeeperServer.setupRequestProcessors (@8): builds the follower's two
    // chains. Requests entering firstProcessor: FollowerRequestProcessor ->
    // CommitProcessor -> FinalRequestProcessor. Proposals logged from the leader:
    // SyncRequestProcessor -> SendAckRequestProcessor. The CommitProcessor,
    // FollowerRequestProcessor and SyncRequestProcessor are each started right after
    // creation.
    @8 setupRequestProcessors() {
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            commitProcessor = new CommitProcessor(finalProcessor,
                    Long.toString(getServerId()), true,
                    getZooKeeperServerListener());
            commitProcessor.start();
            firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
            ((FollowerRequestProcessor) firstProcessor).start();
            syncProcessor = new SyncRequestProcessor(this,
                    new SendAckRequestProcessor((Learner)getFollower()));
            syncProcessor.start();
        }
    FollowerRequestProcessor->CommitProcessor->FinalRequestProcessor
    SyncRequestProcessor->SendAckRequestProcessor  //由 syncProcessor.start() 启动:先持久化txn(事务日志/快照),再向leader发送ack
    

    leader处理请求
    org.apache.zookeeper.server.quorum.Leader#lead

    // Leader.lead (excerpt): startZkServer() calls zk.startup(), which builds the
    // leader's request-processor chain.
    lead{...
          startZkServer{
          zk.startup()}
    ...}
    PrepRequestProcessor(校验ACL、构造txn)->ProposalRequestProcessor(发起提议/投票)->CommitProcessor(提交)->ToBeAppliedRequestProcessor->FinalRequestProcessor(更新内存、返回response)
    
    /**
     * Builds the leader's proposal stage.
     *
     * <p>Keeps references to the server and to the next processor in the chain, and
     * creates the local persistence stage: a SyncRequestProcessor whose successor is an
     * AckRequestProcessor bound to {@code zks.getLeader()}.
     *
     * @param zks the leader server this processor belongs to
     * @param nextProcessor the processor that receives each request next
     */
    public ProposalRequestProcessor(LeaderZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        // Persisted requests are handed on to an AckRequestProcessor for the leader.
        syncProcessor = new SyncRequestProcessor(zks, new AckRequestProcessor(zks.getLeader()));
    }
    
    // ProposalRequestProcessor.processRequest (excerpt): pass the request to the next
    // processor, issue it as a proposal via the leader, and persist it locally.
    ProposalRequestProcessor.processRequest{...
    nextProcessor.processRequest(request); // next processor: CommitProcessor
    zks.getLeader().propose(request); // issue the proposal (followers receive it at @7)
    syncProcessor.processRequest(request); // persist locally (SyncRequestProcessor -> AckRequestProcessor)
    ...}
    // CommitProcessor.run (excerpt): the thread waits while there are no committed
    // requests and either no queued requests or a nextPending request (presumably one
    // still awaiting its commit).
    CommitProcessor.run{...
    if ((queuedRequests.size() == 0 || nextPending != null)
                                && committedRequests.size() == 0) {
                            wait(); // wait for proposals to be committed
    ...}
    
  • 相关阅读:
    [转载]ASP.NET实现数字和字符相混合的验证码
    [分享]软件开发全套规范
    [转载]混沌理论简介
    [原创]利用WM_COPYDATA实现进程间通信
    [转载]I like the subtle...
    [原创]DES算法的介绍以及实现(含上次DES程序1.0的源码)
    [转载]高校自动排课系统的实践
    [公告]对DES算法源码的bug道歉
    [转载]基于混沌理论的资本投资研究
    使用 异步多线程TCP Socket 实现进程间通信(VC 6.0 , BCB6.0调试通过)
  • 原文地址:https://www.cnblogs.com/leifonlyone/p/12840180.html
Copyright © 2011-2022 走看看