zk 集群中有3种节点:leader,follower,observer,其中 observer 节点没有投票权,即它不参与选举和写请求的投票。
比较 Follower 和 Observer 的代码:
//void org.apache.zookeeper.server.quorum.Observer.processPacket(QuorumPacket qp) throws IOException protected void processPacket(QuorumPacket qp) throws IOException{ switch (qp.getType()) { case Leader.PING: ping(qp); break; case Leader.PROPOSAL: LOG.warn("Ignoring proposal"); break; case Leader.COMMIT: LOG.warn("Ignoring commit"); break; case Leader.UPTODATE: LOG.error("Received an UPTODATE message after Observer started"); break; case Leader.REVALIDATE: revalidate(qp); break; case Leader.SYNC: ((ObserverZooKeeperServer)zk).sync(); break; case Leader.INFORM: TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr); Request request = new Request (null, hdr.getClientId(), hdr.getCxid(), hdr.getType(), null, null); request.txn = txn; request.hdr = hdr; ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk; obs.commitRequest(request); break; } }
// void org.apache.zookeeper.server.quorum.Follower.processPacket(QuorumPacket qp) throws IOException protected void processPacket(QuorumPacket qp) throws IOException{ switch (qp.getType()) { case Leader.PING: ping(qp); break; case Leader.PROPOSAL: TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr); if (hdr.getZxid() != lastQueued + 1) { LOG.warn("Got zxid 0x" + Long.toHexString(hdr.getZxid()) + " expected 0x" + Long.toHexString(lastQueued + 1)); } lastQueued = hdr.getZxid(); fzk.logRequest(hdr, txn); break; case Leader.COMMIT: fzk.commit(qp.getZxid()); break; case Leader.UPTODATE: LOG.error("Received an UPTODATE message after Follower started"); break; case Leader.REVALIDATE: revalidate(qp); break; case Leader.SYNC: fzk.sync(); break; } }
可以看到,observer 不处理 leader 的 proposal 和 commit,但是它会接收 leader 的 inform,所以它是能听到 write 的。
// void org.apache.zookeeper.server.quorum.Leader.processAck(long sid, long zxid, SocketAddress followerAddr) // 代码片段 commit(zxid); // 向 followers 发送 COMMIT inform(p); // 向 observers 发送 INFORM zk.commitProcessor.commit(p.request); // leader 提交