以下说说Leader选举的getView的解析流程
public Map<Long,QuorumPeer.QuorumServer> getView() { return Collections.unmodifiableMap(this.quorumPeers); }
getView 里面实际上返回的是一个 quorumPeers,就是参与本次投票的成员有哪些。这个属性在哪里赋值的呢?我们又得回到 runFromConfig 方法中
QuorumPeerMain.runFromConfig
设置了一个值为 config.getServers()
//… quorumPeer.setQuorumPeers(config.getServers()); //…
config 这个配置信息又是通过在 initializeAndRun 方法中初始化的,
protected void initializeAndRun(String[] args) throws ConfigException, IOException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } }
QuorumPeerConfig.parse
这里会根据一个外部的文件去进行解析,然后其中有一段是这样,解析对应的集群配置数据放到 servers 这个集合中
} else if (key.startsWith("server.")) { int dot = key.indexOf('.'); long sid = Long.parseLong(key.substring(dot + 1)); String parts[] = splitWithLeadingHostname(value); if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) { LOG.error(value + " does not have the form host:port or host:port:port " + " or host:port:port:type"); }
ZkServer 服务启动的逻辑
在讲 leader 选举的时候,有一个 cnxnFactory.start()方法来启动 zk 服务,这块具体做了什么呢?我们来分析看看
QuorumPeerMain.runFromConfig
在 runFromConfig 中,有构建了一个 ServerCnxnFactory
public void runFromConfig(QuorumPeerConfig config) throws IOException { //… LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); //… quorumPeer.setCnxnFactory(cnxnFactory); //…并且将这个 factory 设置给了 quorumPeer 的成员属性 } }
这个很明显是一个工厂模式,基于这个工厂类创建什么呢? 打开createFactory 方法看看就知道了
ServerCnxnFactory.createFactory
这个方法里面是根据 ZOOKEEPER_SERVER_CNXN_FACTORY 来决定创建 NIO server 还是 Netty Server而默认情况下,应该是创建一个 NIOServerCnxnFactory
static public ServerCnxnFactory createFactory() throws IOException { String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName) .getDeclaredConstructor().newInstance(); LOG.info("Using {} as server connection factory", serverCnxnFactoryName); return serverCnxnFactory; } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName); ioe.initCause(e); throw ioe; } }
QuorumPeer.start
因此,我们再回到 QuorumPeer.start()方法中,cnxnFactory.start(),应该会调用 NIOServerCnxnFactory 这个类去启动一个线程
public synchronized void start() { loadDataBase(); cnxnFactory.start(); startLeaderElection(); super.start(); }
NIOServerCnxnFactory.start
这里通过 thread.start 启动一个线程,那 thread 是一个什么对象呢?
public void start() { // ensure thread is started once and only once if (thread.getState() == Thread.State.NEW) { thread.start(); } }
NIOServerCnxnFactory.configure
thread 其实构建的是一个 zookeeperThread 线程,并且线程的参数为 this,表示当前 NIOServerCnxnFactory 也是实现了线程的类,那么它必须要重写run 方法,因此定位到 NIOServerCnxnFactory.run。
到此,NIOServer 的初始化以及启动过程就完成了。并且对 2181 的这个端口进行监听。一旦发现有请求进来,就执行相应的处理即可。这块后续在分析数据同步的时候再做详细了解
Thread thread; @Override public void configure(InetSocketAddress addr, int maxcc) throws IOException { configureSaslLogin(); thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr); thread.setDaemon(true); maxClientCnxns = maxcc; this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_ACCEPT); }