ZooKeeper 作为优秀的分布系统协调组件,值得一探究竟。它的启动类主要为:
1. 单机版的zk 使用 ZooKeeperServerMain
2. 集群版的zk 使用 QuorumPeerMain
与用户端各服务端之间存在着各种通信!当然主要分为三个:
1. 客户端与zk的通信;
2. 各zk服务端间的通信1, 操作投票通信;
3. 各zk服务端间的通信2, Leader选举通信;
那么,他们之间都是怎样建立通信机制的呢?让我们从本文一起看看吧!(本文将会倒序的形式展现整个过程)
集群版ZooKeeper的配置可能是这样的:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/tmp/zk3/data dataLogDir=/tmp/zk3/log # 客户端连接端口 clientPort=2183 # 集群配置, host:操作内部通信端口:选举端口 server.1=localhost:2888:3888 server.2=localhost:2899:3899 server.3=localhost:2877:3877
一、客户端连接任务监听连接的建立
一个死循环一直轮询socket, 进行 select().
// org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread, 监听客户端的请求 public void run() { try { while (!stopped && !acceptSocket.socket().isClosed()) { try { select(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } } finally { closeSelector(); // This will wake up the selector threads, and tell the // worker thread pool to begin shutdown. if (!reconfiguring) { NIOServerCnxnFactory.this.stop(); } LOG.info("accept thread exitted run method"); } } private void select() { try { selector.select(); Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { if (!doAccept()) { // If unable to pull a new connection off the accept // queue, pause accepting to give us time to free // up file descriptors and so the accept thread // doesn't spin in a tight loop. pauseAccept(10); } } else { LOG.warn("Unexpected ops in accept select {}", key.readyOps()); } } } catch (IOException e) { LOG.warn("Ignoring IOException while selecting", e); } } // 上面的客户端监听,是由 configure() 时触发的。 // 而此处的的 addr 则是由解析配置文件的 clientPort 时获取的 // org.apache.zookeeper.server.NIOServerCnxnFactory @Override public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException { if (secure) { throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn"); } configureSaslLogin(); maxClientCnxns = maxcc; sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000); // We also use the sessionlessCnxnTimeout as expiring interval for // cnxnExpiryQueue. These don't need to be the same, but the expiring // interval passed into the ExpiryQueue() constructor below should be // less than or equal to the timeout. cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout); expirerThread = new ConnectionExpirerThread(); int numCores = Runtime.getRuntime().availableProcessors(); // 32 cores sweet spot seems to be 4 selector threads numSelectorThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores / 2), 1)); if (numSelectorThreads < 1) { throw new IOException("numSelectorThreads must be at least 1"); } numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores); workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000); String logMsg = "Configuring NIO connection handler with " + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, " + numSelectorThreads + " selector thread(s), " + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and " + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers.")); LOG.info(logMsg); for (int i = 0; i < numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } listenBacklog = backlog; // 直接使用 java nio 创建端口监听 this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port {}", addr); if (listenBacklog == -1) { ss.socket().bind(addr); } else { ss.socket().bind(addr, listenBacklog); } ss.configureBlocking(false); acceptThread = new AcceptThread(ss, addr, selectorThreads); } // org.apache.zookeeper.server.quorum.QuorumPeerMain#runFromConfig public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer"); MetricsProvider metricsProvider; try { metricsProvider = MetricsProviderBootstrap.startMetricsProvider( config.getMetricsProviderClassName(), config.getMetricsProviderConfiguration()); } catch (MetricsProviderLifeCycleException error) { throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error); } try { ServerMetrics.metricsProviderInitialized(metricsProvider); ServerCnxnFactory cnxnFactory = null; ServerCnxnFactory secureCnxnFactory = null; // 1. 创建客户端连接监听 if (config.getClientPortAddress() != null) { cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false); } // 1.1. 另一个客户端的安全监听 if (config.getSecureClientPortAddress() != null) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true); } // 创建操作投票线程 quorumPeer = getQuorumPeer(); quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir())); quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled()); //quorumPeer.setQuorumPeers(config.getAllMembers()); // 设置选举算法,为后面创建选举监听打下基础 quorumPeer.setElectionType(config.getElectionAlg()); quorumPeer.setMyid(config.getServerId()); quorumPeer.setTickTime(config.getTickTime()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit()); quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit()); quorumPeer.setObserverMasterPort(config.getObserverMasterPort()); quorumPeer.setConfigFileName(config.getConfigFilename()); quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog()); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); // 设置通信端口,选举端口信息 quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); if (config.getLastSeenQuorumVerifier() != null) { quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false); } quorumPeer.initConfigInZKDatabase(); // 此处的连接是由上面客户端创建的 quorumPeer.setCnxnFactory(cnxnFactory); quorumPeer.setSecureCnxnFactory(secureCnxnFactory); quorumPeer.setSslQuorum(config.isSslQuorum()); quorumPeer.setUsePortUnification(config.shouldUsePortUnification()); quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); if (config.sslQuorumReloadCertFiles) { quorumPeer.getX509Util().enableCertFileReloading(); } // sets quorum sasl authentication configurations quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); if (quorumPeer.isQuorumSaslAuthEnabled()) { quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); } quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); // 创建 auth server & learner quorumPeer.initialize(); if (config.jvmPauseMonitorToRun) { quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config)); } quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } finally { if (metricsProvider != null) { try { metricsProvider.stop(); } catch (Throwable error) { LOG.warn("Error while stopping metrics", error); } } } } // org.apache.zookeeper.server.quorum.QuorumPeerMain#initializeAndRun protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } // Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager( config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); } }
总结来说就是,在启动时使用 NIOServerCnxnFactory NIO 基于配置文件创建 ServerSocket, 监听端口的请求, 具体请求进来后的处理逻辑后续再说!
二、选举端口连接的建立
// 内部通信线程,由 Listener 进行处理 // org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener#run /** * Sleeps on accept(). */ @Override public void run() { int numRetries = 0; InetSocketAddress addr; Socket client = null; Exception exitException = null; while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) { try { // ss 代表内部通信的 socket 实例 // 它会延迟到本线程时再进行创建 if (self.shouldUsePortUnification()) { LOG.info("Creating TLS-enabled quorum server socket"); ss = new UnifiedServerSocket(self.getX509Util(), true); } else if (self.isSslQuorum()) { LOG.info("Creating TLS-only quorum server socket"); ss = new UnifiedServerSocket(self.getX509Util(), false); } else { // 默认使用 ServerSocket 通信 ss = new ServerSocket(); } ss.setReuseAddress(true); if (self.getQuorumListenOnAllIPs()) { int port = self.getElectionAddress().getPort(); addr = new InetSocketAddress(port); } else { // Resolve hostname for this server in case the // underlying ip address has changed. self.recreateSocketAddresses(self.getId()); addr = self.getElectionAddress(); } LOG.info("My election bind port: {}", addr.toString()); setName(addr.toString()); // 绑定端口后,就可以进行选举通信了 ss.bind(addr); while (!shutdown) { try { // 阻塞等待其他节点发来请求 client = ss.accept(); setSockOpts(client); LOG.info("Received connection request {}", formatInetAddr((InetSocketAddress) client.getRemoteSocketAddress())); // Receive and handle the connection request // asynchronously if the quorum sasl authentication is // enabled. This is required because sasl server // authentication process may take few seconds to finish, // this may delay next peer connection requests. if (quorumSaslAuthEnabled) { receiveConnectionAsync(client); } else { receiveConnection(client); } numRetries = 0; } catch (SocketTimeoutException e) { LOG.warn( "The socket is listening for the election accepted " + "and it timed out unexpectedly, but will retry." + "see ZOOKEEPER-2836"); } } } catch (IOException e) { if (shutdown) { break; } LOG.error("Exception while listening", e); exitException = e; numRetries++; try { ss.close(); Thread.sleep(1000); } catch (IOException ie) { LOG.error("Error closing server socket", ie); } catch (InterruptedException ie) { LOG.error("Interrupted while sleeping. Ignoring exception", ie); } closeSocket(client); } } LOG.info("Leaving listener"); if (!shutdown) { LOG.error( "As I'm leaving the listener thread after {} errors. " + "I won't be able to participate in leader election any longer: {}." + "Use {} property to increase retry count.", numRetries, formatInetAddr(self.getElectionAddress()), ELECTION_PORT_BIND_RETRY); if (exitException instanceof SocketException) { // After leaving listener thread, the host cannot join the // quorum anymore, this is a severe error that we cannot // recover from, so we need to exit socketBindErrorHandler.run(); } } else if (ss != null) { // Clean up for shutdown. try { ss.close(); } catch (IOException ie) { // Don't log an error for shutdown. LOG.debug("Error closing server socket", ie); } } } // org.apache.zookeeper.server.quorum.QuorumPeer#start // 开启内部通信线程 @Override public synchronized void start() { if (!getView().containsKey(myid)) { throw new RuntimeException("My id " + myid + " not in the peer list"); } loadDataBase(); // 开启端口监听 startServerCnxnFactory(); try { // 管理线程开启 adminServer.start(); } catch (AdminServerException e) { LOG.warn("Problem starting AdminServer", e); System.out.println(e); } // 选举线程开启 startLeaderElection(); // 开启jvm-gc的监控,仅用sleep进行日志打印处理 startJvmPauseMonitor(); super.start(); } // 内部通信线程的建立,主要有 WorkerReceiver, WorkerSender, 线程间通过队列通信 // org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver // 选举任务消息接收线程 public void run() { Message response; while (!stop) { // Sleeps on receive try { response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); if (response == null) { continue; } // The current protocol and two previous generations all send at least 28 bytes if (response.buffer.capacity() < 28) { LOG.error("Got a short response: {}", response.buffer.capacity()); continue; } // this is the backwardCompatibility mode in place before ZK-107 // It is for a version of the protocol in which we didn't send peer epoch // With peer epoch and version the message became 40 bytes boolean backCompatibility28 = (response.buffer.capacity() == 28); // this is the backwardCompatibility mode for no version information boolean backCompatibility40 = (response.buffer.capacity() == 40); response.buffer.clear(); // Instantiate Notification and set its attributes Notification n = new Notification(); int rstate = response.buffer.getInt(); long rleader = response.buffer.getLong(); long rzxid = response.buffer.getLong(); long relectionEpoch = response.buffer.getLong(); long rpeerepoch; int version = 0x0; if (!backCompatibility28) { rpeerepoch = response.buffer.getLong(); if (!backCompatibility40) { /* * Version added in 3.4.6 */ version = response.buffer.getInt(); } else { LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid); } } else { LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid); rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid); } QuorumVerifier rqv = null; // check if we have a version that includes config. If so extract config info from message. if (version > 0x1) { int configLength = response.buffer.getInt(); byte[] b = new byte[configLength]; response.buffer.get(b); synchronized (self) { try { rqv = self.configFromString(new String(b)); QuorumVerifier curQV = self.getQuorumVerifier(); if (rqv.getVersion() > curQV.getVersion()) { LOG.info("{} Received version: {} my version: {}", self.getId(), Long.toHexString(rqv.getVersion()), Long.toHexString(self.getQuorumVerifier().getVersion())); if (self.getPeerState() == ServerState.LOOKING) { LOG.debug("Invoking processReconfig(), state: {}", self.getServerState()); self.processReconfig(rqv, null, null, false); if (!rqv.equals(curQV)) { LOG.info("restarting leader election"); self.shuttingDownLE = true; self.getElectionAlg().shutdown(); break; } } else { LOG.debug("Skip processReconfig(), state: {}", self.getServerState()); } } } catch (IOException e) { LOG.error("Something went wrong while processing config received from {}", response.sid); } catch (ConfigException e) { LOG.error("Something went wrong while processing config received from {}", response.sid); } } } else { LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid); } /* * If it is from a non-voting server (such as an observer or * a non-voting follower), respond right away. */ if (!validVoter(response.sid)) { Vote current = self.getCurrentVote(); QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), logicalclock.get(), self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes()); sendqueue.offer(notmsg); } else { // Receive new message LOG.debug("Receive new notification message. My id = {}", self.getId()); // State of peer that sent this message QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING; switch (rstate) { case 0: ackstate = QuorumPeer.ServerState.LOOKING; break; case 1: ackstate = QuorumPeer.ServerState.FOLLOWING; break; case 2: ackstate = QuorumPeer.ServerState.LEADING; break; case 3: ackstate = QuorumPeer.ServerState.OBSERVING; break; default: continue; } n.leader = rleader; n.zxid = rzxid; n.electionEpoch = relectionEpoch; n.state = ackstate; n.sid = response.sid; n.peerEpoch = rpeerepoch; n.version = version; n.qv = rqv; /* * Print notification info */ LOG.info( "Notification: my state:{}; n.sid:{}, n.state:{}, n.leader:{}, n.round:0x{}, " + "n.peerEpoch:0x{}, n.zxid:0x{}, message format version:0x{}, n.config version:0x{}", self.getPeerState(), n.sid, n.state, n.leader, Long.toHexString(n.electionEpoch), Long.toHexString(n.peerEpoch), Long.toHexString(n.zxid), Long.toHexString(n.version), (n.qv != null ? (Long.toHexString(n.qv.getVersion())) : "0")); /* * If this server is looking, then send proposed leader */ if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) { recvqueue.offer(n); /* * Send a notification back if the peer that sent this * message is also looking and its logical clock is * lagging behind. */ if ((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())) { Vote v = getVote(); QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend( ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock.get(), self.getPeerState(), response.sid, v.getPeerEpoch(), qv.toString().getBytes()); sendqueue.offer(notmsg); } } else { /* * If this server is not looking, but the one that sent the ack * is looking, then send back what it believes to be the leader. */ Vote current = self.getCurrentVote(); if (ackstate == QuorumPeer.ServerState.LOOKING) { if (self.leader != null) { if (leadingVoteSet != null) { self.leader.setLeadingVoteSet(leadingVoteSet); leadingVoteSet = null; } self.leader.reportLookingSid(response.sid); } LOG.debug( "Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}", self.getId(), response.sid, Long.toHexString(current.getZxid()), current.getId(), Long.toHexString(self.getQuorumVerifier().getVersion())); QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes()); sendqueue.offer(notmsg); } } } } catch (InterruptedException e) { LOG.warn("Interrupted Exception while waiting for new message", e); } } LOG.info("WorkerReceiver is down"); } } // org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerSender // 选举操作消息发送线程 public void run() { while (!stop) { try { ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if (m == null) { continue; } process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } /** * Called by run() once there is a new message to send. * * @param m message to send */ void process(ToSend m) { ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData); manager.toSend(m.sid, requestBuffer); } // 此二线程都是在创建 Message 时初始化的 // org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger#Messenger /** * Constructor of class Messenger. * * @param manager Connection manager */ Messenger(QuorumCnxManager manager) { this.ws = new WorkerSender(manager); this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); this.wsThread.setDaemon(true); this.wr = new WorkerReceiver(manager); this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); this.wrThread.setDaemon(true); } // 而 Message 是在创建 FastLeaderElection 时初始化的 // org.apache.zookeeper.server.quorum.FastLeaderElection#starter /** * This method is invoked by the constructor. Because it is a * part of the starting procedure of the object that must be on * any constructor of this class, it is probably best to keep as * a separate method. As we have a single constructor currently, * it is not strictly necessary to have it separate. * * @param self QuorumPeer that created this object * @param manager Connection manager */ private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; proposedLeader = -1; proposedZxid = -1; sendqueue = new LinkedBlockingQueue<ToSend>(); recvqueue = new LinkedBlockingQueue<Notification>(); // 初始化 Sender, Receiver this.messenger = new Messenger(manager); } // org.apache.zookeeper.server.quorum.FastLeaderElection#FastLeaderElection /** * Constructor of FastLeaderElection. It takes two parameters, one * is the QuorumPeer object that instantiated this object, and the other * is the connection manager. Such an object should be created only once * by each peer during an instance of the ZooKeeper service. * * @param self QuorumPeer that created this object * @param manager Connection manager */ public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) { this.stop = false; this.manager = manager; starter(self, manager); } // org.apache.zookeeper.server.quorum.QuorumPeer#createElectionAlgorithm @SuppressWarnings("deprecation") protected Election createElectionAlgorithm(int electionAlgorithm) { Election le = null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3: QuorumCnxManager qcm = createCnxnManager(); QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm); if (oldQcm != null) { LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)"); oldQcm.halt(); } QuorumCnxManager.Listener listener = qcm.listener; if (listener != null) { // 把选举端口的监听开启来 listener.start(); // 调用 FastLeaderElection, 发送消息与接收消息的线程开启来 FastLeaderElection fle = new FastLeaderElection(this, qcm); fle.start(); le = fle; } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le; } // org.apache.zookeeper.server.quorum.QuorumPeer#startLeaderElection public synchronized void startLeaderElection() { try { if (getPeerState() == ServerState.LOOKING) { currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } } catch (IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } // electionAlgorithm 默认是 3, 使用 FastLeaderElection this.electionAlg = createElectionAlgorithm(electionType); } // org.apache.zookeeper.server.quorum.QuorumPeer#start @Override public synchronized void start() { if (!getView().containsKey(myid)) { throw new RuntimeException("My id " + myid + " not in the peer list"); } loadDataBase(); startServerCnxnFactory(); try { adminServer.start(); } catch (AdminServerException e) { LOG.warn("Problem starting AdminServer", e); System.out.println(e); } startLeaderElection(); startJvmPauseMonitor(); super.start(); }
总结来说就是,使用 QuorumCnxManager.Listener 来开启选举端口的监听,只要发生了状态变化,就立即重新发起选举操作。默认使用 FastLeaderElection 进行选举操作。
三、操作投票内部通信端口连接的监听
// 只有当前节点被选举为主节点之后,才会建立操作投票端口监听 // org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor#run @Override public void run() { try { while (!stop) { Socket s = null; boolean error = false; try { // 基于 Socket 的阻塞等待 s = ss.accept(); // start with the initLimit, once the ack is processed // in LearnerHandler switch to the syncLimit s.setSoTimeout(self.tickTime * self.initLimit); s.setTcpNoDelay(nodelay); BufferedInputStream is = new BufferedInputStream(s.getInputStream()); LearnerHandler fh = new LearnerHandler(s, is, Leader.this); fh.start(); } catch (SocketException e) { error = true; if (stop) { LOG.warn("exception while shutting down acceptor.", e); // When Leader.shutdown() calls ss.close(), // the call to accept throws an exception. // We catch and set stop to true. stop = true; } else { throw e; } } catch (SaslException e) { LOG.error("Exception while connecting to quorum learner", e); error = true; } catch (Exception e) { error = true; throw e; } finally { // Don't leak sockets on errors if (error && s != null && !s.isClosed()) { try { s.close(); } catch (IOException e) { LOG.warn("Error closing socket", e); } } } } } catch (Exception e) { LOG.warn("Exception while accepting follower", e); handleException(this.getName(), e); } } // 在选举动作完成之后,内部通信端口才开始监听,在构造函数中创建 // org.apache.zookeeper.server.quorum.Leader#Leader Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException { this.self = self; this.proposalStats = new BufferStats(); try { if (self.shouldUsePortUnification() || self.isSslQuorum()) { boolean allowInsecureConnection = self.shouldUsePortUnification(); if (self.getQuorumListenOnAllIPs()) { ss = new UnifiedServerSocket( self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort()); } else { ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection); } } else { // 仍然是直接使用 ServerSocket 实现的端口监听,即 BIO if (self.getQuorumListenOnAllIPs()) { ss = new ServerSocket(self.getQuorumAddress().getPort()); } else { ss = new ServerSocket(); } } ss.setReuseAddress(true); // 端口使用之前配置中解析出的通信端口 // 然后等待连接 if (!self.getQuorumListenOnAllIPs()) { ss.bind(self.getQuorumAddress()); } } catch (BindException e) { if (self.getQuorumListenOnAllIPs()) { LOG.error("Couldn't bind to port {}", self.getQuorumAddress().getPort(), e); } else { LOG.error("Couldn't bind to {}", self.getQuorumAddress(), e); } throw e; } this.zk = zk; } // 而 Follower 则主动建立连接到 Leader, 双方通信时, 直接通过这个连接通道进行 // org.apache.zookeeper.server.quorum.Follower#followLeader /** * the main method called by the follower to follow the leader * * @throws InterruptedException */ void followLeader() throws InterruptedException { self.end_fle = Time.currentElapsedTime(); long electionTimeTaken = self.end_fle - self.start_fle; self.setElectionTimeTaken(electionTimeTaken); ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken); LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT); self.start_fle = 0; self.end_fle = 0; fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean); long connectionTime = 0; boolean completedSync = false; try { self.setZabState(QuorumPeer.ZabState.DISCOVERY); // 找到 Leader, 与之建立连接即可 QuorumServer leaderServer = findLeader(); try { connectToLeader(leaderServer.addr, leaderServer.hostname); connectionTime = System.currentTimeMillis(); long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); if (self.isReconfigStateChange()) { throw new Exception("learned about role change"); } //check to see if the leader zxid is lower than ours //this should never happen but is just a safety check long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid); if (newEpoch < self.getAcceptedEpoch()) { LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid) + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch())); throw new IOException("Error: Epoch of leader is lower"); } long startTime = Time.currentElapsedTime(); try { self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId()); self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION); syncWithLeader(newEpochZxid); self.setZabState(QuorumPeer.ZabState.BROADCAST); completedSync = true; } finally { long syncTime = Time.currentElapsedTime() - startTime; ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime); } if (self.getObserverMasterPort() > 0) { LOG.info("Starting ObserverMaster"); om = new ObserverMaster(self, fzk, self.getObserverMasterPort()); om.start(); } else { om = null; } // create a reusable packet to reduce gc impact QuorumPacket qp = new QuorumPacket(); // 会在此处一直循环等待 leader 节点的信息,忙等 socket while (this.isRunning()) { readPacket(qp); processPacket(qp); } } catch (Exception e) { LOG.warn("Exception when following the leader", e); closeSocket(); // clear pending revalidations pendingRevalidations.clear(); } } finally { if (om != null) { om.stop(); } zk.unregisterJMX(this); if (connectionTime != 0) { long connectionDuration = System.currentTimeMillis() - connectionTime; LOG.info( "Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}", leaderAddr, connectionDuration, completedSync); messageTracker.dumpToLog(leaderAddr.toString()); } } } // org.apache.zookeeper.server.quorum.QuorumPeer#makeLeader protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception { return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb)); } // QuorumPeer 线程是一个选举线程,它会一直查看 leader 的情况,只要leader变化,就会触发选举,然后做动态平衡 // 选举线程会在每一个 zk 节点上都打开端口监听,而 内部通信的端口监听则只有 Leader 会开启, 其他节点只会负责连接 // org.apache.zookeeper.server.quorum.QuorumPeer @Override public void run() { updateThreadName(); LOG.debug("Starting quorum peer"); try { jmxQuorumBean = new QuorumBean(this); MBeanRegistry.getInstance().register(jmxQuorumBean, null); for (QuorumServer s : getView().values()) { ZKMBeanInfo p; if (getId() == s.id) { p = jmxLocalPeerBean = new LocalPeerBean(this); try { MBeanRegistry.getInstance().register(p, jmxQuorumBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxLocalPeerBean = null; } } else { RemotePeerBean rBean = new RemotePeerBean(this, s); try { MBeanRegistry.getInstance().register(rBean, jmxQuorumBean); jmxRemotePeerBean.put(s.id, rBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); } } } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxQuorumBean = null; } try { /* * Main loop */ while (running) { switch (getPeerState()) { case LOOKING: LOG.info("LOOKING"); ServerMetrics.getMetrics().LOOKING_COUNT.add(1); if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer"); // Create read-only server but don't start it immediately final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); // Instead of starting roZk immediately, wait some grace // period before we decide we're partitioned. // // Thread is used here because otherwise it would require // changes in each of election strategy classes which is // unnecessary code coupling. Thread roZkMgr = new Thread() { public void run() { try { // lower-bound grace period to 2 secs sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { roZk.startup(); } } catch (InterruptedException e) { LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception e) { LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); } } }; try { roZkMgr.start(); reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } finally { // If the thread is in the the grace period, interrupt // to come out of waiting. roZkMgr.interrupt(); roZk.shutdown(); } } else { try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break; case OBSERVING: try { LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { observer.shutdown(); setObserver(null); updateServerState(); // Add delay jitter before we switch to LOOKING // state to reduce the load of ObserverMaster if (isRunning()) { Observer.waitForObserverElectionDelay(); } } break; case FOLLOWING: // 只要判定出当前状态后,就会自行处理异常,直到状态发生变化 try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { follower.shutdown(); setFollower(null); updateServerState(); } break; case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); } break; } } } finally { LOG.warn("QuorumPeer main thread exited"); MBeanRegistry instance = MBeanRegistry.getInstance(); instance.unregister(jmxQuorumBean); instance.unregister(jmxLocalPeerBean); for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) { instance.unregister(remotePeerBean); } jmxQuorumBean = null; jmxLocalPeerBean = null; jmxRemotePeerBean = null; } }
总结来说就是,依赖于选举线程的 QuorumCnxManager.Listener, 当被选举为 Leader 时,才开启操作投票端口的监听!~
整体zk的架构就是纯异步的架构,从一线程提交到一个线程。但是我们看到,io操作上,只有面向客户端的连接使用了nio技术,而其他内部通信都是采用bio通信,因为并非所有nio都要比bio要好,还得看具体场景。
作为网络应用,整个协议过程我们大概知道就行,tcp/ip, osi协议。但是对于应用层协议的起点,tcp/udp socket 是如何交给应用的,就是我们要从根本上理解网络应用的起点。否则,就只能看看热闹了。
看懂了zk的多个端口连接的建立过程,对于后续面对各种复杂交错的服务器间交互,是有很大帮助的。(当然了,如果你不求甚解那可能就没啥差别了)