zoukankan      html  css  js  c++  java
  • Zookeeper 源码(四)Zookeeper 服务端源码

    Zookeeper 源码(四)Zookeeper 服务端源码

    Zookeeper 服务端架构

    Zookeeper 服务端的启动入口为 QuorumPeerMain

    /**
     * Server entry point. Delegates to {@link #initializeAndRun(String[])} and,
     * because that method declares checked exceptions, handles any startup
     * failure here by logging it and exiting with a non-zero status.
     *
     * @param args command-line arguments; a single argument is treated as the
     *             configuration file path
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } catch (Exception e) {
            // Exit explicitly: initializeAndRun may already have started
            // background work (purgeMgr.start()) that could keep the JVM alive
            // after a failed startup.
            LOG.error("Unexpected exception during startup, exiting abnormally", e);
            System.exit(1);
        }
    }

    /**
     * Reads the configuration, starts the data-directory cleanup task, then
     * runs either in cluster mode (a config file was given and it defines a
     * distributed quorum) or falls back to standalone mode.
     *
     * @param args command-line arguments; exactly one argument is parsed as the
     *             configuration file path
     * @throws ConfigException propagated from configuration handling
     * @throws IOException propagated from startup I/O
     * @throws AdminServerException propagated from server startup
     */
    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
        // 1. Read the configuration file (only when exactly one argument is given).
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // 2. Create and start the cleaner for historical files in dataDir /
        //    dataLogDir, driven by snapRetainCount and purgeInterval.
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            // 3. Cluster (quorum) startup.
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // 4. Standalone startup.
            ZooKeeperServerMain.main(args);
        }
    }
    

    一、单机启动

    单机启动流程

    (1) 启动入口【ZooKeeperServerMain】

    /**
     * Standalone-server entry point. Delegates to
     * {@link #initializeAndRun(String[])} and, because that method declares
     * checked exceptions, handles any startup failure here by logging it and
     * exiting with a non-zero status.
     *
     * @param args either a single configuration-file path, or the raw argument
     *             list handed to {@code ServerConfig.parse(String[])}
     */
    public static void main(String[] args) {
        ZooKeeperServerMain main = new ZooKeeperServerMain();
        try {
            main.initializeAndRun(args);
        } catch (Exception e) {
            LOG.error("Unexpected exception during startup, exiting abnormally", e);
            System.exit(1);
        }
    }

    /**
     * Registers log4j JMX beans (best effort), parses the configuration and
     * runs a standalone server from it.
     *
     * @param args a single configuration-file path, or otherwise the raw
     *             argument array for {@code ServerConfig.parse(String[])}
     * @throws ConfigException propagated from configuration handling
     * @throws IOException propagated from startup I/O
     * @throws AdminServerException propagated from server startup
     */
    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            // JMX registration is optional; startup continues without it.
            LOG.warn("Unable to register log4j JMX control", e);
        }

        ServerConfig config = new ServerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        } else {
            config.parse(args);
        }

        runFromConfig(config);
    }
    

    (2) 核心启动方法【ZooKeeperServerMain】

    /**
     * Core standalone startup: creates the transaction-log/snapshot handler and
     * the ZooKeeperServer, starts the client connection factory (which also
     * starts the server), then waits for the factory to finish and shuts the
     * server down. The txn/snapshot handler is always closed on exit.
     *
     * NOTE(review): this is an excerpt; {@code cnxnFactory} is not declared
     * here (presumably a field of ZooKeeperServerMain) and parts of the body are
     * omitted.
     */
    public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
        LOG.info("Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            // 1. Handler for transaction-log files and snapshot data files.
            txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
    
            // 2. Create the server instance. zkDb is passed as null; startdata()
            //    creates the ZKDatabase lazily when it is still null.
            ZooKeeperServer zkServer = new ZooKeeperServer( txnLog,
                    config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
    
            // ... (omitted) ...
            // NOTE(review): needStartZKServer is presumably consulted by the
            // omitted secure-port code below - confirm.
            boolean needStartZKServer = true;
            if (config.getClientPortAddress() != null) {
                // 3. Create the underlying communication layer; defaults to NIOServerCnxnFactory.
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
    
                // 4. Start the service (core step): binds the port and starts zkServer.
                cnxnFactory.startup(zkServer);
                // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
                needStartZKServer = false;
            }
            
            // ... (omitted) ...
            // Block until the connection factory finishes (presumably on shutdown).
            if (cnxnFactory != null) {
                cnxnFactory.join();
            }
            if (zkServer.isRunning()) {
                zkServer.shutdown();
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Server interrupted", e);
        } finally {
            // Always release the txn-log/snapshot handler.
            if (txnLog != null) {
                txnLog.close();
            }
        }
    }
    

    (3) ZooKeeperServer【ZooKeeperServer】

    /**
     * Builds a server around the given txn-log/snapshot handler and optional
     * in-memory database, with the given tick and session-timeout bounds.
     *
     * @param txnLogFactory     handler for transaction logs and snapshots
     * @param tickTime          basic time unit
     * @param minSessionTimeout lower bound for session timeouts
     * @param maxSessionTimeout upper bound for session timeouts
     * @param zkDb              in-memory database; may be null (see startdata())
     */
    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
            int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) {
        // The statistics holder is created before any other field is populated.
        this.serverStats = new ServerStats(this);

        // Plain field assignments. tickTime is set before the timeout setters
        // run, in case they derive defaults from it.
        this.tickTime = tickTime;
        this.zkDb = zkDb;
        this.txnLogFactory = txnLogFactory;

        setMinSessionTimeout(minSessionTimeout);
        setMaxSessionTimeout(maxSessionTimeout);
    }
    

    ServerStats 记录了服务端的以下信息:

    | 属性 | 说明 |
    | --- | --- |
    | packetsSent | Zookeeper 启动后发送响应的次数 |
    | packetsReceived | Zookeeper 启动后接收请求的次数 |
    | maxLatency、minLatency、totalLatency | Zookeeper 启动后的最大、最小、总延迟时间 |
    | count | Zookeeper 启动后处理客户端请求的总次数 |

    (4) createFactory【ServerCnxnFactory】

    /**
     * System property naming the ServerCnxnFactory implementation class used
     * for client connections.
     */
    public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

    /**
     * Creates the underlying client-communication ServerCnxnFactory.
     * The implementation class is read from the system property
     * {@value #ZOOKEEPER_SERVER_CNXN_FACTORY}. If that property is unset,
     * NIOServerCnxnFactory is used. Callers configure the returned instance
     * afterwards.
     *
     * @return a new factory instance
     * @throws IOException if the class cannot be loaded or instantiated, or is
     *         not a ServerCnxnFactory; the original failure is attached as cause
     */
    public static ServerCnxnFactory createFactory() throws IOException {
        String serverCnxnFactoryName = System.getProperty(
                ZOOKEEPER_SERVER_CNXN_FACTORY, NIOServerCnxnFactory.class.getName());
        try {
            // getDeclaredConstructor().newInstance() replaces Class.newInstance(),
            // which is deprecated since Java 9 because it can rethrow checked
            // constructor exceptions without declaring them.
            return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (Exception e) {
            throw new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
        }
    }
    

    (5) startup【NettyServerCnxnFactory】(注:默认实现为 NIOServerCnxnFactory,此处以 NettyServerCnxnFactory 为例,可通过系统属性 zookeeper.serverCnxnFactory 切换)

    /**
     * Binds the Netty server socket and attaches the given server. When
     * {@code startServer} is true, it also restores local data and starts the
     * server's session tracker and request-processor chain.
     */
    @Override
    public void startup(ZooKeeperServer zks, boolean startServer)
            throws IOException, InterruptedException {
        start();                // bind the listening port first
        setZooKeeperServer(zks);
        if (!startServer) {
            return;
        }
        zks.startdata();        // restore the in-memory database from local data
        zks.startup();          // session tracker + request-processor chain
    }

    /** Binds the Netty bootstrap to {@code localAddress}. */
    @Override
    public void start() {
        String bindMessage = "binding to port " + localAddress;
        LOG.info(bindMessage);
        parentChannel = bootstrap.bind(localAddress);
    }
    

    (6) startdata【ZooKeeperServer】

    /**
     * Restores local data. Creates the ZKDatabase from the txn-log/snapshot
     * handler if none was supplied, then loads data into it unless it is
     * already initialized.
     */
    public void startdata() throws IOException, InterruptedException {
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);
        }
        boolean alreadyLoaded = zkDb.isInitialized();
        if (!alreadyLoaded) {
            loadData();
        }
    }
    

    (7) startup【ZooKeeperServer】

    /**
     * Starts the session tracker and registers the request-processor chain,
     * registers JMX, then marks the server RUNNING and wakes waiters.
     *
     * Synchronized because notifyAll() requires holding this object's monitor.
     * NOTE(review): the threads being woken presumably wait for the state to
     * become RUNNING - confirm against callers.
     */
    public synchronized void startup() {
        // Create the session tracker only if one has not already been set.
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        // Build the processor chain that handles client requests.
        setupRequestProcessors();
    
        registerJMX();
    
        state = State.RUNNING;
        // Wake any threads blocked in wait() on this object.
        notifyAll();
    }
    
    /**
     * Creates (but does not start) the session tracker from the sessions and
     * timeouts held in zkDb, using this server's tickTime.
     * NOTE(review): the literal 1 is presumably the server id in standalone
     * mode - confirm against SessionTrackerImpl.
     */
    protected void createSessionTracker() {
        SessionTrackerImpl tracker = new SessionTrackerImpl(this,
                zkDb.getSessionWithTimeOuts(), tickTime, 1,
                getZooKeeperServerListener());
        sessionTracker = tracker;
    }
    /** Starts the current session tracker, which must be a SessionTrackerImpl. */
    protected void startSessionTracker() {
        SessionTrackerImpl tracker = (SessionTrackerImpl) sessionTracker;
        tracker.start();
    }
    
    /**
     * Builds and starts the request-processor chain that handles client
     * requests. The chain is:
     * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
     * It is built from the tail towards the head. firstProcessor is the
     * entry point.
     */
    protected void setupRequestProcessors() {
        RequestProcessor tail = new FinalRequestProcessor(this);

        SyncRequestProcessor sync = new SyncRequestProcessor(this, tail);
        sync.start();

        PrepRequestProcessor head = new PrepRequestProcessor(this, sync);
        firstProcessor = head;
        head.start();
    }
    

    二、集群启动

    集群启动流程

    集群相比单机多了一个 Leader 选举的过程。Quorum 指多数派(法定人数),Peer 指对等节点,QuorumPeer 合起来表示参与多数派投票的集群节点。

    (1) 核心启动方法【QuorumPeerMain】

    /**
     * Core cluster startup: creates the client connection factories, builds a
     * QuorumPeer from the parsed configuration, starts it and blocks until the
     * peer thread terminates.
     *
     * NOTE(review): {@code quorumPeer} is not declared here (presumably a field
     * of QuorumPeerMain).
     *
     * @param config parsed cluster configuration
     * @throws IOException propagated from factory creation or startup I/O
     * @throws AdminServerException declared for the startup path
     */
    public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            // JMX registration is optional; startup continues without it.
            LOG.warn("Unable to register log4j JMX control", e);
        }

        LOG.info("Starting quorum peer");
        try {
            ServerCnxnFactory cnxnFactory = null;
            ServerCnxnFactory secureCnxnFactory = null;

            // Plain client port, if configured.
            if (config.getClientPortAddress() != null) {
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(),
                        config.getMaxClientCnxns(), false);
            }

            // Secure client port, if configured (last configure argument = true).
            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                        config.getMaxClientCnxns(), true);
            }

            // 1. Create the QuorumPeer and copy the configuration into it.
            quorumPeer = new QuorumPeer();
            quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    config.getDataLogDir(),
                    config.getDataDir()));
            quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            quorumPeer.enableLocalSessionsUpgrading(
                    config.isLocalSessionsUpgradingEnabled());
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setConfigFileName(config.getConfigFilename());
            // 2. Set up the in-memory database backed by the txn-log/snapshot handler.
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            quorumPeer.initConfigInZKDatabase();
            // 3. Attach the underlying client communication factories.
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

            // 4. Start the peer (it is a Thread) and wait for it to terminate.
            quorumPeer.start();
            quorumPeer.join();
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Quorum Peer interrupted", e);
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }
    

    (2) start【QuorumPeer】

    /**
     * Starts this quorum peer. It restores local data, binds the client ports,
     * starts the admin server (best effort), prepares leader election, and
     * finally launches the peer's own thread (QuorumPeer extends Thread).
     *
     * @throws RuntimeException if this server's id is not in the current view
     */
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        // 1. Restore local data.
        loadDataBase();
        // 2. Bind the client port(s).
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            // Best effort: a failing admin server does not stop the peer. The
            // problem is reported once, through the logger only.
            LOG.warn("Problem starting AdminServer", e);
        }
        // 3. Prepare leader election.
        startLeaderElection();
        // 4. Start this peer's thread, which executes run().
        super.start();
    }
    
    // Binds the configured client port(s) by starting each connection factory
    // that is present: the plain factory first, then the secure one.
    private void startServerCnxnFactory() {
        if (cnxnFactory != null) {
            cnxnFactory.start();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.start();
        }
    }
    

    (3) startLeaderElection【QuorumPeer】

    electionType 由配置文件中的 electionAlg 设置,在 QuorumPeerConfig 中默认为 3,也就是说默认会采用 FastLeaderElection 算法进行 Leader 选举。

    /**
     * Prepares leader election. When this peer is LOOKING, it sets its initial
     * vote to itself, using its last logged zxid and current epoch. It then
     * creates the election algorithm selected by {@code electionType}
     * (FastLeaderElection for type 3).
     *
     * @throws RuntimeException wrapping an IOException raised while reading the
     *         last logged zxid or current epoch, or a SocketException raised
     *         while opening the UDP socket for election type 0
     */
    public synchronized void startLeaderElection() {
        try {
            if (getPeerState() == ServerState.LOOKING) {
                // Initial vote: this peer votes for itself.
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            }
        } catch (IOException e) {
            // Keep the original message and chain the IOException as the cause,
            // rather than copying its stack trace onto a cause-less wrapper.
            throw new RuntimeException(e.getMessage(), e);
        }

        if (electionType == 0) {
            // Election type 0 (LeaderElection) additionally opens a UDP socket
            // on the quorum port and starts a responder thread.
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType);
    }
    
    /**
     * Creates the leader-election implementation for the given algorithm id:
     * 0 = LeaderElection, 1 = AuthFastLeaderElection,
     * 2 = AuthFastLeaderElection(this, true), 3 = FastLeaderElection over a
     * QuorumCnxManager.
     *
     * @param electionAlgorithm algorithm id (the configured electionAlg)
     * @return the election instance, or null when algorithm 3's listener is
     *         null or, with assertions disabled, when the id is unknown
     */
    protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
    
        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            // Create the connection manager used for election traffic and
            // start its listener before starting FastLeaderElection.
            qcm = new QuorumCnxManager(this);
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start();
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start();
                le = fle;
            } else {
                // le stays null; only an error is logged.
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            // Only fails with -ea; otherwise an unknown id falls through and
            // null is returned.
            assert false;
        }
        return le;
    }
    

    (4) 启动 QuorumPeer 线程【QuorumPeer】

    /**
     * Main loop of the quorum peer thread. It is started by super.start() in
     * start() and dispatches on the current peer state while {@code running}
     * is true:
     * LOOKING   - run leader election and set the current vote to its result;
     * OBSERVING - create an Observer and observe the leader;
     * FOLLOWING - create a Follower and follow the leader;
     * LEADING   - create a Leader and lead.
     * After a role ends, the role object is cleared and updateServerState() is
     * called before the loop re-evaluates the state. The Observer and Follower
     * are always shut down. The Leader is shut down only if lead() threw.
     */
    @Override
    public void run() {
        
        // ... (omitted) ...
        try {
            while (running) {
                switch (getPeerState()) {
                // 1. Leader election
                case LOOKING:
                    // ... (omitted) ...
                    try {
                       reconfigFlagClear();
                        // Restart leader election if it was shut down earlier.
                        if (shuttingDownLE) {
                           shuttingDownLE = false;
                           startLeaderElection();
                        }
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        // Stay in LOOKING so the next iteration retries the election.
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                // 2. Observer
                case OBSERVING:
                    try {
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );
                    } finally {
                        // NOTE(review): if makeObserver(...) throws, observer may
                        // still be null here and shutdown() would throw - confirm.
                        observer.shutdown();
                        setObserver(null);  
                       updateServerState();
                    }
                    break;
                // 3. Follower
                case FOLLOWING:
                    try {
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                       LOG.warn("Unexpected exception",e);
                    } finally {
                       // NOTE(review): same null concern as the OBSERVING case.
                       follower.shutdown();
                       setFollower(null);
                       updateServerState();
                    }
                    break;
                // 4. Leader
                case LEADING:
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        // Normal return: clear the leader so the finally block
                        // does not force a shutdown.
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        updateServerState();
                    }
                    break;
                }
                // NOTE(review): presumably records when the next leader-election
                // round starts - confirm.
                start_fle = Time.currentElapsedTime();
            }
        } finally {
            // ... (omitted) ...
        }
    }
    

    下面两节会重点关注 Leader 选举和请求处理。

    参考:

    1. 从 Paxos 到 Zookeeper : 分布式一致性原理与实践

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    【校招面试 之 C/C++】第23题 C++ STL(五)之Set
    Cannot create an instance of OLE DB provider “OraOLEDB.Oracle” for linked server "xxxxxxx".
    Redhat Linux安装JDK 1.7
    ORA-10635: Invalid segment or tablespace type
    Symantec Backup Exec 2012 Agent for Linux 卸载
    Symantec Backup Exec 2012 Agent For Linux安装
    You must use the Role Management Tool to install or configure Microsoft .NET Framework 3.5 SP1
    YourSQLDba介绍
    PL/SQL重新编译包无反应
    MS SQL 监控数据/日志文件增长
  • 原文地址:https://www.cnblogs.com/binarylei/p/9948790.html
Copyright © 2011-2022 走看看