zoukankan      html  css  js  c++  java
  • Zookeeper 源码(四)Zookeeper 服务端源码

    Zookeeper 源码(四)Zookeeper 服务端源码

    Zookeeper 服务端架构

    Zookeeper 服务端的启动入口为 QuorumPeerMain

    /**
     * Command-line entry point for a ZooKeeper server process.
     *
     * <p>{@code initializeAndRun} declares checked exceptions (ConfigException, IOException,
     * AdminServerException), so they must be handled here. This is the process boundary:
     * any startup failure is reported on stderr and the JVM exits with a non-zero status
     * so that supervisors can detect the failure.
     *
     * @param args a single configuration-file path (cluster mode), or the arguments that
     *             are forwarded to standalone mode
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } catch (Exception e) {
            // Nothing above main() can recover, so report the failure and exit abnormally.
            System.err.println("Unable to start ZooKeeper server: " + e);
            System.exit(1);
        }
    }
    
    /**
     * Parses the configuration, starts the data-directory purge task, and then runs either
     * as a member of a replicated ensemble or in standalone mode.
     *
     * <p>Cluster mode is chosen only when exactly one argument (the config file) was given
     * and the parsed configuration is distributed; otherwise the raw arguments are handed
     * to {@code ZooKeeperServerMain.main}.
     *
     * @param args a single configuration-file path, or arguments for standalone mode
     * @throws ConfigException propagated from parsing the configuration
     * @throws IOException propagated from server startup
     * @throws AdminServerException propagated from server startup
     */
    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
        // 1. Read the configuration file (only when exactly one argument is given).
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // 2. Create and start the cleaner for historical files in the data / data-log dirs.
        //    Note: this also runs when no config file was parsed above.
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            // 3. Replicated (cluster) mode.
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + "in standalone mode");
            // 4. Standalone mode: forward the original arguments unchanged.
            ZooKeeperServerMain.main(args);
        }
    }
    

    一、单机启动

    单机启动流程

    (1) 启动入口【ZooKeeperServerMain】

    /**
     * Command-line entry point for a standalone ZooKeeper server.
     *
     * <p>{@code initializeAndRun} declares checked exceptions (ConfigException, IOException,
     * AdminServerException), so they must be handled here. This is the process boundary:
     * any startup failure is reported on stderr and the JVM exits with a non-zero status.
     *
     * @param args a single configuration-file path, or the individual settings accepted by
     *             {@code ServerConfig.parse(String[])}
     */
    public static void main(String[] args) {
        ZooKeeperServerMain main = new ZooKeeperServerMain();
        try {
            main.initializeAndRun(args);
        } catch (Exception e) {
            // Nothing above main() can recover, so report the failure and exit abnormally.
            System.err.println("Unable to start ZooKeeper server in standalone mode: " + e);
            System.exit(1);
        }
    }
    
    /**
     * Standalone-mode initialization: registers the log4j JMX beans (best effort), builds a
     * {@code ServerConfig} from the command line, and runs the server from it.
     *
     * @param args one argument (parsed with {@code parse(String)}) or several arguments
     *             (parsed with {@code parse(String[])})
     */
    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
        // A JMX registration failure is not fatal: warn and keep going.
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException jmxFailure) {
            LOG.warn("Unable to register log4j JMX control", jmxFailure);
        }

        ServerConfig serverConfig = new ServerConfig();
        boolean singleArgument = args.length == 1;
        if (!singleArgument) {
            serverConfig.parse(args);
        } else {
            serverConfig.parse(args[0]);
        }

        runFromConfig(serverConfig);
    }
    

    (2) 核心启动方法【ZooKeeperServerMain】

    /**
     * Runs a standalone server from {@code config} and blocks until it stops.
     *
     * <p>Creates the transaction-log/snapshot helper and the {@code ZooKeeperServer}. When a
     * client port address is configured, it also creates the connection factory that starts
     * the server. It then waits on the connection factory and shuts the server down if it is
     * still running. The transaction-log helper is always closed on exit.
     *
     * @param config parsed standalone configuration
     * @throws IOException propagated from log/snapshot or network setup
     * @throws AdminServerException propagated from server startup
     */
    public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
        LOG.info("Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            // 1. Helper for the transaction log files and snapshot files.
            txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);

            // 2. Create the server instance; the null ZKDatabase is created later by startdata().
            ZooKeeperServer zkServer = new ZooKeeperServer( txnLog,
                    config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);

            // ... (omitted in this excerpt)
            boolean needStartZKServer = true;
            if (config.getClientPortAddress() != null) {
                // 3. Create the network layer: NIOServerCnxnFactory unless the
                //    zookeeper.serverCnxnFactory system property names another class.
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);

                // 4. Start serving (core step): binds the port and starts zkServer.
                cnxnFactory.startup(zkServer);
                // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
                needStartZKServer = false;
            }

            // ... (omitted in this excerpt)
            if (cnxnFactory != null) {
                cnxnFactory.join();
            }
            if (zkServer.isRunning()) {
                zkServer.shutdown();
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Server interrupted", e);
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        } finally {
            if (txnLog != null) {
                txnLog.close();
            }
        }
    }
    

    (3) ZooKeeperServer【ZooKeeperServer】

    /**
     * Creates a server that persists through the given transaction-log/snapshot helper.
     *
     * @param txnLogFactory     helper for transaction log and snapshot files
     * @param tickTime          the server's tick time
     * @param minSessionTimeout value handed to {@code setMinSessionTimeout}
     * @param maxSessionTimeout value handed to {@code setMaxSessionTimeout}
     * @param zkDb              in-memory database, or {@code null} to let {@code startdata()}
     *                          create one later
     */
    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
            int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) {
        this.serverStats = new ServerStats(this);
        this.zkDb = zkDb;
        this.txnLogFactory = txnLogFactory;
        // Keep tickTime assigned before the timeout setters run, as in the original order.
        this.tickTime = tickTime;
        this.setMinSessionTimeout(minSessionTimeout);
        this.setMaxSessionTimeout(maxSessionTimeout);
    }
    

    ServerStats 记录了服务端的以下信息:

    属性 说明
    packetsSent Zookeeper 启动后响应的次数
    packetsReceived Zookeeper 启动后接收请求的次数
    maxLatency、minLatency、totalLatency Zookeeper 启动后最大、最小、总延迟时间
    count Zookeeper 启动后处理客户端请求的总次数

    (4) createFactory【ServerCnxnFactory】

    /** System property naming the ServerCnxnFactory implementation class to instantiate. */
    public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

    /**
     * Creates the network-layer {@code ServerCnxnFactory}.
     *
     * <p>The implementation class comes from the {@value #ZOOKEEPER_SERVER_CNXN_FACTORY}
     * system property and defaults to {@code NIOServerCnxnFactory}. It is instantiated
     * through its no-argument constructor.
     *
     * @return a new factory instance (callers configure it afterwards)
     * @throws IOException if the class cannot be loaded or instantiated; the original
     *         failure is attached as the cause
     */
    public static ServerCnxnFactory createFactory() throws IOException {
        String serverCnxnFactoryName =
            System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
        if (serverCnxnFactoryName == null) {
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            // Class.newInstance() is deprecated since Java 9: it rethrows checked exceptions
            // from the constructor unwrapped. Constructor.newInstance() wraps them in
            // InvocationTargetException instead.
            return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (Exception e) {
            throw new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
        }
    }
    

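    (5) startup【NettyServerCnxnFactory】

    此处以 Netty 实现为例;默认的 NIOServerCnxnFactory 的 startup 流程与之相同:先绑定端口,再恢复本地数据并启动服务。上文单机启动中调用的单参数 startup(zkServer) 即 startServer 为 true 的情形。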

    /**
     * Binds the server channel and attaches {@code zks}. When {@code startServer} is true,
     * it also restores the local data and starts the server.
     */
    @Override
    public void startup(ZooKeeperServer zks, boolean startServer)
            throws IOException, InterruptedException {
        // 1. Bind the Netty server channel.
        start();
        setZooKeeperServer(zks);
        if (!startServer) {
            return;
        }
        // 2. Restore local data.
        zks.startdata();
        // 3. Start the session tracker, the request-processor chain, etc.
        zks.startup();
    }
    
    // Start Netty: log the target address, then bind the server bootstrap to localAddress.
    @Override
    public void start() {
        String bindMessage = "binding to port " + localAddress;
        LOG.info(bindMessage);
        parentChannel = bootstrap.bind(localAddress);
    }
    

    (6) startdata【ZooKeeperServer】

    // Restore local data. The in-memory database is created on first use from the
    // txn-log/snapshot helper; data is loaded only if the database is not yet initialized.
    public void startdata() throws IOException, InterruptedException {
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);
        }
        boolean alreadyLoaded = zkDb.isInitialized();
        if (alreadyLoaded) {
            return;
        }
        loadData();
    }
    

    (7) startup【ZooKeeperServer】

    // Start the session tracker and build the request-processor chain, register JMX beans,
    // then mark the server RUNNING and wake every thread waiting on this object's monitor.
    public synchronized void startup() {
        // Reuse a tracker that was already set; otherwise create a new one.
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        setupRequestProcessors();
    
        registerJMX();
    
        // RUNNING is published only after the tracker and processors are in place.
        // notifyAll() wakes threads blocked in wait() on this server (presumably code
        // waiting for startup to finish — confirm against callers).
        state = State.RUNNING;
        notifyAll();
    }
    
    // Create (but do not start) the session tracker, seeded with the sessions and timeouts
    // held by the in-memory database. startSessionTracker() starts it.
    protected void createSessionTracker() {
        this.sessionTracker = new SessionTrackerImpl(
                this,
                zkDb.getSessionWithTimeOuts(),
                tickTime,
                1,
                getZooKeeperServerListener());
    }
    // Start the tracker created by createSessionTracker(); it must be a SessionTrackerImpl.
    protected void startSessionTracker() {
        SessionTrackerImpl tracker = (SessionTrackerImpl) sessionTracker;
        tracker.start();
    }
    
    // Build the request-processing chain that handles client requests (core path):
    //   PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
    // The Sync stage is started first; the Prep stage becomes firstProcessor and is then started.
    protected void setupRequestProcessors() {
        RequestProcessor tail = new FinalRequestProcessor(this);
        SyncRequestProcessor sync = new SyncRequestProcessor(this, tail);
        sync.start();
        PrepRequestProcessor prep = new PrepRequestProcessor(this, sync);
        firstProcessor = prep;
        prep.start();
    }
    

    二、集群启动

    集群启动流程

    集群启动相比单机启动多了一个 Leader 选举的过程。Quorum 指法定多数(多数派),Peer 指对等节点,QuorumPeer 合起来表示参与多数派投票的集群节点。

    (1) 核心启动方法【QuorumPeerMain】

    /**
     * Runs this process as a member of a replicated ensemble and blocks until the
     * {@code QuorumPeer} thread exits.
     *
     * <p>Steps:
     * <ol>
     *   <li>Register the log4j JMX beans (best effort).</li>
     *   <li>Create whichever plain and/or secure client connection factories are configured.</li>
     *   <li>Build and configure the {@code QuorumPeer}: storage, in-memory database, quorum
     *       verifiers, network layer, and so on.</li>
     *   <li>Start the peer and join it.</li>
     * </ol>
     *
     * @param config parsed quorum configuration
     * @throws IOException propagated from connection-factory or data-file setup
     */
    public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        LOG.info("Starting quorum peer");
        try {
            ServerCnxnFactory cnxnFactory = null;
            ServerCnxnFactory secureCnxnFactory = null;

            if (config.getClientPortAddress() != null) {
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(),
                        config.getMaxClientCnxns(), false);
            }

            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                        config.getMaxClientCnxns(), true);
            }

            // 1. Create the QuorumPeer and copy the configuration into it.
            quorumPeer = new QuorumPeer();
            quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    config.getDataLogDir(),
                    config.getDataDir()));
            quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            quorumPeer.enableLocalSessionsUpgrading(
                    config.isLocalSessionsUpgradingEnabled());
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setConfigFileName(config.getConfigFilename());
            // 2. In-memory database backed by the peer's txn-log/snapshot helper.
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            quorumPeer.initConfigInZKDatabase();
            // 3. Network layer (ServerCnxnFactory) for client connections.
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

            // 4. Start the peer and wait for its thread to finish.
            quorumPeer.start();
            quorumPeer.join();
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Quorum Peer interrupted", e);
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
    }
    

    (2) start【QuorumPeer】

    /**
     * Starts this quorum peer.
     *
     * <p>In order, it:
     * <ol>
     *   <li>verifies that {@code myid} is in the configured view;</li>
     *   <li>loads the local database;</li>
     *   <li>opens the client port(s);</li>
     *   <li>starts the admin server (a failure is only logged);</li>
     *   <li>sets up leader election;</li>
     *   <li>starts this peer's own thread, whose {@code run()} dispatches on the peer state
     *       (LOOKING / OBSERVING / FOLLOWING / LEADING).</li>
     * </ol>
     *
     * @throws RuntimeException if {@code myid} is not in the peer list
     */
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        // 1. Restore local data.
        loadDataBase();
        // 2. Bind the client port(s).
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            // Non-fatal. The warning already carries the stack trace, so it is not
            // echoed again to stdout.
            LOG.warn("Problem starting AdminServer", e);
        }
        // 3. Create and start the configured leader-election algorithm.
        startLeaderElection();
        // 4. Start this thread (QuorumPeer extends Thread); see run().
        super.start();
    }
    
    // Bind the client port(s): start the plain connection factory and, when configured,
    // the secure one.
    private void startServerCnxnFactory() {
        if (cnxnFactory != null) {
            cnxnFactory.start();
        }
        if (secureCnxnFactory == null) {
            return;
        }
        secureCnxnFactory.start();
    }
    

    (3) startLeaderElection【QuorumPeer】

    electionType 由配置文件中的 electionAlg 参数设置,在 QuorumPeerConfig 中默认值为 3,即默认采用 FastLeaderElection 算法进行 Leader 选举。

    /**
     * Prepares leader election (electionType 3, FastLeaderElection, is the default).
     *
     * <p>When this peer is LOOKING, it seeds {@code currentVote} with a vote for itself
     * (its id, last logged zxid and current epoch). For algorithm type 0 it also opens the
     * UDP socket and starts the responder thread. Finally, it creates the algorithm selected
     * by {@code electionType}.
     *
     * @throws RuntimeException wrapping an IOException raised while building the initial
     *         vote, or the SocketException if the UDP socket cannot be opened
     */
    public synchronized void startLeaderElection() {
        try {
            if (getPeerState() == ServerState.LOOKING) {
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            }
        } catch (IOException e) {
            // Keep the original message but chain the cause (rather than copying its stack
            // trace) so the full causal chain survives in logs.
            throw new RuntimeException(e.getMessage(), e);
        }

        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType);
    }
    
    /**
     * Creates the leader-election implementation for the given algorithm id.
     *
     * <ul>
     *   <li>0: {@code LeaderElection}</li>
     *   <li>1: {@code AuthFastLeaderElection(this)}</li>
     *   <li>2: {@code AuthFastLeaderElection(this, true)}</li>
     *   <li>3: {@code FastLeaderElection} over a freshly created {@code QuorumCnxManager},
     *       whose listener is started first</li>
     * </ul>
     *
     * @param electionAlgorithm algorithm id (normally the configured electionAlg)
     * @return the election implementation, or {@code null} if the id is unknown or the
     *         QuorumCnxManager listener is missing (both cases are logged as errors)
     */
    protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le = null;

        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            qcm = new QuorumCnxManager(this);
            QuorumCnxManager.Listener listener = qcm.listener;
            if (listener != null) {
                listener.start();
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            // The assert only fires under -ea. Log as well so that production runs do not
            // silently return null, matching the null-listener branch above.
            LOG.error("Unknown election algorithm: " + electionAlgorithm);
            assert false;
        }
        return le;
    }
    

    (4) 启动 QuorumPeer 线程【QuorumPeer】

    // Thread body of the quorum peer: while `running`, dispatch on the current peer state.
    // Each branch blocks for the lifetime of that role (election, observing, following,
    // leading). After a role ends, updateServerState() runs (OBSERVING/FOLLOWING/LEADING)
    // and the loop dispatches again.
    @Override
    public void run() {
        
        // ... (omitted in this excerpt)
        try {
            while (running) {
                switch (getPeerState()) {
                // 1. Leader election
                case LOOKING:
                    // ... (omitted in this excerpt)
                    try {
                       reconfigFlagClear();
                        // Restart election first if it was shut down earlier.
                        if (shuttingDownLE) {
                           shuttingDownLE = false;
                           startLeaderElection();
                        }
                        // Block until an election completes; its result becomes the current vote.
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        // Stay LOOKING so the next loop iteration retries the election.
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                // 2. Observer
                case OBSERVING:
                    try {
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );
                    } finally {
                        // NOTE(review): if makeObserver(...) throws, `observer` is presumably still
                        // null here (it is reset to null at the end of every round), so
                        // observer.shutdown() would throw NPE — confirm. LEADING guards with a null check.
                        observer.shutdown();
                        setObserver(null);  
                       updateServerState();
                    }
                    break;
                // 3. Follower
                case FOLLOWING:
                    try {
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                       LOG.warn("Unexpected exception",e);
                    } finally {
                       // NOTE(review): same possible NPE as OBSERVING if makeFollower(...) throws — confirm.
                       follower.shutdown();
                       setFollower(null);
                       updateServerState();
                    }
                    break;
                // 4. Leader
                case LEADING:
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        // Normal return: clear the leader so the finally block skips the forced shutdown.
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        // Reached with a non-null leader only if lead() (or setup) did not return normally.
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        updateServerState();
                    }
                    break;
                }
                // Timestamp at the end of each round (presumably the start time of the next
                // election, used for timing it — confirm).
                start_fle = Time.currentElapsedTime();
            }
        } finally {
            // ... (omitted in this excerpt)
        }
    }
    

    下面两节会重点关注 Leader 选举和请求处理。

    参考:

    1. 从 Paxos 到 Zookeeper : 分布式一致性原理与实践

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    读后感之—寒门学子重要选择-程序员
    架构中的分而治之
    如何从码农进化到项目管理者
    饿了么架构
    简单理解支付宝和蚂蚁花呗的架构
    架构小谈之美团外卖
    漫谈架构总结之1500
    平台基本信息项目目标文档
    第六学期每周总结-第三周
    质量管理之可用性战术分析
  • 原文地址:https://www.cnblogs.com/binarylei/p/9948790.html
Copyright © 2011-2022 走看看