zoukankan      html  css  js  c++  java
  • zookeeper 3.4.6启动流程粗略梳理

    zookeeper 3.4.6

    启动脚本里面

     nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}"     -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &    
    

    翻译过来之后太烦了,shit

    java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /usr/local/lu/zookeeper-3.4.6/bin/../build/classes:/usr/local/lu/zookeeper-3.4.6/bin/../build/lib/*.jar:/usr/local/lu/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/lu/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/lu/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar:/usr/local/lu/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar:/usr/local/lu/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar:/usr/local/lu/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar:/usr/local/lu/zookeeper-3.4.6/bin/../src/java/lib/*.jar:/usr/local/lu/zookeeper-3.4.6/bin/../conf:.:/data/java/jdk1.7.0_15/jre/lib/rt.jar:/data/java/jdk1.7.0_15/lib/dt.jar:/data/java/jdk1.7.0_15/lib/tools.jar -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain
    /usr/local/lu/zookeeper-3.4.6/bin/../conf/zoo.cfg
    

    这里看到启动入口是QuorumPeerMain,先看其main()函数关键代码

    QuorumPeerMain main = new QuorumPeerMain();
     main.initializeAndRun(args);
    

    initializeAndRun()这种会启动单例还是集群模式的zookeeper

        protected void initializeAndRun(String[] args)
            throws ConfigException, IOException
        {
            QuorumPeerConfig config = new QuorumPeerConfig();
            if (args.length == 1) {
                config.parse(args[0]);
            }
    
            // Start and schedule the the purge task
            DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                    .getDataDir(), config.getDataLogDir(), config
                    .getSnapRetainCount(), config.getPurgeInterval());
            purgeMgr.start();
    
            if (args.length == 1 && config.servers.size() > 0) {
                runFromConfig(config);
            } else {
                LOG.warn("Either no config or no quorum defined in config, running "
                        + " in standalone mode");
                // there is only server in the quorum -- run as standalone
                ZooKeeperServerMain.main(args);
            }
        }
    

    这里如果是集群模式走runFromConfig,首先注册Mxbean

    ManagedUtil.registerLog4jMBeans();
    

    然后是初始化连接工厂,默认使用NIO,这里是NIOServerCnxnFactory

    ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
              cnxnFactory.configure(config.getClientPortAddress(),
                                    config.getMaxClientCnxns());
    
       static public ServerCnxnFactory createFactory() throws IOException {
            String serverCnxnFactoryName =
                System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
            if (serverCnxnFactoryName == null) {
                serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
            }
            try {
                return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                                                    .newInstance();
            } catch (Exception e) {
                IOException ioe = new IOException("Couldn't instantiate "
                        + serverCnxnFactoryName);
                ioe.initCause(e);
                throw ioe;
            }
        }
    

    z之后初始化QuorumPeer,进行相应的配置,最后启动QuorumPeer

        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
              cnxnFactory.configure(config.getClientPortAddress(),
                                    config.getMaxClientCnxns());
      
              quorumPeer = new QuorumPeer();
              quorumPeer.setClientPortAddress(config.getClientPortAddress());
              quorumPeer.setTxnFactory(new FileTxnSnapLog(
                          new File(config.getDataLogDir()),
                          new File(config.getDataDir())));
              quorumPeer.setQuorumPeers(config.getServers());
              quorumPeer.setElectionType(config.getElectionAlg());
              quorumPeer.setMyid(config.getServerId());
              quorumPeer.setTickTime(config.getTickTime());
              quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
              quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
              quorumPeer.setInitLimit(config.getInitLimit());
              quorumPeer.setSyncLimit(config.getSyncLimit());
              quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
              quorumPeer.setCnxnFactory(cnxnFactory);
              quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
              quorumPeer.setLearnerType(config.getPeerType());
              quorumPeer.setSyncEnabled(config.getSyncEnabled());
              quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
      
              quorumPeer.start();
              quorumPeer.join();
    

    ServerCnxnFactory.configure()这里配置ServerSocketChannel

      public void configure(InetSocketAddress addr, int maxcc) throws IOException {
            configureSaslLogin();
    
            thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
            thread.setDaemon(true);
            maxClientCnxns = maxcc;
            this.ss = ServerSocketChannel.open();
            ss.socket().setReuseAddress(true);
            LOG.info("binding to port " + addr);
            ss.socket().bind(addr);
            ss.configureBlocking(false);
            ss.register(selector, SelectionKey.OP_ACCEPT);
        }
    

    QuorumPeer.start()

        public synchronized void start() {
            loadDataBase();
            cnxnFactory.start();        
            startLeaderElection();
            super.start();
        }
    

    1,首先加载数据 loadDataBase();

    2,启动NIO线程;

    3,启动leader选举startLeaderElection();

    首先看从本地加载数据loadDataBase();

        public long loadDataBase() throws IOException {
            PlayBackListener listener=new PlayBackListener(){
                public void onTxnLoaded(TxnHeader hdr,Record txn){
                    Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
                            null, null);
                    r.txn = txn;
                    r.hdr = hdr;
                    r.zxid = hdr.getZxid();
                    addCommittedProposal(r);
                }
            };
            
            long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
            initialized = true;
            return zxid;
        }
    
    

    从zk的事务日志snapLog中恢复

     long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
    

    之后会回调PlayBackListener的onTxnLoaded来onTxnLoaded提交Proposal(addCommittedProposal())

      public long restore(DataTree dt, Map<Long, Integer> sessions, 
                PlayBackListener listener) throws IOException {
            snapLog.deserialize(dt, sessions);
            FileTxnLog txnLog = new FileTxnLog(dataDir);
            TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
            long highestZxid = dt.lastProcessedZxid;
            TxnHeader hdr;
            try {
                while (true) {
                    // iterator points to 
                    // the first valid txn when initialized
                    hdr = itr.getHeader();
                    if (hdr == null) {
                        //empty logs 
                        return dt.lastProcessedZxid;
                    }
                    if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                        LOG.error("{}(higestZxid) > {}(next log) for type {}",
                                new Object[] { highestZxid, hdr.getZxid(),
                                        hdr.getType() });
                    } else {
                        highestZxid = hdr.getZxid();
                    }
                    try {
                        processTransaction(hdr,dt,sessions, itr.getTxn());
                    } catch(KeeperException.NoNodeException e) {
                       throw new IOException("Failed to process transaction type: " +
                             hdr.getType() + " error: " + e.getMessage(), e);
                    }
                    listener.onTxnLoaded(hdr, itr.getTxn());
                    if (!itr.next()) 
                        break;
                }
            } finally {
                if (itr != null) {
                    itr.close();
                }
            }
            return highestZxid;
        }
    

    启动NIOServerCnxnFactory的start()

        public void start() {
            // ensure thread is started once and only once
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }
    

    这里的thread其实就是NIOServerCnxnFactory

    thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
    thread.setDaemon(true);
    

    然后来到run()循环

        public void run() {
            while (!ss.socket().isClosed()) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selected;
                    synchronized (this) {
                        selected = selector.selectedKeys();
                    }
                    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                            selected);
                    Collections.shuffle(selectedList);
                    for (SelectionKey k : selectedList) {
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            SocketChannel sc = ((ServerSocketChannel) k
                                    .channel()).accept();
                            InetAddress ia = sc.socket().getInetAddress();
                            int cnxncount = getClientCnxnCount(ia);
                            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                                LOG.warn("Too many connections from " + ia
                                         + " - max is " + maxClientCnxns );
                                sc.close();
                            } else {
                                LOG.info("Accepted socket connection from "
                                         + sc.socket().getRemoteSocketAddress());
                                sc.configureBlocking(false);
                                SelectionKey sk = sc.register(selector,
                                        SelectionKey.OP_READ);
                                NIOServerCnxn cnxn = createConnection(sc, sk);
                                sk.attach(cnxn);
                                addCnxn(cnxn);
                            }
                        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                            c.doIO(k);
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Unexpected ops in select "
                                          + k.readyOps());
                            }
                        }
                    }
                    selected.clear();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring exception", e);
                }
            }
            closeAll();
            LOG.info("NIOServerCnxn factory exited run method");
        }
    

    监听IO,阻塞等待数据到来(1s)

    selector.select(1000);
                    Set<SelectionKey> selected;
                    synchronized (this) {
                        selected = selector.selectedKeys();
                    }
                    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                            selected);
                    Collections.shuffle(selectedList);
    

    如果是连接,则accept,生成则在selector中注册SelectionKey.OP_READ事件 ,在没有达到maxClientCnxns时会创建一个连接createConnection,保存在set中

            LOG.info("Accepted socket connection from "
                                         + sc.socket().getRemoteSocketAddress());
                                sc.configureBlocking(false);
                                SelectionKey sk = sc.register(selector,
                                        SelectionKey.OP_READ);
                                NIOServerCnxn cnxn = createConnection(sc, sk);
                                sk.attach(cnxn);
                                addCnxn(cnxn);
    

    处理OP_READ,OP_WRITE 读写时间

    if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                            c.doIO(k);
                        }
    
    /**
         * Handles read/write IO on connection.
         */
        void doIO(SelectionKey k) throws InterruptedException {
            try {
                if (isSocketOpen() == false) {
                    LOG.warn("trying to do i/o on a null socket for session:0x"
                             + Long.toHexString(sessionId));
    
                    return;
                }
                if (k.isReadable()) {
                    int rc = sock.read(incomingBuffer);
                    if (rc < 0) {
                        throw new EndOfStreamException(
                                "Unable to read additional data from client sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely client has closed socket");
                    }
                    if (incomingBuffer.remaining() == 0) {
                        boolean isPayload;
                        if (incomingBuffer == lenBuffer) { // start of next request
                            incomingBuffer.flip();
                            isPayload = readLength(k);
                            incomingBuffer.clear();
                        } else {
                            // continuation
                            isPayload = true;
                        }
                        if (isPayload) { // not the case for 4letterword
                            readPayload();
                        }
                        else {
                            // four letter words take care
                            // need not do anything else
                            return;
                        }
                    }
                }
                if (k.isWritable()) {
                    // ZooLog.logTraceMessage(LOG,
                    // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
                    // "outgoingBuffers.size() = " +
                    // outgoingBuffers.size());
                    if (outgoingBuffers.size() > 0) {
                        // ZooLog.logTraceMessage(LOG,
                        // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
                        // "sk " + k + " is valid: " +
                        // k.isValid());
    
                        /*
                         * This is going to reset the buffer position to 0 and the
                         * limit to the size of the buffer, so that we can fill it
                         * with data from the non-direct buffers that we need to
                         * send.
                         */
                        ByteBuffer directBuffer = factory.directBuffer;
                        directBuffer.clear();
    
                        for (ByteBuffer b : outgoingBuffers) {
                            if (directBuffer.remaining() < b.remaining()) {
                                /*
                                 * When we call put later, if the directBuffer is to
                                 * small to hold everything, nothing will be copied,
                                 * so we've got to slice the buffer if it's too big.
                                 */
                                b = (ByteBuffer) b.slice().limit(
                                        directBuffer.remaining());
                            }
                            /*
                             * put() is going to modify the positions of both
                             * buffers, put we don't want to change the position of
                             * the source buffers (we'll do that after the send, if
                             * needed), so we save and reset the position after the
                             * copy
                             */
                            int p = b.position();
                            directBuffer.put(b);
                            b.position(p);
                            if (directBuffer.remaining() == 0) {
                                break;
                            }
                        }
                        /*
                         * Do the flip: limit becomes position, position gets set to
                         * 0. This sets us up for the write.
                         */
                        directBuffer.flip();
    
                        int sent = sock.write(directBuffer);
                        ByteBuffer bb;
    
                        // Remove the buffers that we have sent
                        while (outgoingBuffers.size() > 0) {
                            bb = outgoingBuffers.peek();
                            if (bb == ServerCnxnFactory.closeConn) {
                                throw new CloseRequestException("close requested");
                            }
                            int left = bb.remaining() - sent;
                            if (left > 0) {
                                /*
                                 * We only partially sent this buffer, so we update
                                 * the position and exit the loop.
                                 */
                                bb.position(bb.position() + sent);
                                break;
                            }
                            packetSent();
                            /* We've sent the whole buffer, so drop the buffer */
                            sent -= bb.remaining();
                            outgoingBuffers.remove();
                        }
                        // ZooLog.logTraceMessage(LOG,
                        // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
                        // outgoingBuffers.size() = " + outgoingBuffers.size());
                    }
    
                    synchronized(this.factory){
                        if (outgoingBuffers.size() == 0) {
                            if (!initialized
                                    && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
                                throw new CloseRequestException("responded to info probe");
                            }
                            sk.interestOps(sk.interestOps()
                                    & (~SelectionKey.OP_WRITE));
                        } else {
                            sk.interestOps(sk.interestOps()
                                    | SelectionKey.OP_WRITE);
                        }
                    }
                }
            } catch (CancelledKeyException e) {
                LOG.warn("Exception causing close of session 0x"
                        + Long.toHexString(sessionId)
                        + " due to " + e);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("CancelledKeyException stack trace", e);
                }
                close();
            } catch (CloseRequestException e) {
                // expecting close to log session closure
                close();
            } catch (EndOfStreamException e) {
                LOG.warn("caught end of stream exception",e); // tell user why
    
                // expecting close to log session closure
                close();
            } catch (IOException e) {
                LOG.warn("Exception causing close of session 0x"
                        + Long.toHexString(sessionId)
                        + " due to " + e);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("IOException stack trace", e);
                }
                close();
            }
        }
    

    再来看看leader选举 ,这里首先对应又4种状态

    LOOKING,server进入主线程后,都处于LOOKING状态,lookForLeader

    LEADING,领导状态,本身自然就是leader了!

    FOLLOWING,跟随状态

    OBSERVING,观察者状态,不参与选举

    public enum ServerState {
        LOOKING, FOLLOWING, LEADING, OBSERVING;
    }
    

    startLeaderElection()-->createElectionAlgorithm(electionType)-->new FastLeaderElection(this, qcm);

    在QuorumPeer的run()中,依据state不同创建LeaderZooKeeperServer、FollowerZooKeeperServer、ObserverZooKeeperServer,如果是LOOKING则,lookForLeader

    LOOKING

    setCurrentVote(makeLEStrategy().lookForLeader());
    

    OBSERVING

    case OBSERVING:
        try {
            LOG.info("OBSERVING");
            setObserver(makeObserver(logFactory));
            observer.observeLeader();
        }
    

    FOLLOWING

    case FOLLOWING:
        try {
            LOG.info("FOLLOWING");
            setFollower(makeFollower(logFactory));
            follower.followLeader();
        } 
    

    LEADING

    case LEADING:
        LOG.info("LEADING");
        try {
            setLeader(makeLeader(logFactory));
            leader.lead();
            setLeader(null);
        } 
    

    makexx()

        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
            return new Follower(this, new FollowerZooKeeperServer(logFactory, 
                    this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
        }
         
        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
            return new Leader(this, new LeaderZooKeeperServer(logFactory,
                    this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
        }
        
        protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
            return new Observer(this, new ObserverZooKeeperServer(logFactory,
                    this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
        }
    
  • 相关阅读:
    opencv访问图像像素
    利用chrome浏览器的proxy switchy扩展智能切换代理
    一些linux命令【ubuntu】
    如何从 Ubuntu 10.04 升级到 10.10
    关于“无法获得排它锁 ”的解决办法
    【ubuntu】Grub2配置详解(转)
    BibTeX使用介绍
    【ubuntu】imagemagick用法
    ubuntu环境下安装OpenCV
    安装wordpress
  • 原文地址:https://www.cnblogs.com/donganwangshi/p/4115966.html
Copyright © 2011-2022 走看看