zoukankan      html  css  js  c++  java
  • zookeeper 3.4.6启动流程粗略梳理

    zookeeper 3.4.6

    启动脚本里面

     nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}"     -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &    
    

    将上面脚本中的变量展开后,实际执行的命令如下(比较冗长):

    java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /usr/local/lu/zookeeper-3.4.6/bin/../build/classes:/usr/local/lu/zookeeper-3.4.6/bin/../build/lib/*.jar:/usr/local/lu/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/lu/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/lu/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar:/usr/local/lu/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar:/usr/local/lu/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar:/usr/local/lu/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar:/usr/local/lu/zookeeper-3.4.6/bin/../src/java/lib/*.jar:/usr/local/lu/zookeeper-3.4.6/bin/../conf:.:/data/java/jdk1.7.0_15/jre/lib/rt.jar:/data/java/jdk1.7.0_15/lib/dt.jar:/data/java/jdk1.7.0_15/lib/tools.jar -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain
    /usr/local/lu/zookeeper-3.4.6/bin/../conf/zoo.cfg
    

    这里看到启动入口是QuorumPeerMain,先看其main()函数关键代码

    QuorumPeerMain main = new QuorumPeerMain();
     main.initializeAndRun(args);
    

    initializeAndRun()会根据配置决定以单机(standalone)模式还是集群模式启动zookeeper:只有传入一个配置文件参数且配置中定义了servers时才走集群模式,否则退回单机模式

        /**
         * Parses the configuration and starts ZooKeeper in quorum or standalone mode.
         *
         * If exactly one argument is given it is parsed as the config file. The
         * data-directory purge task is started in both modes. When a config file
         * was given and it defines at least one server, the peer is run via
         * runFromConfig(config); otherwise startup falls back to standalone mode
         * by delegating to ZooKeeperServerMain.main(args).
         *
         * @param args command-line arguments; args[0] is the config file path
         * @throws ConfigException if the config file cannot be parsed
         * @throws IOException on I/O failure during startup
         */
        protected void initializeAndRun(String[] args)
            throws ConfigException, IOException
        {
            QuorumPeerConfig config = new QuorumPeerConfig();
            if (args.length == 1) {
                config.parse(args[0]);
            }
    
            // Start and schedule the purge task (snapshot retain count and
            // purge interval both come from the config).
            DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                    .getDataDir(), config.getDataLogDir(), config
                    .getSnapRetainCount(), config.getPurgeInterval());
            purgeMgr.start();
    
            // Quorum mode only when a config file was given AND it lists servers.
            if (args.length == 1 && config.servers.size() > 0) {
                runFromConfig(config);
            } else {
                LOG.warn("Either no config or no quorum defined in config, running "
                        + " in standalone mode");
                // there is only one server in the quorum -- run as standalone
                ZooKeeperServerMain.main(args);
            }
        }
    

    如果是集群模式,则进入runFromConfig(),首先注册Log4j相关的MBean(用于JMX管理)

    ManagedUtil.registerLog4jMBeans();
    

    然后初始化连接工厂:默认使用NIO实现,即NIOServerCnxnFactory;也可以通过系统属性(常量ZOOKEEPER_SERVER_CNXN_FACTORY对应的属性)指定其他实现类

    ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
              cnxnFactory.configure(config.getClientPortAddress(),
                                    config.getMaxClientCnxns());
    
       /**
        * Creates the server connection factory by reflection.
        *
        * The implementation class name is read from the system property named by
        * ZOOKEEPER_SERVER_CNXN_FACTORY. When that property is unset,
        * NIOServerCnxnFactory is used. The instance is created via its no-arg
        * constructor; callers configure it afterwards with configure(...).
        *
        * @return a new factory instance
        * @throws IOException if the class cannot be loaded or instantiated; the
        *         original exception is attached as the cause
        */
       static public ServerCnxnFactory createFactory() throws IOException {
            String serverCnxnFactoryName =
                System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
            if (serverCnxnFactoryName == null) {
                // default implementation: NIO
                serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
            }
            try {
                return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                                                    .newInstance();
            } catch (Exception e) {
                // wrap any reflection failure so callers only deal with IOException
                IOException ioe = new IOException("Couldn't instantiate "
                        + serverCnxnFactoryName);
                ioe.initCause(e);
                throw ioe;
            }
        }
    

    之后创建QuorumPeer并根据配置进行相应设置,最后启动QuorumPeer(start()),并通过join()等待其线程结束

        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
              cnxnFactory.configure(config.getClientPortAddress(),
                                    config.getMaxClientCnxns());
      
              quorumPeer = new QuorumPeer();
              quorumPeer.setClientPortAddress(config.getClientPortAddress());
              quorumPeer.setTxnFactory(new FileTxnSnapLog(
                          new File(config.getDataLogDir()),
                          new File(config.getDataDir())));
              quorumPeer.setQuorumPeers(config.getServers());
              quorumPeer.setElectionType(config.getElectionAlg());
              quorumPeer.setMyid(config.getServerId());
              quorumPeer.setTickTime(config.getTickTime());
              quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
              quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
              quorumPeer.setInitLimit(config.getInitLimit());
              quorumPeer.setSyncLimit(config.getSyncLimit());
              quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
              quorumPeer.setCnxnFactory(cnxnFactory);
              quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
              quorumPeer.setLearnerType(config.getPeerType());
              quorumPeer.setSyncEnabled(config.getSyncEnabled());
              quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
      
              quorumPeer.start();
              quorumPeer.join();
    

    NIOServerCnxnFactory.configure()中打开并配置ServerSocketChannel:绑定客户端端口、设置为非阻塞模式,并在selector上注册OP_ACCEPT事件

      /**
       * Configures this NIO connection factory.
       *
       * Sets up SASL login and creates (but does not start) the daemon thread
       * that will run this factory's selector loop. It also records the
       * per-client-IP connection limit and opens a ServerSocketChannel bound to
       * addr. The channel is made non-blocking and registered with the selector
       * for OP_ACCEPT.
       *
       * @param addr  address of the client port to bind
       * @param maxcc maximum number of connections allowed from one client IP
       *              (the accept loop only enforces it when it is > 0)
       * @throws IOException if the socket cannot be opened or bound
       */
      public void configure(InetSocketAddress addr, int maxcc) throws IOException {
            configureSaslLogin();
    
            // the thread's Runnable is this factory itself; start() launches it later
            thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
            thread.setDaemon(true);
            maxClientCnxns = maxcc;
            this.ss = ServerSocketChannel.open();
            ss.socket().setReuseAddress(true);
            LOG.info("binding to port " + addr);
            ss.socket().bind(addr);
            // non-blocking is required before a channel can be registered with a selector
            ss.configureBlocking(false);
            ss.register(selector, SelectionKey.OP_ACCEPT);
        }
    

    QuorumPeer.start()

        /**
         * Starts this quorum peer.
         *
         * The order is:
         *  1. load the local database;
         *  2. start the client connection factory thread;
         *  3. start leader election;
         *  4. start this peer's own thread via super.start().
         * Step 4 runs QuorumPeer.run(), which acts on the current ServerState.
         */
        public synchronized void start() {
            loadDataBase();
            cnxnFactory.start();        
            startLeaderElection();
            super.start();
        }
    

    1,首先加载数据 loadDataBase();

    2,启动客户端连接工厂的NIO线程 cnxnFactory.start();

    3,启动leader选举startLeaderElection();

    首先看从本地加载数据loadDataBase();

        /**
         * Loads the on-disk state into memory.
         *
         * Restores dataTree and sessionsWithTimeouts via snapLog.restore(...).
         * Every transaction replayed from the log is wrapped in a Request
         * (carrying its header, txn and zxid) and passed to addCommittedProposal().
         * Marks the database as initialized afterwards.
         *
         * @return the zxid returned by snapLog.restore(...)
         * @throws IOException if the snapshot or transaction log cannot be read
         */
        public long loadDataBase() throws IOException {
            // Called back once per replayed transaction.
            PlayBackListener listener=new PlayBackListener(){
                public void onTxnLoaded(TxnHeader hdr,Record txn){
                    Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
                            null, null);
                    r.txn = txn;
                    r.hdr = hdr;
                    r.zxid = hdr.getZxid();
                    addCommittedProposal(r);
                }
            };
            
            long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
            initialized = true;
            return zxid;
        }
    
    

    通过snapLog(FileTxnSnapLog)从本地快照和事务日志中恢复数据

     long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
    

    restore()先反序列化快照,再从lastProcessedZxid+1开始重放事务日志;每处理完一条事务都会回调PlayBackListener的onTxnLoaded(),由它调用addCommittedProposal()提交Proposal

      /**
       * Restores a data tree and session map from the snapshot plus the txn log.
       *
       * First deserializes the snapshot into dt and sessions. It then iterates
       * the transaction log starting at dt.lastProcessedZxid + 1. Each entry is
       * applied with processTransaction(...) and then reported to
       * listener.onTxnLoaded(...). A zxid lower than the highest one seen so far
       * is only logged as an error; highestZxid is not lowered in that case.
       *
       * @param dt       data tree to populate
       * @param sessions session id to timeout map to populate
       * @param listener notified after each replayed transaction
       * @return the highest zxid seen, or dt.lastProcessedZxid when the log has
       *         no entries past the snapshot
       * @throws IOException on read failure, or if replaying a transaction
       *         raises KeeperException.NoNodeException
       */
      public long restore(DataTree dt, Map<Long, Integer> sessions, 
                PlayBackListener listener) throws IOException {
            snapLog.deserialize(dt, sessions);
            FileTxnLog txnLog = new FileTxnLog(dataDir);
            // only transactions newer than the snapshot need to be replayed
            TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
            long highestZxid = dt.lastProcessedZxid;
            TxnHeader hdr;
            try {
                while (true) {
                    // iterator points to 
                    // the first valid txn when initialized
                    hdr = itr.getHeader();
                    if (hdr == null) {
                        //empty logs 
                        return dt.lastProcessedZxid;
                    }
                    // out-of-order zxid: report it, but keep the higher value
                    if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                        LOG.error("{}(higestZxid) > {}(next log) for type {}",
                                new Object[] { highestZxid, hdr.getZxid(),
                                        hdr.getType() });
                    } else {
                        highestZxid = hdr.getZxid();
                    }
                    try {
                        processTransaction(hdr,dt,sessions, itr.getTxn());
                    } catch(KeeperException.NoNodeException e) {
                       throw new IOException("Failed to process transaction type: " +
                             hdr.getType() + " error: " + e.getMessage(), e);
                    }
                    listener.onTxnLoaded(hdr, itr.getTxn());
                    if (!itr.next()) 
                        break;
                }
            } finally {
                // NOTE(review): itr is already dereferenced above, so this null
                // check cannot prevent an NPE; it only guards close().
                if (itr != null) {
                    itr.close();
                }
            }
            return highestZxid;
        }
    

    启动NIOServerCnxnFactory的start()

        /**
         * Starts the selector thread created in configure(...).
         *
         * The thread is started only while it is still in state NEW. Calling
         * this method again after the thread has started does nothing.
         */
        public void start() {
            // ensure thread is started once and only once
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }
    

    这里的thread是以NIOServerCnxnFactory自身(Runnable)为目标创建的线程,它在configure()中创建并被设置为守护线程:

    thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
    thread.setDaemon(true);
    

    然后来到run()循环

        /**
         * Selector loop of the NIO connection factory.
         *
         * Loops until the server socket is closed, then closes all connections.
         * Each iteration waits up to 1000 ms for ready keys and handles them in
         * shuffled order:
         *  - OP_ACCEPT: accepts the socket. If maxClientCnxns > 0 and that client
         *    IP already has maxClientCnxns connections, the socket is closed.
         *    Otherwise the socket is made non-blocking and registered for
         *    OP_READ. It is wrapped in an NIOServerCnxn, which is attached to its
         *    key and recorded via addCnxn().
         *  - OP_READ / OP_WRITE: delegates to the attached connection's doIO(k).
         *  - anything else: logged at debug level.
         * Exceptions thrown during an iteration are logged and the loop continues.
         */
        public void run() {
            while (!ss.socket().isClosed()) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selected;
                    synchronized (this) {
                        selected = selector.selectedKeys();
                    }
                    // copy so the keys can be shuffled; presumably so that no
                    // connection is consistently served first
                    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                            selected);
                    Collections.shuffle(selectedList);
                    for (SelectionKey k : selectedList) {
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            // NOTE(review): accept() on a non-blocking channel may
                            // return null; sc would then NPE below and be caught
                            // by the RuntimeException handler.
                            SocketChannel sc = ((ServerSocketChannel) k
                                    .channel()).accept();
                            InetAddress ia = sc.socket().getInetAddress();
                            int cnxncount = getClientCnxnCount(ia);
                            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                                LOG.warn("Too many connections from " + ia
                                         + " - max is " + maxClientCnxns );
                                sc.close();
                            } else {
                                LOG.info("Accepted socket connection from "
                                         + sc.socket().getRemoteSocketAddress());
                                sc.configureBlocking(false);
                                SelectionKey sk = sc.register(selector,
                                        SelectionKey.OP_READ);
                                NIOServerCnxn cnxn = createConnection(sc, sk);
                                sk.attach(cnxn);
                                addCnxn(cnxn);
                            }
                        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                            c.doIO(k);
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Unexpected ops in select "
                                          + k.readyOps());
                            }
                        }
                    }
                    // the selector never removes keys from the selected set itself.
                    // NOTE(review): this clear() is skipped if an exception
                    // escapes the loop above.
                    selected.clear();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring exception", e);
                }
            }
            closeAll();
            LOG.info("NIOServerCnxn factory exited run method");
        }
    

    通过selector.select(1000)等待IO事件,最多阻塞1秒;然后取出已就绪的SelectionKey集合,复制到列表中并打乱顺序

    selector.select(1000);
                    Set<SelectionKey> selected;
                    synchronized (this) {
                        selected = selector.selectedKeys();
                    }
                    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                            selected);
                    Collections.shuffle(selectedList);
    

    如果是OP_ACCEPT事件,则accept新连接:若该客户端IP的连接数已达到maxClientCnxns(且该限制大于0),直接关闭连接;否则将连接设为非阻塞,在selector上注册SelectionKey.OP_READ事件,通过createConnection()创建NIOServerCnxn并attach到SelectionKey上,再通过addCnxn()保存

            LOG.info("Accepted socket connection from "
                                         + sc.socket().getRemoteSocketAddress());
                                sc.configureBlocking(false);
                                SelectionKey sk = sc.register(selector,
                                        SelectionKey.OP_READ);
                                NIOServerCnxn cnxn = createConnection(sc, sk);
                                sk.attach(cnxn);
                                addCnxn(cnxn);
    

    处理OP_READ、OP_WRITE读写事件:交给该连接对象的doIO()处理

    if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                            c.doIO(k);
                        }
    
    /**
         * Handles read/write IO on connection.
         *
         * Read side: reads from the socket into incomingBuffer. A negative read
         * count raises EndOfStreamException. When incomingBuffer is full:
         *  - if it is lenBuffer, readLength(k) is called on it, and it decides
         *    whether a payload follows (false for four-letter words);
         *  - otherwise it is a payload continuation.
         * Whenever a payload is expected, readPayload() is called.
         *
         * Write side: copies as much of outgoingBuffers as fits into
         * factory.directBuffer and issues a single sock.write(...). Afterwards
         * fully sent buffers are dropped and a partially sent one has its
         * position advanced. Meeting the ServerCnxnFactory.closeConn marker at
         * the head of the queue requests a close. Finally, under the factory
         * lock, OP_WRITE interest is cleared when nothing is left to send and set
         * otherwise. A connection that is not initialized and not interested in
         * reads is closed once its output is drained ("info probe").
         *
         * CancelledKeyException, CloseRequestException, EndOfStreamException and
         * IOException are all handled here by closing the connection.
         *
         * @param k the ready selection key for this connection
         * @throws InterruptedException declared by this method; presumably
         *         propagated from readLength()/readPayload() — TODO confirm
         */
        void doIO(SelectionKey k) throws InterruptedException {
            try {
                if (isSocketOpen() == false) {
                    LOG.warn("trying to do i/o on a null socket for session:0x"
                             + Long.toHexString(sessionId));
    
                    return;
                }
                if (k.isReadable()) {
                    int rc = sock.read(incomingBuffer);
                    if (rc < 0) {
                        throw new EndOfStreamException(
                                "Unable to read additional data from client sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely client has closed socket");
                    }
                    if (incomingBuffer.remaining() == 0) {
                        boolean isPayload;
                        if (incomingBuffer == lenBuffer) { // start of next request
                            incomingBuffer.flip();
                            isPayload = readLength(k);
                            incomingBuffer.clear();
                        } else {
                            // continuation
                            isPayload = true;
                        }
                        if (isPayload) { // not the case for 4letterword
                            readPayload();
                        }
                        else {
                            // four letter words take care
                            // need not do anything else
                            return;
                        }
                    }
                }
                if (k.isWritable()) {
                    // ZooLog.logTraceMessage(LOG,
                    // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
                    // "outgoingBuffers.size() = " +
                    // outgoingBuffers.size());
                    if (outgoingBuffers.size() > 0) {
                        // ZooLog.logTraceMessage(LOG,
                        // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
                        // "sk " + k + " is valid: " +
                        // k.isValid());
    
                        /*
                         * This is going to reset the buffer position to 0 and the
                         * limit to the size of the buffer, so that we can fill it
                         * with data from the non-direct buffers that we need to
                         * send.
                         */
                        ByteBuffer directBuffer = factory.directBuffer;
                        directBuffer.clear();
    
                        for (ByteBuffer b : outgoingBuffers) {
                            if (directBuffer.remaining() < b.remaining()) {
                                /*
                                 * ByteBuffer.put(ByteBuffer) copies nothing (it
                                 * throws BufferOverflowException) when the
                                 * directBuffer is too small to hold everything, so
                                 * we slice the source down to the space left.
                                 */
                                b = (ByteBuffer) b.slice().limit(
                                        directBuffer.remaining());
                            }
                            /*
                             * put() is going to modify the positions of both
                             * buffers, but we don't want to change the position of
                             * the source buffers (we'll do that after the send, if
                             * needed), so we save and reset the position after the
                             * copy
                             */
                            int p = b.position();
                            directBuffer.put(b);
                            b.position(p);
                            if (directBuffer.remaining() == 0) {
                                break;
                            }
                        }
                        /*
                         * Do the flip: limit becomes position, position gets set to
                         * 0. This sets us up for the write.
                         */
                        directBuffer.flip();
    
                        // may write fewer bytes than were staged
                        int sent = sock.write(directBuffer);
                        ByteBuffer bb;
    
                        // Remove the buffers that we have sent
                        while (outgoingBuffers.size() > 0) {
                            bb = outgoingBuffers.peek();
                            // closeConn is a marker queued to request closing the connection
                            if (bb == ServerCnxnFactory.closeConn) {
                                throw new CloseRequestException("close requested");
                            }
                            int left = bb.remaining() - sent;
                            if (left > 0) {
                                /*
                                 * We only partially sent this buffer, so we update
                                 * the position and exit the loop.
                                 */
                                bb.position(bb.position() + sent);
                                break;
                            }
                            packetSent();
                            /* We've sent the whole buffer, so drop the buffer */
                            sent -= bb.remaining();
                            outgoingBuffers.remove();
                        }
                        // ZooLog.logTraceMessage(LOG,
                        // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
                        // outgoingBuffers.size() = " + outgoingBuffers.size());
                    }
    
                    // NOTE(review): uses the field sk rather than the parameter k;
                    // presumably the same key for this connection — confirm.
                    synchronized(this.factory){
                        if (outgoingBuffers.size() == 0) {
                            // not initialized and not reading: a one-shot probe
                            // whose response has been fully written
                            if (!initialized
                                    && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
                                throw new CloseRequestException("responded to info probe");
                            }
                            sk.interestOps(sk.interestOps()
                                    & (~SelectionKey.OP_WRITE));
                        } else {
                            sk.interestOps(sk.interestOps()
                                    | SelectionKey.OP_WRITE);
                        }
                    }
                }
            } catch (CancelledKeyException e) {
                LOG.warn("Exception causing close of session 0x"
                        + Long.toHexString(sessionId)
                        + " due to " + e);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("CancelledKeyException stack trace", e);
                }
                close();
            } catch (CloseRequestException e) {
                // expecting close to log session closure
                close();
            } catch (EndOfStreamException e) {
                LOG.warn("caught end of stream exception",e); // tell user why
    
                // expecting close to log session closure
                close();
            } catch (IOException e) {
                LOG.warn("Exception causing close of session 0x"
                        + Long.toHexString(sessionId)
                        + " due to " + e);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("IOException stack trace", e);
                }
                close();
            }
        }
    

    再来看看leader选举。服务器有以下4种状态:

    LOOKING,寻找leader状态:server进入主线程后都处于LOOKING状态,此时调用lookForLeader()进行选举

    LEADING,领导状态,本身自然就是leader了!

    FOLLOWING,跟随状态

    OBSERVING,观察者状态,不参与选举

    /**
     * Role/state of a quorum peer.
     *
     * LOOKING: searching for a leader (election in progress).
     * FOLLOWING: following an elected leader.
     * LEADING: this peer is the leader.
     * OBSERVING: observer; does not take part in elections.
     */
    public enum ServerState {
        LOOKING, FOLLOWING, LEADING, OBSERVING;
    }
    

    startLeaderElection()-->createElectionAlgorithm(electionType)-->new FastLeaderElection(this, qcm);

    在QuorumPeer的run()中,根据state的不同分别创建LeaderZooKeeperServer、FollowerZooKeeperServer、ObserverZooKeeperServer;如果是LOOKING,则调用lookForLeader()进行选举

    LOOKING

    setCurrentVote(makeLEStrategy().lookForLeader());
    

    OBSERVING

    case OBSERVING:
        try {
            LOG.info("OBSERVING");
            setObserver(makeObserver(logFactory));
            observer.observeLeader();
        }
    

    FOLLOWING

    case FOLLOWING:
        try {
            LOG.info("FOLLOWING");
            setFollower(makeFollower(logFactory));
            follower.followLeader();
        } 
    

    LEADING

    case LEADING:
        LOG.info("LEADING");
        try {
            setLeader(makeLeader(logFactory));
            leader.lead();
            setLeader(null);
        } 
    

    makeXxx()系列方法:

        /**
         * Creates the Follower for this peer.
         *
         * The Follower is backed by a FollowerZooKeeperServer that shares this
         * peer's zkDb and uses the given transaction/snapshot log.
         *
         * @param logFactory transaction/snapshot log used by the server
         * @return a new Follower bound to this peer
         * @throws IOException if the server cannot be created
         */
        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
            return new Follower(this, new FollowerZooKeeperServer(logFactory, 
                    this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
        }
         
        /**
         * Creates the Leader for this peer.
         *
         * The Leader is backed by a LeaderZooKeeperServer that shares this
         * peer's zkDb and uses the given transaction/snapshot log.
         *
         * @param logFactory transaction/snapshot log used by the server
         * @return a new Leader bound to this peer
         * @throws IOException if the server cannot be created
         */
        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
            return new Leader(this, new LeaderZooKeeperServer(logFactory,
                    this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
        }
        
        /**
         * Creates the Observer for this peer.
         *
         * The Observer is backed by an ObserverZooKeeperServer that shares this
         * peer's zkDb and uses the given transaction/snapshot log.
         *
         * @param logFactory transaction/snapshot log used by the server
         * @return a new Observer bound to this peer
         * @throws IOException if the server cannot be created
         */
        protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
            return new Observer(this, new ObserverZooKeeperServer(logFactory,
                    this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
        }
    
  • 相关阅读:
    jchdl
    jchdl
    UVa 10256 (判断两个凸包相离) The Great Divide
    UVa 11168 (凸包+点到直线距离) Airport
    LA 2572 (求可见圆盘的数量) Kanazawa
    UVa 10652 (简单凸包) Board Wrapping
    UVa 12304 (6个二维几何问题合集) 2D Geometry 110 in 1!
    UVa 10674 (求两圆公切线) Tangents
    UVa 11796 Dog Distance
    LA 3263 (平面图的欧拉定理) That Nice Euler Circuit
  • 原文地址:https://www.cnblogs.com/donganwangshi/p/4115966.html
Copyright © 2011-2022 走看看