zoukankan      html  css  js  c++  java
  • LeanerHandler

    LeanerHandler是Leader接收Follower和Observer连接请求一个线程,当leader election结束后,follower或observer会主动连接leader,并向leader发送自己的 epoch 和 zxid,以供leader选出最大的epoch.

    zookeeper默认选用org.apache.zookeeper.server.quorum.FastLeaderElection,因此选出来的leader一定是日志最新的那个follower. leader需要接收集群1/2以上的follower的信息才可以统计出leader的epoch = 接收到的最大的epoch+1 = leader的epoch + 1,加入收到的follower的epoch比leader还大,则说明选举失败了,leader会退出重新选举.

    leaner处理逻辑主要如下:

      1.follower退出election后首先会连接leader,并向leader发送自己的epoch和zxid,实现是发送一个Leader.OBSERVERINFO的packet.

      2.leader统计出最大epoch后会告知follower自己的epoch和zxid,实现是向follower和observer发送一个类型为Leader.LEADERINFO的packet,并阻塞等待知道接收1/2以上follower的Leader.ACKEPOCH信息.

      3.leader接着会和follower进行diff,小于leader日志的会进行补齐,多余leader日志的会进行截取以和leader同步,相等的则不做操作. 接着 leader会把 toBeApplied队列的Proposal发送给follower进行同步,同步完成后才会加入leader的 forwardingFollowers列表.

      4.leader接着会向follower发送类型为Leader.NEWLEADER的packet告知最新的epoch和zxid,并阻塞等待收到1/2以上follower的Leader.ACK消息.

      5.leader接着会向foloower发送类型为Leader.UPTODATE的packet告知follower和observer可以接收来自client的连接了.

      6.LeanerHandler会循环处理来自follower和observer的消息,leader通过调用 LeanerHandler.queuePacket 来将client的请求发送给follower和observer.

    public void run() {
            try {
          //心跳超时时间,leader用来判断是否断开连接的截止时间 tickOfNextAckDeadline
    = leader.self.tick + leader.self.initLimit + leader.self.syncLimit;       
          //打开输入 输出流
    ia
    = BinaryInputArchive.getArchive(new BufferedInputStream(sock .getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); oa = BinaryOutputArchive.getArchive(bufferedOutput);
          //接收来自follower或observer的消息,消息包含了他们的epoch和zxid QuorumPacket qp
    = new QuorumPacket(); ia.readRecord(qp, "packet"); if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){ LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!"); return; } byte learnerInfoData[] = qp.getData(); if (learnerInfoData != null) { if (learnerInfoData.length == 8) { ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData); this.sid = bbsid.getLong(); } else { LearnerInfo li = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li); this.sid = li.getServerid(); this.version = li.getProtocolVersion(); } } else { this.sid = leader.followerCounter.getAndDecrement(); } LOG.info("Follower sid: " + sid + " : info : " + leader.self.quorumPeers.get(sid)); if (qp.getType() == Leader.OBSERVERINFO) { learnerType = LearnerType.OBSERVER; } long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid; StateSummary ss = null; long zxid = qp.getZxid();
          
          //由leader统计出最大的epoch, leader.getEpochToPropose会阻塞直到收到1/2以上的Leader.FOLLOWERINFO消息
    long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
          //新版zookeeper的leader告知follower自己最新的epoch,通过发送Leader.LEADERINFO消息
    if (this.getVersion() < 0x10000) { // we are going to have to extrapolate the epoch information long epoch = ZxidUtils.getEpochFromZxid(zxid); ss = new StateSummary(epoch, zxid); // fake the message leader.waitForEpochAck(this.getSid(), ss); } else { byte ver[] = new byte[4]; ByteBuffer.wrap(ver).putInt(0x10000); QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null); oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); QuorumPacket ackEpochPacket = new QuorumPacket(); ia.readRecord(ackEpochPacket, "packet"); if (ackEpochPacket.getType() != Leader.ACKEPOCH) { LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH"); return; } ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData()); ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid()); leader.waitForEpochAck(this.getSid(), ss); } peerLastZxid = ss.getLastZxid(); /* the default to send to the follower */ int packetToSend = Leader.SNAP; long zxidToSend = 0; long leaderLastZxid = 0; /** the packets that the follower needs to get updates from **/ long updates = peerLastZxid;
           //
    /* we are sending the diff check if we have proposals in memory to be able to * send a diff to the */ ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock(); ReadLock rl = lock.readLock(); try { rl.lock(); final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog(); final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog(); LOG.info("Synchronizing with Follower sid: " + sid +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog) +" minCommittedLog=0x"+Long.toHexString(minCommittedLog) +" peerLastZxid=0x"+Long.toHexString(peerLastZxid)); LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();          if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) { // Follower is already sync with us, send empty diff LOG.info("leader and follower are in sync, zxid=0x{}", Long.toHexString(peerLastZxid)); packetToSend = Leader.DIFF; zxidToSend = peerLastZxid; } else if (proposals.size() != 0) { LOG.debug("proposal size is {}", proposals.size()); if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { LOG.debug("Sending proposals to follower"); // as we look through proposals, this variable keeps track of previous // proposal Id. long prevProposalZxid = minCommittedLog; // Keep track of whether we are about to send the first packet. // Before sending the first packet, we have to tell the learner // whether to expect a trunc or a diff boolean firstPacket=true; // If we are here, we can use committedLog to sync with // follower. Then we only need to decide whether to // send trunc or not packetToSend = Leader.DIFF; zxidToSend = maxCommittedLog; for (Proposal propose: proposals) { // skip the proposals the peer already has if (propose.packet.getZxid() <= peerLastZxid) { prevProposalZxid = propose.packet.getZxid(); continue; } else { // If we are sending the first packet, figure out whether to trunc // in case the follower has some proposals that the leader doesn't if (firstPacket) { firstPacket = false; // Does the peer have some proposals that the leader hasn't seen yet if (prevProposalZxid < peerLastZxid) { // send a trunc message before sending the diff packetToSend = Leader.TRUNC; zxidToSend = prevProposalZxid; updates = zxidToSend; } } queuePacket(propose.packet); QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(), null, null); queuePacket(qcommit); } } } else if (peerLastZxid > maxCommittedLog) { LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}", Long.toHexString(maxCommittedLog), Long.toHexString(updates)); packetToSend = Leader.TRUNC; zxidToSend = maxCommittedLog; updates = zxidToSend; } else { LOG.warn("Unhandled proposal scenario"); } } else { // just let the state transfer happen LOG.debug("proposals is empty"); }

            //和leader内存中的propsal做同步,并加入leader的同步follower列表 LOG.info(
    "Sending " + Leader.getPacketType(packetToSend)); leaderLastZxid = leader.startForwarding(this, updates); } finally { rl.unlock(); }         
            //leader告知follower自己新的epoch和zxid
            //leader.waitForNewLeaderAck 将阻塞直到收到1/2以上follower的Leader.ACK
    QuorumPacket newLeaderQP
    = new QuorumPacket(Leader.NEWLEADER, ZxidUtils.makeZxid(newEpoch, 0), null, null); if (getVersion() < 0x10000) { oa.writeRecord(newLeaderQP, "packet"); } else { queuedPackets.add(newLeaderQP); } bufferedOutput.flush(); //Need to set the zxidToSend to the latest zxid if (packetToSend == Leader.SNAP) { zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); } oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet"); bufferedOutput.flush(); /* if we are not truncating or sending a diff just send a snapshot */ if (packetToSend == Leader.SNAP) { LOG.info("Sending snapshot last zxid of peer is 0x" + Long.toHexString(peerLastZxid) + " " + " zxid of leader is 0x" + Long.toHexString(leaderLastZxid) + "sent zxid of db as 0x" + Long.toHexString(zxidToSend)); // Dump data to peer leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); } bufferedOutput.flush(); // Start sending packets new Thread() { public void run() { Thread.currentThread().setName( "Sender-" + sock.getRemoteSocketAddress()); try { sendPackets(); } catch (InterruptedException e) { LOG.warn("Unexpected interruption",e); } } }.start(); /* * Have to wait for the first ACK, wait until * the leader is ready, and only then we can * start processing messages. */ qp = new QuorumPacket(); ia.readRecord(qp, "packet"); if(qp.getType() != Leader.ACK){ LOG.error("Next packet was supposed to be an ACK"); return; } LOG.info("Received NEWLEADER-ACK message from " + getSid()); leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType()); syncLimitCheck.start(); // now that the ack has been processed expect the syncLimit sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit); /* * Wait until leader starts up */ synchronized(leader.zk){ while(!leader.zk.isRunning() && !this.isInterrupted()){ leader.zk.wait(20); } } // Mutation packets will be queued during the serialize, // so we need to mark when the peer can actually start // using the data //
          //告知follower可以接收来自客户端的连接了
    queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); while (true) { qp = new QuorumPacket(); ia.readRecord(qp, "packet"); long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; if (qp.getType() == Leader.PING) { traceMask = ZooTrace.SERVER_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp); } tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit; ByteBuffer bb; long sessionId; int cxid; int type; switch (qp.getType()) { case Leader.ACK: if (this.learnerType == LearnerType.OBSERVER) { if (LOG.isDebugEnabled()) { LOG.debug("Received ACK from Observer " + this.sid); } } syncLimitCheck.updateAck(qp.getZxid());

                //leader.processAck将会阻塞直到收到1/2以上的Leader.ACK才会提交proposal leader.processAck(
    this.sid, qp.getZxid(), sock.getLocalSocketAddress()); break; case Leader.PING: // Process the touches ByteArrayInputStream bis = new ByteArrayInputStream(qp .getData()); DataInputStream dis = new DataInputStream(bis); while (dis.available() > 0) { long sess = dis.readLong(); int to = dis.readInt(); leader.zk.touch(sess, to); } break; case Leader.REVALIDATE: bis = new ByteArrayInputStream(qp.getData()); dis = new DataInputStream(bis); long id = dis.readLong(); int to = dis.readInt(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); dos.writeLong(id); boolean valid = leader.zk.touch(id, to); if (valid) { try { //set the session owner // as the follower that // owns the session leader.zk.setOwner(id, this); } catch (SessionExpiredException e) { LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e); } } if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "Session 0x" + Long.toHexString(id) + " is valid: "+ valid); } dos.writeBoolean(valid); qp.setData(bos.toByteArray()); queuedPackets.add(qp); break; case Leader.REQUEST: bb = ByteBuffer.wrap(qp.getData()); sessionId = bb.getLong(); cxid = bb.getInt(); type = bb.getInt(); bb = bb.slice(); Request si; if(type == OpCode.sync){ si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo()); } else { si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo()); } si.setOwner(this); leader.zk.submitRequest(si); break; default: LOG.warn("unexpected quorum packet, type: {}", packetToString(qp)); break; } } } catch (IOException e) { if (sock != null && !sock.isClosed()) { LOG.error("Unexpected exception causing shutdown while sock " + "still open", e); //close the socket to make sure the //other side can see it being close try { sock.close(); } catch(IOException ie) { // do nothing } } } catch (InterruptedException e) { LOG.error("Unexpected exception causing shutdown", e); } finally { LOG.warn("******* GOODBYE " + (sock != null ? sock.getRemoteSocketAddress() : "<null>") + " ********"); shutdown(); } }
  • 相关阅读:
    ClickOnce發布經驗
    reporting Server組件不全引起的致命錯誤
    異步調用
    Usercontrol Hosted in IE
    MATLAB命令大全(转载)
    一种保护眼睛的好方法
    关于oracle自动编号
    An Algorithm Summary of Programming Collective Intelligence (1)
    An Algorithm Summary of Programming Collective Intelligence (3)
    An Algorithm Summary of Programming Collective Intelligence (4)
  • 原文地址:https://www.cnblogs.com/ironroot/p/7403270.html
Copyright © 2011-2022 走看看