  • Zookeeper (1): Startup Logic in Standalone Mode

      zk is used in so many places that sooner or later, everywhere you go, someone will expect you to understand how it works!

    To follow along, set things up as follows:

      1. Open the zk Git repository: https://github.com/apache/zookeeper ; one look and you'll know it's the one you're after!
      2. Clone the source locally, and download the ant build tool if you don't have it yet: http://ant.apache.org/ !
      3. Run the ant target that generates the project environment: ant eclipse ! (this may take a few minutes)
      4. Open the source in IDEA and import the required libraries!
      5. Run the main() method to start the zk server, remembering to add the configuration file to the run configuration!
      6. Analyze the source and learn!

    First, start from the script entry point (zkServer.sh):

        #!/usr/bin/env bash
    
        # Licensed to the Apache Software Foundation (ASF) under one or more
        # contributor license agreements.  See the NOTICE file distributed with
        # this work for additional information regarding copyright ownership.
        # The ASF licenses this file to You under the Apache License, Version 2.0
        # (the "License"); you may not use this file except in compliance with
        # the License.  You may obtain a copy of the License at
        #
        #     http://www.apache.org/licenses/LICENSE-2.0
        #
        # Unless required by applicable law or agreed to in writing, software
        # distributed under the License is distributed on an "AS IS" BASIS,
        # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        # See the License for the specific language governing permissions and
        # limitations under the License.
    
        #
        # If this script is run out of /usr/bin or some other system bin directory
        # it should be linked to and not copied. Things like java jar files are found
        # relative to the canonical path of this script.
        #
    
    
    
        # use POSIX interface, symlink is followed automatically
        ZOOBIN="${BASH_SOURCE-$0}"
        ZOOBIN="$(dirname "${ZOOBIN}")"
        ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
    
        if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
          . "$ZOOBINDIR/../libexec/zkEnv.sh"
        else
          . "$ZOOBINDIR/zkEnv.sh"
        fi
    
        # See the following page for extensive details on setting
        # up the JVM to accept JMX remote management:
        # http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
        # by default we allow local JMX connections
        if [ "x$JMXLOCALONLY" = "x" ]
        then
            JMXLOCALONLY=false
        fi
    
        if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
        then
          echo "ZooKeeper JMX enabled by default" >&2
          if [ "x$JMXPORT" = "x" ]
          then
            # for some reason these two options are necessary on jdk6 on Ubuntu
        #   according to the docs they are not necessary, but otherwise jconsole cannot
            #   do a local attach
            ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
          else
            if [ "x$JMXAUTH" = "x" ]
            then
              JMXAUTH=false
            fi
            if [ "x$JMXSSL" = "x" ]
            then
              JMXSSL=false
            fi
            if [ "x$JMXLOG4J" = "x" ]
            then
              JMXLOG4J=true
            fi
            echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2
            echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2
            echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2
            echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2
            ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
          fi
        else
            echo "JMX disabled by user request" >&2
            ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
        fi
    
        if [ "x$SERVER_JVMFLAGS"  != "x" ]
        then
            JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS"
        fi
    
        if [ "x$2" != "x" ]
        then
            ZOOCFG="$ZOOCFGDIR/$2"
        fi
    
        # if we give a more complicated path to the config, don't screw around in $ZOOCFGDIR
        if [ "x$(dirname "$ZOOCFG")" != "x$ZOOCFGDIR" ]
        then
            ZOOCFG="$2"
        fi
    
        if $cygwin
        then
            ZOOCFG=`cygpath -wp "$ZOOCFG"`
            # cygwin has a "kill" in the shell itself, gets confused
            KILL=/bin/kill
        else
            KILL=kill
        fi
    
        echo "Using config: $ZOOCFG" >&2
    
        case "$OSTYPE" in
        *solaris*)
          GREP=/usr/xpg4/bin/grep
          ;;
        *)
          GREP=grep
          ;;
        esac
        if [ -z "$ZOOPIDFILE" ]; then
            ZOO_DATADIR="$($GREP "^[[:space:]]*dataDir" "$ZOOCFG" | sed -e 's/.*=//')"
            if [ ! -d "$ZOO_DATADIR" ]; then
                mkdir -p "$ZOO_DATADIR"
            fi
            ZOOPIDFILE="$ZOO_DATADIR/zookeeper_server.pid"
        else
            # ensure it exists, otherwise stop will fail
            mkdir -p "$(dirname "$ZOOPIDFILE")"
        fi
    
        if [ ! -w "$ZOO_LOG_DIR" ] ; then
        mkdir -p "$ZOO_LOG_DIR"
        fi
    
        _ZOO_DAEMON_OUT="$ZOO_LOG_DIR/zookeeper.out"
    
        case $1 in
        start)
            echo  -n "Starting zookeeper ... "
            if [ -f "$ZOOPIDFILE" ]; then
              if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
                 echo $command already running as process `cat "$ZOOPIDFILE"`. 
                 exit 0
              fi
            fi
            nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" 
            -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev.null &
            if [ $? -eq 0 ]
            then
              case "$OSTYPE" in
              *solaris*)
                /bin/echo "${!}\c" > "$ZOOPIDFILE"
                ;;
              *)
                /bin/echo -n $! > "$ZOOPIDFILE"
                ;;
              esac
              if [ $? -eq 0 ];
              then
                sleep 1
                echo STARTED
              else
                echo FAILED TO WRITE PID
                exit 1
              fi
            else
              echo SERVER DID NOT START
              exit 1
            fi
            ;;
        start-foreground)
            ZOO_CMD=(exec "$JAVA")
            if [ "${ZOO_NOEXEC}" != "" ]; then
              ZOO_CMD=("$JAVA")
            fi
            "${ZOO_CMD[@]}" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" 
            -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG"
            ;;
        print-cmd)
            echo ""$JAVA" -Dzookeeper.log.dir="${ZOO_LOG_DIR}" -Dzookeeper.root.logger="${ZOO_LOG4J_PROP}" -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev.null"
            ;;
        stop)
            echo -n "Stopping zookeeper ... "
            if [ ! -f "$ZOOPIDFILE" ]
            then
              echo "no zookeeper to stop (could not find file $ZOOPIDFILE)"
            else
              $KILL -9 $(cat "$ZOOPIDFILE")
              rm "$ZOOPIDFILE"
              echo STOPPED
            fi
            exit 0
            ;;
        upgrade)
            shift
            echo "upgrading the servers to 3.*"
            "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" 
            -cp "$CLASSPATH" $JVMFLAGS org.apache.zookeeper.server.upgrade.UpgradeMain ${@}
            echo "Upgrading ... "
            ;;
        restart)
            shift
            "$0" stop ${@}
            sleep 3
            "$0" start ${@}
            ;;
        status)
            # -q is necessary on some versions of linux where nc returns too quickly, and no stat result is output
            clientPortAddress=`$GREP "^[[:space:]]*clientPortAddress[^[:alpha:]]" "$ZOOCFG" | sed -e 's/.*=//'`
            if ! [ $clientPortAddress ]
            then
            clientPortAddress="localhost"
            fi
            clientPort=`$GREP "^[[:space:]]*clientPort[^[:alpha:]]" "$ZOOCFG" | sed -e 's/.*=//'`
            STAT=`"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
                     -cp "$CLASSPATH" $JVMFLAGS org.apache.zookeeper.client.FourLetterWordMain \
                     $clientPortAddress $clientPort srvr 2> /dev/null \
                  | $GREP Mode`
            if [ "x$STAT" = "x" ]
            then
                echo "Error contacting service. It is probably not running."
                exit 1
            else
                echo $STAT
                exit 0
            fi
            ;;
        *)
            echo "Usage: $0 {start|start-foreground|stop|restart|status|upgrade|print-cmd}" >&2
    
        esac

      The command that the start branch actually executes looks like this:

        nohup java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /etc/zookeeper-3.4.13/bin/../build/classes:/etc/zookeeper-3.4.13/bin/../build/lib/*.jar:/etc/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/etc/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/etc/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/etc/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/etc/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/etc/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/etc/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/etc/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/etc/zookeeper-3.4.13/bin/../conf:  -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/zookeeper/zoo1.cfg > ./zookeeper.out 2>&1 < /dev/null &

      You can see that org.apache.zookeeper.server.quorum.QuorumPeerMain is the startup class, so let's find its main() method:

        /**
         * To start the replicated server specify the configuration file name on
         * the command line.
         * @param args path to the configfile
         */
        public static void main(String[] args) {
            QuorumPeerMain main = new QuorumPeerMain();
            try {
                main.initializeAndRun(args);
            } catch (IllegalArgumentException e) {
                LOG.error("Invalid arguments, exiting abnormally", e);
                LOG.info(USAGE);
                System.err.println(USAGE);
                System.exit(ExitCode.INVALID_INVOCATION.getValue());
            } catch (ConfigException e) {
                LOG.error("Invalid config, exiting abnormally", e);
                System.err.println("Invalid config, exiting abnormally");
                System.exit(ExitCode.INVALID_INVOCATION.getValue());
            } catch (DatadirException e) {
                LOG.error("Unable to access datadir, exiting abnormally", e);
                System.err.println("Unable to access datadir, exiting abnormally");
                System.exit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
            } catch (AdminServerException e) {
                LOG.error("Unable to start AdminServer, exiting abnormally", e);
                System.err.println("Unable to start AdminServer, exiting abnormally");
                System.exit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
            } catch (Exception e) {
                LOG.error("Unexpected exception, exiting abnormally", e);
                System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
            }
            LOG.info("Exiting normally");
            System.exit(ExitCode.EXECUTION_FINISHED.getValue());
        }
    
        protected void initializeAndRun(String[] args)
            throws ConfigException, IOException, AdminServerException
        {
            QuorumPeerConfig config = new QuorumPeerConfig();
            if (args.length == 1) {
                config.parse(args[0]);
            }
    
            // Start and schedule the purge task
            DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                    .getDataDir(), config.getDataLogDir(), config
                    .getSnapRetainCount(), config.getPurgeInterval());
            // start the background purge thread
            // org.apache.zookeeper.server.DatadirCleanupManager
            purgeMgr.start();
    
            if (args.length == 1 && config.isDistributed()) {
                // cluster (quorum) mode: run the replicated server
                runFromConfig(config);
            } else {
                // standalone mode; we follow this path first and return to cluster mode later
                LOG.warn("Either no config or no quorum defined in config, running "
                        + " in standalone mode");
                // there is only server in the quorum -- run as standalone
                ZooKeeperServerMain.main(args);
            }
        }

      The code above merely performs initialization while catching every class of failure: invalid arguments, bad configuration, an inaccessible data directory, and so on.

    For standalone mode, the work is simply forwarded to ZooKeeperServerMain.main():

        // org.apache.zookeeper.server.ZooKeeperServerMain
        /*
         * Start up the ZooKeeper server.
         *
         * @param args the configfile or the port datadir [ticktime]
         */
        public static void main(String[] args) {
            ZooKeeperServerMain main = new ZooKeeperServerMain();
            try {
                main.initializeAndRun(args);
            } catch (IllegalArgumentException e) {
                LOG.error("Invalid arguments, exiting abnormally", e);
                LOG.info(USAGE);
                System.err.println(USAGE);
                System.exit(ExitCode.INVALID_INVOCATION.getValue());
            } catch (ConfigException e) {
                LOG.error("Invalid config, exiting abnormally", e);
                System.err.println("Invalid config, exiting abnormally");
                System.exit(ExitCode.INVALID_INVOCATION.getValue());
            } catch (DatadirException e) {
                LOG.error("Unable to access datadir, exiting abnormally", e);
                System.err.println("Unable to access datadir, exiting abnormally");
                System.exit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
            } catch (AdminServerException e) {
                LOG.error("Unable to start AdminServer, exiting abnormally", e);
                System.err.println("Unable to start AdminServer, exiting abnormally");
                System.exit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
            } catch (Exception e) {
                LOG.error("Unexpected exception, exiting abnormally", e);
                System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
            }
            LOG.info("Exiting normally");
            System.exit(ExitCode.EXECUTION_FINISHED.getValue());
        }
    
        // standalone-mode initialization
        protected void initializeAndRun(String[] args)
            throws ConfigException, IOException, AdminServerException
        {
            try {
                ManagedUtil.registerLog4jMBeans();
            } catch (JMException e) {
                LOG.warn("Unable to register log4j JMX control", e);
            }
    
            ServerConfig config = new ServerConfig();
            if (args.length == 1) {
                config.parse(args[0]);
            } else {
                config.parse(args);
            }
    
            // the key part: run the server from the parsed config
            runFromConfig(config);
        }

      Comparing the standalone and cluster entry points above, the overall flow is the same: parse the configuration file first, then start the mode-specific logic. Note that the arguments already parsed by the cluster entry point are parsed again by the standalone entry point, which keeps the two startup modes compatible.

      Cluster mode parses the configuration with QuorumPeerConfig, while standalone mode uses ServerConfig!

      Standalone parsing, however, still delegates to the cluster-mode parser, as ServerConfig.parse() shows:

        // org.apache.zookeeper.server.ServerConfig
        /**
         * Parse a ZooKeeper configuration file
         * @param path the path of the configuration file
         * @return ServerConfig configured wrt arguments
         * @throws ConfigException error processing configuration
         */
        public void parse(String path) throws ConfigException {
            // delegate parsing to QuorumPeerConfig
            QuorumPeerConfig config = new QuorumPeerConfig();
            config.parse(path);
    
            // let qpconfig parse the file and then pull the stuff we are
            // interested in
            // then read only the needed attributes; this is the entire difference between standalone and cluster configuration
            readFrom(config);
        }
        
        /**
         * Read attributes from a QuorumPeerConfig.
         * @param config
         */
        public void readFrom(QuorumPeerConfig config) {
            clientPortAddress = config.getClientPortAddress();
            secureClientPortAddress = config.getSecureClientPortAddress();
            dataDir = config.getDataDir();
            dataLogDir = config.getDataLogDir();
            tickTime = config.getTickTime();
            maxClientCnxns = config.getMaxClientCnxns();
            minSessionTimeout = config.getMinSessionTimeout();
            maxSessionTimeout = config.getMaxSessionTimeout();
            metricsProviderClassName = config.getMetricsProviderClassName();
            metricsProviderConfiguration = config.getMetricsProviderConfiguration();
        }

      So let's take a quick look at how the cluster-mode configuration file is parsed!

        // org.apache.zookeeper.server.quorum.QuorumPeerConfig
        /**
         * Parse a ZooKeeper configuration file
         * @param path the path of the configuration file
         * @throws ConfigException error processing configuration
         */
        public void parse(String path) throws ConfigException {
            LOG.info("Reading configuration from: " + path);
           
            try {
                // build a verified config-file object via the builder pattern
                File configFile = (new VerifyingFileFactory.Builder(LOG)
                    .warnForRelativePath()
                    .failForNonExistingPath()
                    .build()).create(path);
                    
                Properties cfg = new Properties();
                FileInputStream in = new FileInputStream(configFile);
                try {
                    cfg.load(in);
                    configFileStr = path;
                } finally {
                    in.close();
                }
                
                // parse the properties into the individual fields; this also reveals every config key zk supports, as listed below
                parseProperties(cfg);
            } catch (IOException e) {
                throw new ConfigException("Error processing " + path, e);
            } catch (IllegalArgumentException e) {
                throw new ConfigException("Error processing " + path, e);
            }   
            
            if (dynamicConfigFileStr!=null) {
               try {
                   // in cluster mode, initialize the (dynamic) quorum configuration
                   Properties dynamicCfg = new Properties();
                   FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
                   try {
                       dynamicCfg.load(inConfig);
                       if (dynamicCfg.getProperty("version") != null) {
                           throw new ConfigException("dynamic file shouldn't have version inside");
                       }
    
                       String version = getVersionFromFilename(dynamicConfigFileStr);
                       // If there isn't any version associated with the filename,
                       // the default version is 0.
                       if (version != null) {
                           dynamicCfg.setProperty("version", version);
                       }
                   } finally {
                       inConfig.close();
                   }
                   setupQuorumPeerConfig(dynamicCfg, false);
    
               } catch (IOException e) {
                   throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
               } catch (IllegalArgumentException e) {
                   throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
               }        
               File nextDynamicConfigFile = new File(configFileStr + nextDynamicConfigFileSuffix);
               if (nextDynamicConfigFile.exists()) {
                   try {           
                       Properties dynamicConfigNextCfg = new Properties();
                       FileInputStream inConfigNext = new FileInputStream(nextDynamicConfigFile);       
                       try {
                           dynamicConfigNextCfg.load(inConfigNext);
                       } finally {
                           inConfigNext.close();
                       }
                       boolean isHierarchical = false;
                       for (Entry<Object, Object> entry : dynamicConfigNextCfg.entrySet()) {
                           String key = entry.getKey().toString().trim();  
                           if (key.startsWith("group") || key.startsWith("weight")) {
                               isHierarchical = true;
                               break;
                           }
                       }
                       lastSeenQuorumVerifier = createQuorumVerifier(dynamicConfigNextCfg, isHierarchical);
                   } catch (IOException e) {
                       LOG.warn("NextQuorumVerifier is initiated to null");
                   }
               }
            }
        }
        // property-by-property parsing; this shows exactly which configuration keys zk supports
        /**
         * Parse config from a Properties.
         * @param zkProp Properties to parse from.
         * @throws IOException
         * @throws ConfigException
         */
        public void parseProperties(Properties zkProp)
        throws IOException, ConfigException {
            int clientPort = 0;
            int secureClientPort = 0;
            String clientPortAddress = null;
            String secureClientPortAddress = null;
            VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
            for (Entry<Object, Object> entry : zkProp.entrySet()) {
                String key = entry.getKey().toString().trim();
                String value = entry.getValue().toString().trim();
                if (key.equals("dataDir")) {
                    dataDir = vff.create(value);
                } else if (key.equals("dataLogDir")) {
                    dataLogDir = vff.create(value);
                } else if (key.equals("clientPort")) {
                    clientPort = Integer.parseInt(value);
                } else if (key.equals("localSessionsEnabled")) {
                    localSessionsEnabled = Boolean.parseBoolean(value);
                } else if (key.equals("localSessionsUpgradingEnabled")) {
                    localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
                } else if (key.equals("clientPortAddress")) {
                    clientPortAddress = value.trim();
                } else if (key.equals("secureClientPort")) {
                    secureClientPort = Integer.parseInt(value);
                } else if (key.equals("secureClientPortAddress")){
                    secureClientPortAddress = value.trim();
                } else if (key.equals("tickTime")) {
                    tickTime = Integer.parseInt(value);
                } else if (key.equals("maxClientCnxns")) {
                    maxClientCnxns = Integer.parseInt(value);
                } else if (key.equals("minSessionTimeout")) {
                    minSessionTimeout = Integer.parseInt(value);
                } else if (key.equals("maxSessionTimeout")) {
                    maxSessionTimeout = Integer.parseInt(value);
                } else if (key.equals("initLimit")) {
                    initLimit = Integer.parseInt(value);
                } else if (key.equals("syncLimit")) {
                    syncLimit = Integer.parseInt(value);
                } else if (key.equals("electionAlg")) {
                    electionAlg = Integer.parseInt(value);
                    if (electionAlg != 1 && electionAlg != 2 && electionAlg != 3) {
                        throw new ConfigException("Invalid electionAlg value. Only 1, 2, 3 are supported.");
                    }
                } else if (key.equals("quorumListenOnAllIPs")) {
                    quorumListenOnAllIPs = Boolean.parseBoolean(value);
                } else if (key.equals("peerType")) {
                    if (value.toLowerCase().equals("observer")) {
                        peerType = LearnerType.OBSERVER;
                    } else if (value.toLowerCase().equals("participant")) {
                        peerType = LearnerType.PARTICIPANT;
                    } else
                    {
                        throw new ConfigException("Unrecognised peertype: " + value);
                    }
                } else if (key.equals( "syncEnabled" )) {
                    syncEnabled = Boolean.parseBoolean(value);
                } else if (key.equals("dynamicConfigFile")){
                    dynamicConfigFileStr = value;
                } else if (key.equals("autopurge.snapRetainCount")) {
                    snapRetainCount = Integer.parseInt(value);
                } else if (key.equals("autopurge.purgeInterval")) {
                    purgeInterval = Integer.parseInt(value);
                } else if (key.equals("standaloneEnabled")) {
                    if (value.toLowerCase().equals("true")) {
                        setStandaloneEnabled(true);
                    } else if (value.toLowerCase().equals("false")) {
                        setStandaloneEnabled(false);
                    } else {
                        throw new ConfigException("Invalid option " + value + " for standalone mode. Choose 'true' or 'false.'");
                    }
                } else if (key.equals("reconfigEnabled")) {
                    if (value.toLowerCase().equals("true")) {
                        setReconfigEnabled(true);
                    } else if (value.toLowerCase().equals("false")) {
                        setReconfigEnabled(false);
                    } else {
                        throw new ConfigException("Invalid option " + value + " for reconfigEnabled flag. Choose 'true' or 'false.'");
                    }
                } else if (key.equals("sslQuorum")){
                    sslQuorum = Boolean.parseBoolean(value);
    // TODO: UnifiedServerSocket is currently buggy, will be fixed when @ivmaykov's PRs are merged. Disable port unification until then.
    //            } else if (key.equals("portUnification")){
    //                shouldUsePortUnification = Boolean.parseBoolean(value);
                } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.containsKey("dynamicConfigFile")) {
                    throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
                } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
                    quorumEnableSasl = Boolean.parseBoolean(value);
                } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED)) {
                    quorumServerRequireSasl = Boolean.parseBoolean(value);
                } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED)) {
                    quorumLearnerRequireSasl = Boolean.parseBoolean(value);
                } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT)) {
                    quorumLearnerLoginContext = value;
                } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT)) {
                    quorumServerLoginContext = value;
                } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
                    quorumServicePrincipal = value;
                } else if (key.equals("quorum.cnxn.threads.size")) {
                    quorumCnxnThreadsSize = Integer.parseInt(value);
                } else if (key.equals("metricsProvider.className")) {
                    metricsProviderClassName = value;
                } else if (key.startsWith("metricsProvider.")) {
                    String keyForMetricsProvider = key.substring(16);
                    metricsProviderConfiguration.put(keyForMetricsProvider, value);
                } else {
                    System.setProperty("zookeeper." + key, value);
                }
            }
    
            if (!quorumEnableSasl && quorumServerRequireSasl) {
                throw new IllegalArgumentException(
                        QuorumAuth.QUORUM_SASL_AUTH_ENABLED
                                + " is disabled, so cannot enable "
                                + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
            }
            if (!quorumEnableSasl && quorumLearnerRequireSasl) {
                throw new IllegalArgumentException(
                        QuorumAuth.QUORUM_SASL_AUTH_ENABLED
                                + " is disabled, so cannot enable "
                                + QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED);
            }
            // If quorumpeer learner is not auth enabled then self won't be able to
            // join quorum. So this condition is ensuring that the quorumpeer learner
            // is also auth enabled while enabling quorum server require sasl.
            if (!quorumLearnerRequireSasl && quorumServerRequireSasl) {
                throw new IllegalArgumentException(
                        QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED
                                + " is disabled, so cannot enable "
                                + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
            }
    
            // Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)
            // PurgeTxnLog.purge(File, File, int) will not allow to purge less
            // than 3.
            if (snapRetainCount < MIN_SNAP_RETAIN_COUNT) {
                LOG.warn("Invalid autopurge.snapRetainCount: " + snapRetainCount
                        + ". Defaulting to " + MIN_SNAP_RETAIN_COUNT);
                snapRetainCount = MIN_SNAP_RETAIN_COUNT;
            }
    
            if (dataDir == null) {
                throw new IllegalArgumentException("dataDir is not set");
            }
            if (dataLogDir == null) {
                dataLogDir = dataDir;
            }
    
            if (clientPort == 0) {
                LOG.info("clientPort is not set");
                if (clientPortAddress != null) {
                    throw new IllegalArgumentException("clientPortAddress is set but clientPort is not set");
                }
            } else if (clientPortAddress != null) {
                this.clientPortAddress = new InetSocketAddress(
                        InetAddress.getByName(clientPortAddress), clientPort);
                LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress));
            } else {
                this.clientPortAddress = new InetSocketAddress(clientPort);
                LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress));
            }
    
            if (secureClientPort == 0) {
                LOG.info("secureClientPort is not set");
                if (secureClientPortAddress != null) {
                    throw new IllegalArgumentException("secureClientPortAddress is set but secureClientPort is not set");
                }
            } else if (secureClientPortAddress != null) {
                this.secureClientPortAddress = new InetSocketAddress(
                        InetAddress.getByName(secureClientPortAddress), secureClientPort);
                LOG.info("secureClientPortAddress is {}", formatInetAddr(this.secureClientPortAddress));
            } else {
                this.secureClientPortAddress = new InetSocketAddress(secureClientPort);
                LOG.info("secureClientPortAddress is {}", formatInetAddr(this.secureClientPortAddress));
            }
            if (this.secureClientPortAddress != null) {
                configureSSLAuth();
            }
    
            if (tickTime == 0) {
                throw new IllegalArgumentException("tickTime is not set");
            }
    
            minSessionTimeout = minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
            maxSessionTimeout = maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
    
            if (minSessionTimeout > maxSessionTimeout) {
                throw new IllegalArgumentException(
                        "minSessionTimeout must not be larger than maxSessionTimeout");
            }
    
            LOG.info("metricsProvider.className is {}", metricsProviderClassName);
            try {
                Class.forName(metricsProviderClassName, false, Thread.currentThread().getContextClassLoader());
            } catch (ClassNotFoundException error) {
                throw new IllegalArgumentException("metrics provider class was not found", error);
            }
    
            // backward compatibility - dynamic configuration in the same file as
            // static configuration params see writeDynamicConfig()
            if (dynamicConfigFileStr == null) {
                // initialize the quorum configuration; entries must follow the format
                // server.1=172.19.2.2:2181:3181 or an exception is thrown; here the first
                // port is used for quorum communication with the leader and the second for leader election
                setupQuorumPeerConfig(zkProp, true);
                if (isDistributed() && isReconfigEnabled()) {
                    // we don't backup static config for standalone mode.
                    // we also don't backup if reconfig feature is disabled.
                    backupOldConfig();
                }
            }
        }
    
        /**
         * Parse dynamic configuration file and return
         * quorumVerifier for new configuration.
         * @param dynamicConfigProp Properties to parse from.
         * @throws IOException
         * @throws ConfigException
         */
        public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings,
           boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
           boolean isHierarchical = false;
            for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
                String key = entry.getKey().toString().trim();                    
                if (key.startsWith("group") || key.startsWith("weight")) {
                   isHierarchical = true;
                } else if (!configBackwardCompatibilityMode && !key.startsWith("server.") && !key.equals("version")){ 
                   LOG.info(dynamicConfigProp.toString());
                   throw new ConfigException("Unrecognised parameter: " + key);                
                }
            }
            
            QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical);
                   
            int numParticipators = qv.getVotingMembers().size();
            int numObservers = qv.getObservingMembers().size();
            if (numParticipators == 0) {
                if (!standaloneEnabled) {
                    throw new IllegalArgumentException("standaloneEnabled = false then " +
                            "number of participants should be >0");
                }
                if (numObservers > 0) {
                    throw new IllegalArgumentException("Observers w.o participants is an invalid configuration");
                }
            } else if (numParticipators == 1 && standaloneEnabled) {
                // HBase currently adds a single server line to the config, for
                // b.w compatibility reasons we need to keep this here. If standaloneEnabled
                // is true, the QuorumPeerMain script will create a standalone server instead
                // of a quorum configuration
                // a single server cannot form an elected quorum, so such a configuration is flagged
                LOG.error("Invalid configuration, only one server specified (ignoring)");
                if (numObservers > 0) {
                    throw new IllegalArgumentException("Observers w.o quorum is an invalid configuration");
                }
            } else {
                if (warnings) {
                    if (numParticipators <= 2) {
                        LOG.warn("No server failure will be tolerated. " +
                            "You need at least 3 servers.");
                    } else if (numParticipators % 2 == 0) {
                        LOG.warn("Non-optimial configuration, consider an odd number of servers.");
                    }
                }
    
                for (QuorumServer s : qv.getVotingMembers().values()) {
                    if (s.electionAddr == null)
                        throw new IllegalArgumentException(
                                "Missing election port for server: " + s.id);
                }
            }
            return qv;
        }
    
        private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException{
           if(isHierarchical){
                return new QuorumHierarchical(dynamicConfigProp);
            } else {
               /*
                 * The default QuorumVerifier is QuorumMaj
                 */        
                //LOG.info("Defaulting to majority quorums");
                return new QuorumMaj(dynamicConfigProp);            
            }          
        }
        public QuorumMaj(Properties props) throws ConfigException {
            for (Entry<Object, Object> entry : props.entrySet()) {
                String key = entry.getKey().toString();
                String value = entry.getValue().toString();
    
                if (key.startsWith("server.")) {
                    int dot = key.indexOf('.');
                    long sid = Long.parseLong(key.substring(dot + 1));
                    QuorumServer qs = new QuorumServer(sid, value);
                    allMembers.put(Long.valueOf(sid), qs);
                    if (qs.type == LearnerType.PARTICIPANT)
                    // participants are added to the voting-member list
                        votingMembers.put(Long.valueOf(sid), qs);
                    else {
                        observingMembers.put(Long.valueOf(sid), qs);
                    }
                } else if (key.equals("version")) {
                    version = Long.parseLong(value, 16);
                }
            }
            // finally, record half the number of voters; a quorum requires strictly more than half
            half = votingMembers.size() / 2;
        }
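
        // A small illustration of why "strictly more than half" matters (a paraphrased
        // sketch of the quorum check, not a verbatim copy of QuorumMaj): with 5 voters
        // half == 2, so a proposal needs at least 3 acks; with 4 voters half == 2 as
        // well, so 3 acks are still required; this is why an even-sized ensemble
        // tolerates no more failures than the next smaller odd-sized one.
        public boolean containsQuorum(Set<Long> ackSet) {
            return (ackSet.size() > half);
        }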
    
            // org.apache.zookeeper.server.quorum.QuorumPeer$QuorumServer
            public QuorumServer(long sid, String addressStr) throws ConfigException {
                // LOG.warn("sid = " + sid + " addressStr = " + addressStr);
                this.id = sid;
                String serverClientParts[] = addressStr.split(";");
                String serverParts[] = ConfigUtils.getHostAndPort(serverClientParts[0]);
                if ((serverClientParts.length > 2) || (serverParts.length < 3)
                        || (serverParts.length > 4)) {
                    throw new ConfigException(addressStr + wrongFormat);
                }
    
                if (serverClientParts.length == 2) {
                    //LOG.warn("ClientParts: " + serverClientParts[1]);
                    String clientParts[] = ConfigUtils.getHostAndPort(serverClientParts[1]);
                    if (clientParts.length > 2) {
                        throw new ConfigException(addressStr + wrongFormat);
                    }
    
                    // is client_config a host:port or just a port
                    hostname = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
                    try {
                        clientAddr = new InetSocketAddress(hostname,
                                Integer.parseInt(clientParts[clientParts.length - 1]));
                        //LOG.warn("Set clientAddr to " + clientAddr);
                    } catch (NumberFormatException e) {
                        throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
                    }
                }
    
                // server_config should be either host:port:port or host:port:port:type
                try {
                    addr = new InetSocketAddress(serverParts[0],
                            Integer.parseInt(serverParts[1]));
                } catch (NumberFormatException e) {
                    throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[1]);
                }
                try {
                    electionAddr = new InetSocketAddress(serverParts[0],
                            Integer.parseInt(serverParts[2]));
                } catch (NumberFormatException e) {
                    throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[2]);
                }
    
                if(addr.getPort() == electionAddr.getPort()) {
                    throw new ConfigException(
                            "Client and election port must be different! Please update the configuration file on server." + sid);
                }
    
                if (serverParts.length == 4) {
                    setType(serverParts[3]);
                }
    
                this.hostname = serverParts[0];
                
                setMyAddrs();
            }
    
            private void setMyAddrs() {
                this.myAddrs = new ArrayList<InetSocketAddress>();
                this.myAddrs.add(this.addr);
                this.myAddrs.add(this.clientAddr);
                this.myAddrs.add(this.electionAddr);
                // exclude special addresses such as 127.0.0.1
                this.myAddrs = excludedSpecialAddresses(this.myAddrs);
            }
        // org.apache.zookeeper.server.quorum.QuorumPeerConfig
        void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
                throws IOException, ConfigException {
            quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
            // verify that the myid file exists; fail if it does not
            setupMyId();
            // set the client port
            setupClientPort();
            // set the peer type: PARTICIPANT or OBSERVER
            setupPeerType();
            checkValidity();
        }    
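
      Putting these parsing rules together, a minimal cluster-mode zoo.cfg exercising the keys handled above might look like this (values are illustrative only):

        tickTime=2000
        initLimit=10
        syncLimit=5
        dataDir=/var/lib/zookeeper
        clientPort=2181
        autopurge.snapRetainCount=3
        autopurge.purgeInterval=1
        # server.N=host:quorumPort:electionPort[:role]
        server.1=172.19.2.2:2888:3888
        server.2=172.19.2.3:2888:3888
        server.3=172.19.2.4:2888:3888:observer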
        

      With the configuration parsed, the server logic runs. We'll break it down using standalone mode as the example!

        // org.apache.zookeeper.server.ZooKeeperServerMain
        /**
         * Run from a ServerConfig.
         * @param config ServerConfig to use.
         * @throws IOException
         * @throws AdminServerException
         */
        public void runFromConfig(ServerConfig config)
                throws IOException, AdminServerException {
            LOG.info("Starting server");
            FileTxnSnapLog txnLog = null;
            try {
                try {
                // start the metrics provider first
                    metricsProvider = MetricsProviderBootstrap
                            .startMetricsProvider(config.getMetricsProviderClassName(),
                                                  config.getMetricsProviderConfiguration());
                } catch (MetricsProviderLifeCycleException error) {
                    throw new IOException("Cannot boot MetricsProvider "+config.getMetricsProviderClassName(),
                        error);
                }
    
                // Note that this thread isn't going to be doing anything else,
                // so rather than spawning another thread, we will just call
                // run() in this thread.
                // create a file logger url from the command line args
            // create the transaction-log and snapshot handles and validate the directories
                txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
            // create the ZooKeeperServer; zk properly starts here, with zkDb passed as null
                final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
                        config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
                zkServer.setRootMetricsContext(metricsProvider.getRootContext());
                txnLog.setServerStats(zkServer.serverStats());
    
                // Registers shutdown handler which will be used to know the
                // server error or shutdown state changes.
            // shutdown latch; register the shutdown handler
                final CountDownLatch shutdownLatch = new CountDownLatch(1);
                zkServer.registerServerShutdownHandler(
                        new ZooKeeperServerShutdownHandler(shutdownLatch));
    
                // Start Admin server
            // create the admin console, a Jetty server by default
                adminServer = AdminServerFactory.createAdminServer();
                adminServer.setZooKeeperServer(zkServer);
                adminServer.start();
    
                boolean needStartZKServer = true;
                if (config.getClientPortAddress() != null) {
                // create the connection factory, NIOServerCnxnFactory by default
                    cnxnFactory = ServerCnxnFactory.createFactory();
                // configure the server, including authentication setup
                    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
                    cnxnFactory.startup(zkServer);
                    // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
                    needStartZKServer = false;
                }
                if (config.getSecureClientPortAddress() != null) {
                    secureCnxnFactory = ServerCnxnFactory.createFactory();
                    secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
                    secureCnxnFactory.startup(zkServer, needStartZKServer);
                }
    
                containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
                        Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                        Integer.getInteger("znode.container.maxPerMinute", 10000)
                );
                containerManager.start();
    
                // Watch status of ZooKeeper server. It will do a graceful shutdown
                // if the server is not running or hits an internal error.
            // block here waiting for the shutdown signal; at this point the server is fully started
                shutdownLatch.await();
    
            // graceful shutdown
                shutdown();
    
                if (cnxnFactory != null) {
                    cnxnFactory.join();
                }
                if (secureCnxnFactory != null) {
                    secureCnxnFactory.join();
                }
                if (zkServer.canShutdown()) {
                    zkServer.shutdown(true);
                }
            } catch (InterruptedException e) {
                // warn, but generally this is ok
                LOG.warn("Server interrupted", e);
            } finally {
                if (txnLog != null) {
                    txnLog.close();
                }
                if (metricsProvider != null) {
                    try {
                        metricsProvider.stop();
                    } catch (Throwable error) {
                        LOG.warn("Error while stopping metrics", error);
                    }
                }
            }
        }

      

    As shown above, startup breaks down into these steps:
      1. Start the metrics provider for monitoring;
      2. Create the FileTxnSnapLog used to restore data from disk (or elsewhere) at startup;
      3. Create the ZooKeeperServer instance for later use;
      4. Register the ZooKeeperServerShutdownHandler shutdown hook;
      5. Start the admin console via AdminServerFactory;
      6. Start the zkServer;
      7. Start the ContainerManager;
      8. The server is up; block waiting for the shutdown signal.

      This is also the canonical action sequence of a server program!
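
      Step 8 relies on the shutdown-latch pattern: the main thread parks on a CountDownLatch, and any component can end the process by counting it down. Here is a minimal, self-contained sketch of that pattern (simplified names, not ZooKeeper's actual classes):

        import java.util.concurrent.CountDownLatch;

        public class ShutdownLatchSketch {
            public static void main(String[] args) throws InterruptedException {
                CountDownLatch shutdownLatch = new CountDownLatch(1);

                // hand the latch to whatever may signal shutdown; here a JVM shutdown hook
                Runtime.getRuntime().addShutdownHook(new Thread(shutdownLatch::countDown));

                System.out.println("server started, waiting for shutdown signal...");
                shutdownLatch.await();   // runFromConfig() blocks in exactly this way

                // graceful cleanup would go here: close connection factories, txn log, metrics
                System.out.println("shutting down");
            }
        }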

      For reference, here is how the config decides whether the deployment is a cluster. With the default standaloneEnabled=true, a single server.N entry still leaves votingMembers at size 1 and the method returns false, so the standalone path is taken; only more than one voting member (or standaloneEnabled=false) counts as distributed:

        // detect whether we are running in cluster (distributed) mode
        public boolean isDistributed() {
            return quorumVerifier!=null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
        }

     Next, let's look at how the ZooKeeperServer itself is initialized!

        // org.apache.zookeeper.server.ZooKeeperServer
        /**
         * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
         * actually start listening for clients until run() is invoked.
         *
         * @param dataDir the directory to put the data
         */
        public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
                int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) {
            serverStats = new ServerStats(this);
            this.txnLogFactory = txnLogFactory;
            this.txnLogFactory.setServerStats(this.serverStats);
            // zkDb is the core structure holding all zk data, but it is still null at this point
            this.zkDb = zkDb;
            this.tickTime = tickTime;
            // defaults: minSessionTimeout = tickTime * 2 and maxSessionTimeout = tickTime * 20, i.e. 4000/40000 with the default tickTime of 2000
            setMinSessionTimeout(minSessionTimeout);
            setMaxSessionTimeout(maxSessionTimeout);
            // create the listener through which critical server threads report fatal errors
            listener = new ZooKeeperServerListenerImpl(this);
            LOG.info("Created server with tickTime " + tickTime
                    + " minSessionTimeout " + getMinSessionTimeout()
                    + " maxSessionTimeout " + getMaxSessionTimeout()
                    + " datadir " + txnLogFactory.getDataDir()
                    + " snapdir " + txnLogFactory.getSnapDir());
        }
        // the server has four states; each one represents a phase of its lifecycle
        protected enum State {
            INITIAL, RUNNING, SHUTDOWN, ERROR
        }
        
        // org.apache.zookeeper.server.NIOServerCnxnFactory: configure the server
        @Override
        public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
            if (secure) {
                throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
            }
            // perform SASL login setup if required; a broken SASL/JAAS setup is a common cause of "no permission" errors when zk connections fail
            configureSaslLogin();
    
            // maximum number of concurrent connections (per client IP), 60 by default
            maxClientCnxns = maxcc;
            sessionlessCnxnTimeout = Integer.getInteger(
                ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
            // We also use the sessionlessCnxnTimeout as expiring interval for
            // cnxnExpiryQueue. These don't need to be the same, but the expiring
            // interval passed into the ExpiryQueue() constructor below should be
            // less than or equal to the timeout.
            cnxnExpiryQueue =
                new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
            // thread that expires idle connections; examined in detail later
            expirerThread = new ConnectionExpirerThread();
    
            int numCores = Runtime.getRuntime().availableProcessors();
            // 32 cores sweet spot seems to be 4 selector threads
            numSelectorThreads = Integer.getInteger(
                ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
                Math.max((int) Math.sqrt((float) numCores/2), 1));
            if (numSelectorThreads < 1) {
                throw new IOException("numSelectorThreads must be at least 1");
            }
    
            // number of worker threads, twice the number of CPU cores by default
            numWorkerThreads = Integer.getInteger(
                ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
            workerShutdownTimeoutMS = Long.getLong(
                ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
    
            LOG.info("Configuring NIO connection handler with "
                     + (sessionlessCnxnTimeout/1000) + "s sessionless connection"
                     + " timeout, " + numSelectorThreads + " selector thread(s), "
                     + (numWorkerThreads > 0 ? numWorkerThreads : "no")
                     + " worker threads, and "
                     + (directBufferBytes == 0 ? "gathered writes." :
                        ("" + (directBufferBytes/1024) + " kB direct buffers.")));
            // create the requested number of selector threads; they are started later and fed by the acceptThread
            for(int i=0; i<numSelectorThreads; ++i) {
                selectorThreads.add(new SelectorThread(i));
            }
    
            // open the NIO server socket and bind the port; external connections can now arrive, but nothing handles them yet
            this.ss = ServerSocketChannel.open();
            ss.socket().setReuseAddress(true);
            LOG.info("binding to port " + addr);
            ss.socket().bind(addr);
            ss.configureBlocking(false);
            // hand the server socket to the inner AcceptThread; all subsequent accepts are handled there
            acceptThread = new AcceptThread(ss, addr, selectorThreads);
        }
        
            // org.apache.zookeeper.server.NIOServerCnxnFactory$AcceptThread
            public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
                    Set<SelectorThread> selectorThreads) throws IOException {
                super("NIOServerCxnFactory.AcceptThread:" + addr);
                this.acceptSocket = ss;
                this.acceptKey =
                    acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
                this.selectorThreads = Collections.unmodifiableList(
                    new ArrayList<SelectorThread>(selectorThreads));
                selectorIterator = this.selectorThreads.iterator();
            }
    
        // org.apache.zookeeper.server.ServerCnxnFactory
        /**
         * Initialize the server SASL if specified.
         *
         * If the user has specified a "ZooKeeperServer.LOGIN_CONTEXT_NAME_KEY"
         * or a jaas.conf using "java.security.auth.login.config"
         * the authentication is required and an exception is raised.
         * Otherwise no authentication is configured and no exception is raised.
         *
         * @throws IOException if jaas.conf is missing or there's an error in it.
         */
        protected void configureSaslLogin() throws IOException {
            String serverSection = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
                                                      ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);
    
            // Note that 'Configuration' here refers to javax.security.auth.login.Configuration.
            AppConfigurationEntry entries[] = null;
            SecurityException securityException = null;
            try {
                entries = Configuration.getConfiguration().getAppConfigurationEntry(serverSection);
            } catch (SecurityException e) {
                // handle below: might be harmless if the user doesn't intend to use JAAS authentication.
                securityException = e;
            }
    
            // No entries in jaas.conf
            // If there's a configuration exception fetching the jaas section and
            // the user has required sasl by specifying a LOGIN_CONTEXT_NAME_KEY or a jaas file
            // we throw an exception otherwise we continue without authentication.
            if (entries == null) {
                String jaasFile = System.getProperty(Environment.JAAS_CONF_KEY);
                String loginContextName = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY);
                if (securityException != null && (loginContextName != null || jaasFile != null)) {
                    String errorMessage = "No JAAS configuration section named '" + serverSection +  "' was found";
                    if (jaasFile != null) {
                        errorMessage += "in '" + jaasFile + "'.";
                    }
                    if (loginContextName != null) {
                        errorMessage += " But " + ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY + " was set.";
                    }
                    LOG.error(errorMessage);
                    throw new IOException(errorMessage);
                }
                return;
            }
    
            // jaas.conf entry available
            try {
                saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());
                login = new Login(serverSection, saslServerCallbackHandler, new ZKConfig() );
                login.startThreadIfNeeded();
            } catch (LoginException e) {
                throw new IOException("Could not configure server because SASL configuration did not allow the "
                  + " ZooKeeper server to authenticate itself properly: " + e);
            }
        }
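
      For reference, SASL on the server side is typically enabled by pointing the java.security.auth.login.config system property at a jaas.conf file containing a Server section. A commonly cited (illustrative, not from this document's source) example looks like this:

        Server {
            org.apache.zookeeper.server.auth.DigestLoginModule required
            user_super="super-secret"
            user_bob="bob-secret";
        };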

    Next comes starting the connection side: cnxnFactory.startup(zkServer);

        // org.apache.zookeeper.server.ServerCnxnFactory
        public void startup(ZooKeeperServer zkServer) throws IOException, InterruptedException {
            startup(zkServer, true);
        }
        @Override
        public void startup(ZooKeeperServer zks, boolean startServer)
                throws IOException, InterruptedException {
            // call NIOServerCnxnFactory.start()
            start();
            // attach the ZooKeeperServer to this factory
            setZooKeeperServer(zks);
            if (startServer) {
                // load the initial data from disk
                zks.startdata();
                // start the zkServer itself
                zks.startup();
            }
        }
    
        // org.apache.zookeeper.server.NIOServerCnxnFactory
        // start the various threads: acceptor, selectors, expirerThread...
        @Override
        public void start() {
            stopped = false;
            if (workerPool == null) {
                // custom thread-pool wrapper; underneath it uses the Executors factory
                // to build ThreadPoolExecutors, and it starts itself on construction
                workerPool = new WorkerService(
                    "NIOWorker", numWorkerThreads, false);
            }
            for(SelectorThread thread : selectorThreads) {
                // start any thread not yet started; each SelectorThread is built around two key queues
                if (thread.getState() == Thread.State.NEW) {
                    thread.start();
                }
            }
            // ensure thread is started once and only once
            if (acceptThread.getState() == Thread.State.NEW) {
                acceptThread.start();
            }
            if (expirerThread.getState() == Thread.State.NEW) {
                expirerThread.start();
            }
        }
        // org.apache.zookeeper.server.WorkerService: the worker pool is implemented with fixed thread pools
        public WorkerService(String name, int numThreads,
                             boolean useAssignableThreads) {
            this.threadNamePrefix = (name == null ? "" : name) + "Thread";
            this.numWorkerThreads = numThreads;
            this.threadsAreAssignable = useAssignableThreads;
            // starts itself right after construction, i.e. initializes the thread pool(s)
            start();
        }
        public void start() {
            if (numWorkerThreads > 0) {
                if (threadsAreAssignable) {
                    for(int i = 1; i <= numWorkerThreads; ++i) {
                        // each worker is a single-thread fixed pool
                        workers.add(Executors.newFixedThreadPool(
                            1, new DaemonThreadFactory(threadNamePrefix, i)));
                    }
                } else {
                    workers.add(Executors.newFixedThreadPool(
                        numWorkerThreads, new DaemonThreadFactory(threadNamePrefix)));
                }
            }
            stopped = false;
        }
        // SelectorThread is an inner class of org.apache.zookeeper.server.NIOServerCnxnFactory; it mainly maintains two queues: acceptedQueue and updateQueue
            public SelectorThread(int id) throws IOException {
                super("NIOServerCxnFactory.SelectorThread-" + id);
                this.id = id;
                acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
                updateQueue = new LinkedBlockingQueue<SelectionKey>();
            }


    As shown above, the zkServer start() process is essentially the startup of several threads and thread pools; all of the zkServer's services are driven by these threads. The main flow (sketched below) is:
      1. the accept thread receives incoming connections and places them into a selector thread's acceptedQueue;
      2. the selector thread wraps each ready channel in an IOWorkRequest and hands it to the workerPool for later scheduling;
      3. the workerPool schedules IOWorkRequest.doWork() to do the actual processing;

      4. ....
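
    To see that handoff in one place, here is a minimal, self-contained sketch of the same pattern in plain java.nio (an illustration only, not ZooKeeper's actual classes; error handling and shutdown are elided):

        import java.io.IOException;
        import java.net.InetSocketAddress;
        import java.nio.channels.*;
        import java.util.concurrent.*;

        public class MiniReactor {
            private final LinkedBlockingQueue<SocketChannel> acceptedQueue = new LinkedBlockingQueue<>();
            private final ExecutorService workerPool = Executors.newFixedThreadPool(4); // "NIOWorker" stand-in

            public void start(int port) throws IOException {
                ServerSocketChannel server = ServerSocketChannel.open();
                server.bind(new InetSocketAddress(port));
                Selector selector = Selector.open();

                // accept thread: blocking accept, then hand off to the selector via a queue
                new Thread(() -> {
                    try {
                        while (true) {
                            SocketChannel sc = server.accept();
                            sc.configureBlocking(false);
                            acceptedQueue.add(sc);
                            selector.wakeup(); // let the selector drain the queue
                        }
                    } catch (IOException ignored) { }
                }, "accept-thread").start();

                // selector thread: register accepted channels, dispatch ready ones to workers
                new Thread(() -> {
                    try {
                        while (true) {
                            selector.select();
                            SocketChannel sc;
                            while ((sc = acceptedQueue.poll()) != null) {
                                sc.register(selector, SelectionKey.OP_READ);
                            }
                            for (SelectionKey key : selector.selectedKeys()) {
                                if (key.isReadable()) {
                                    key.interestOps(0);                     // park the key while a worker owns it
                                    workerPool.submit(() -> handleIO(key)); // the doWork() equivalent
                                }
                            }
                            selector.selectedKeys().clear();
                        }
                    } catch (IOException ignored) { }
                }, "selector-thread").start();
            }

            private void handleIO(SelectionKey key) { /* read, decode, respond */ }
        }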

    Once zkServer is up, it first initializes its data from disk (or synchronizes it from elsewhere);

      zks.startdata(); loads the on-disk data, i.e. initializes zkDb, the structure that holds all of the server's runtime data!

        // First, look at the FileTxnSnapLog constructor: it does quite a lot (checking directory permissions, creating directories, etc.), laying the groundwork for the data recovery that follows
        // org.apache.zookeeper.server.persistence.FileTxnSnapLog
        /**
         * the constructor which takes the datadir and
         * snapdir.
         * @param dataDir the transaction directory
         * @param snapDir the snapshot directory
         */
        public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
            LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);
    
            this.dataDir = new File(dataDir, version + VERSION);
            this.snapDir = new File(snapDir, version + VERSION);
    
            // by default create snap/log dirs, but otherwise complain instead
            // See ZOOKEEPER-1161 for more details
            boolean enableAutocreate = Boolean.valueOf(
                    System.getProperty(ZOOKEEPER_DATADIR_AUTOCREATE,
                            ZOOKEEPER_DATADIR_AUTOCREATE_DEFAULT));
    
            if (!this.dataDir.exists()) {
                if (!enableAutocreate) {
                    throw new DatadirException("Missing data directory "
                            + this.dataDir
                            + ", automatic data directory creation is disabled ("
                            + ZOOKEEPER_DATADIR_AUTOCREATE
                            + " is false). Please create this directory manually.");
                }
    
                if (!this.dataDir.mkdirs()) {
                    throw new DatadirException("Unable to create data directory "
                            + this.dataDir);
                }
            }
            if (!this.dataDir.canWrite()) {
                throw new DatadirException("Cannot write to data directory " + this.dataDir);
            }
    
            if (!this.snapDir.exists()) {
                // by default create this directory, but otherwise complain instead
                // See ZOOKEEPER-1161 for more details
                if (!enableAutocreate) {
                    throw new DatadirException("Missing snap directory "
                            + this.snapDir
                            + ", automatic data directory creation is disabled ("
                            + ZOOKEEPER_DATADIR_AUTOCREATE
                            + " is false). Please create this directory manually.");
                }
    
                if (!this.snapDir.mkdirs()) {
                    throw new DatadirException("Unable to create snap directory "
                            + this.snapDir);
                }
            }
            if (!this.snapDir.canWrite()) {
                throw new DatadirException("Cannot write to snap directory " + this.snapDir);
            }
    
            // check content of transaction log and snapshot dirs if they are two different directories
            // See ZOOKEEPER-2967 for more details
            if(!this.dataDir.getPath().equals(this.snapDir.getPath())){
                checkLogDir();
                checkSnapDir();
            }
    
            txnLog = new FileTxnLog(this.dataDir);
            snapLog = new FileSnap(this.snapDir);
    
            autoCreateDB = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DB_AUTOCREATE,
                    ZOOKEEPER_DB_AUTOCREATE_DEFAULT));
        }
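
    A quick illustration of the autocreate switch (assuming the ZOOKEEPER_DATADIR_AUTOCREATE constant above holds the property name "zookeeper.datadir.autocreate"):

        // assumption: property name matches the ZOOKEEPER_DATADIR_AUTOCREATE constant
        System.setProperty("zookeeper.datadir.autocreate", "false");
        // from now on, constructing a FileTxnSnapLog over a missing dataDir/snapDir
        // throws DatadirException instead of silently creating the directories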
    
        // org.apache.zookeeper.server.ZooKeeperServer; this is where zkDb initialization begins
        public void startdata()
        throws IOException, InterruptedException {
            //check to see if zkDb is not null
            // zkDb stores data mainly with lists and maps: a DataTree holds the nodes, and the DataTree in turn uses a ConcurrentHashMap as its backing store
            // let's focus on this first
            if (zkDb == null) {
                zkDb = new ZKDatabase(this.txnLogFactory);
            }
            // if zkDb has not been initialized yet, load the data from disk once
            if (!zkDb.isInitialized()) {
                loadData();
            }
        }
        // org.apache.zookeeper.server.ZKDatabase, one of the most important data structures
        /**
         * the filetxnsnaplog that this zk database
         * maps to. There is a one to one relationship
         * between a filetxnsnaplog and zkdatabase.
         * @param snapLog the FileTxnSnapLog mapping this zkdatabase
         */
        public ZKDatabase(FileTxnSnapLog snapLog) {
            // use a DataTree
            dataTree = createDataTree();
            // sessions with expirations are kept in a ConcurrentHashMap: sessionId -> timeout
            sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
            this.snapLog = snapLog;
    
            try {
                snapshotSizeFactor = Double.parseDouble(
                    System.getProperty(SNAPSHOT_SIZE_FACTOR,
                            Double.toString(DEFAULT_SNAPSHOT_SIZE_FACTOR)));
                if (snapshotSizeFactor > 1) {
                    snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR;
                    LOG.warn("The configured {} is invalid, going to use " +
                            "the default {}", SNAPSHOT_SIZE_FACTOR,
                            DEFAULT_SNAPSHOT_SIZE_FACTOR);
                }
            } catch (NumberFormatException e) {
                LOG.error("Error parsing {}, using default value {}",
                        SNAPSHOT_SIZE_FACTOR, DEFAULT_SNAPSHOT_SIZE_FACTOR);
                snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR;
            }
            LOG.info("{} = {}", SNAPSHOT_SIZE_FACTOR, snapshotSizeFactor);
        }
    
        // the org.apache.zookeeper.server.DataTree constructor
        public DataTree() {
            /* Rather than fight it, let root have an alias */
            nodes.put("", root);
            nodes.put(rootZookeeper, root);
    
            /** add the proc node and quota node */
            root.addChild(procChildZookeeper);
            nodes.put(procZookeeper, procDataNode);
    
            procDataNode.addChild(quotaChildZookeeper);
            nodes.put(quotaZookeeper, quotaDataNode);
    
            addConfigNode();
    
            nodeDataSize.set(approximateDataSize());
            try {
                // create the watch managers that serve watches
                dataWatches = WatchManagerFactory.createWatchManager();
                childWatches = WatchManagerFactory.createWatchManager();
            } catch (Exception e) {
                LOG.error("Unexpected exception when creating WatchManager, " +
                        "exiting abnormally", e);
                System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
            }
        }
        // org.apache.zookeeper.server.watch.WatchManagerFactory; the implementation is pluggable via a system property, defaulting to WatchManager
        public static IWatchManager createWatchManager() throws IOException {
            String watchManagerName = System.getProperty(ZOOKEEPER_WATCH_MANAGER_NAME);
            if (watchManagerName == null) {
                watchManagerName = WatchManager.class.getName();
            }
            try {
                IWatchManager watchManager =
                        (IWatchManager) Class.forName(watchManagerName).newInstance();
                LOG.info("Using {} as watch manager", watchManagerName);
                return watchManager;
            } catch (Exception e) {
                IOException ioe = new IOException("Couldn't instantiate "
                        + watchManagerName);
                ioe.initCause(e);
                throw ioe;
            }
        }
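
    So a custom watch manager can be plugged in with a single system property (the property name is assumed from the ZOOKEEPER_WATCH_MANAGER_NAME constant above; the class name is hypothetical):

        // assumption: ZOOKEEPER_WATCH_MANAGER_NAME == "zookeeper.watchManagerName"
        System.setProperty("zookeeper.watchManagerName", "com.example.MyWatchManager");
        // createWatchManager() will reflectively instantiate com.example.MyWatchManager,
        // which must implement IWatchManager and expose a public no-arg constructor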
    
        // org.apache.zookeeper.server.ZooKeeperServer
        // load the initial data from disk
        /**
         *  Restore sessions and data
         */
        public void loadData() throws IOException, InterruptedException {
            /*
             * When a new leader starts executing Leader#lead, it
             * invokes this method. The database, however, has been
             * initialized before running leader election so that
             * the server could pick its zxid for its initial vote.
             * It does it by invoking QuorumPeer#getLastLoggedZxid.
             * Consequently, we don't need to initialize it once more
             * and avoid the penalty of loading it a second time. Not
             * reloading it is particularly important for applications
             * that host a large database.
             *
             * The following if block checks whether the database has
             * been initialized or not. Note that this method is
             * invoked by at least one other method:
             * ZooKeeperServer#startdata.
             *
             * See ZOOKEEPER-1642 for more detail.
             */
            if(zkDb.isInitialized()){
                setZxid(zkDb.getDataTreeLastProcessedZxid());
            }
            else {
                // first load: run loadDataBase() and obtain the highest zxid
                setZxid(zkDb.loadDataBase());
            }
    
            // Clean up dead sessions
            List<Long> deadSessions = new LinkedList<Long>();
            for (Long session : zkDb.getSessions()) {
                if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                    deadSessions.add(session);
                }
            }
    
            for (long session : deadSessions) {
                // XXX: Is lastProcessedZxid really the best thing to use?
                killSession(session, zkDb.getDataTreeLastProcessedZxid());
            }
    
            // Make a clean snapshot
            takeSnapshot();
        }
    
        // org.apache.zookeeper.server.ZKDatabase
        /**
         * load the database from the disk onto memory and also add
         * the transactions to the committedlog in memory.
         * @return the last valid zxid on disk
         * @throws IOException
         */
        public long loadDataBase() throws IOException {
            long startTime = Time.currentElapsedTime();
            // restore the data through snapLog
            long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
            initialized = true;
            long loadTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.DB_INIT_TIME.add(loadTime);
            LOG.info("Snapshot loaded in " + loadTime + " ms");
            return zxid;
        }
    
        // org.apache.zookeeper.server.persistence.FileSnap    
        /**
         * this function restores the server
         * database after reading from the
         * snapshots and transaction logs
         * @param dt the datatree to be restored
         * @param sessions the sessions to be restored
         * @param listener the playback listener to run on the
         * database restoration
         * @return the highest zxid restored
         * @throws IOException
         */
        public long restore(DataTree dt, Map<Long, Integer> sessions,
                            PlayBackListener listener) throws IOException {
            long deserializeResult = snapLog.deserialize(dt, sessions);
            FileTxnLog txnLog = new FileTxnLog(dataDir);
            boolean trustEmptyDB;
            // if the 'initialize' marker file exists, delete it and trust an empty database
            File initFile = new File(dataDir.getParent(), "initialize");
            if (Files.deleteIfExists(initFile.toPath())) {
                LOG.info("Initialize file found, an empty database will not block voting participation");
                trustEmptyDB = true;
            } else {
                trustEmptyDB = autoCreateDB;
            }
            if (-1L == deserializeResult) {
                /* this means that we couldn't find any snapshot, so we need to
                 * initialize an empty database (reported in ZOOKEEPER-2325) */
                if (txnLog.getLastLoggedZxid() != -1) {
                    throw new IOException(
                            "No snapshot found, but there are log entries. " +
                            "Something is broken!");
                }
    
                if (trustEmptyDB) {
                    /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
                     *       or use Map on save() */
                    save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);
    
                    /* return a zxid of 0, since we know the database is empty */
                    return 0L;
                } else {
                    /* return a zxid of -1, since we are possibly missing data */
                    LOG.warn("Unexpected empty data tree, setting zxid to -1");
                    dt.lastProcessedZxid = -1L;
                    return -1L;
                }
            }
            // replay the transaction logs on top of the snapshot
            return fastForwardFromEdits(dt, sessions, listener);
        }
    
        /**
         * deserialize a data tree from the most recent snapshot
         * @return the zxid of the snapshot
         */
        public long deserialize(DataTree dt, Map<Long, Integer> sessions)
                throws IOException {
            // we run through 100 snapshots (not all of them)
            // if we cannot get it running within 100 snapshots
            // we should  give up
            List<File> snapList = findNValidSnapshots(100);
            if (snapList.size() == 0) {
                return -1L;
            }
            File snap = null;
            boolean foundValid = false;
            for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
                snap = snapList.get(i);
                LOG.info("Reading snapshot " + snap);
                try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
                     CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
                    InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                    deserialize(dt, sessions, ia);
                    long checkSum = crcIn.getChecksum().getValue();
                    long val = ia.readLong("val");
                    // verify the checksum to detect corruption
                    if (val != checkSum) {
                        throw new IOException("CRC corruption in snapshot :  " + snap);
                    }
                    foundValid = true;
                    break;
                } catch (IOException e) {
                    LOG.warn("problem reading snap file " + snap, e);
                }
            }
            if (!foundValid) {
                throw new IOException("Not able to find valid snapshots in " + snapDir);
            }
            // snapshot file names carry the zxid, so the highest zxid can be parsed straight from the name
            dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
            return dt.lastProcessedZxid;
        }
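
    Since snapshot files are named "snapshot.<hex-zxid>" (see Util.makeSnapshotName further below), recovering the zxid is a one-line parse. A small sketch of the idea (an illustration of what Util.getZxidFromName boils down to, not the actual implementation):

        static long zxidFromSnapshotName(String name, String prefix) {
            if (name.startsWith(prefix + ".")) {
                return Long.parseLong(name.substring(prefix.length() + 1), 16); // hex zxid
            }
            return -1L;
        }
        // zxidFromSnapshotName("snapshot.1a", "snapshot") == 26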
    
        /**
         * This function will fast forward the server database to have the latest
         * transactions in it.  This is the same as restore, but only reads from
         * the transaction logs and not restores from a snapshot.
         * @param dt the datatree to write transactions to.
         * @param sessions the sessions to be restored.
         * @param listener the playback listener to run on the
         * database transactions.
         * @return the highest zxid restored.
         * @throws IOException
         */
        public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                         PlayBackListener listener) throws IOException {
            TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
            long highestZxid = dt.lastProcessedZxid;
            TxnHeader hdr;
            try {
                while (true) {
                    // iterator points to
                    // the first valid txn when initialized
                    hdr = itr.getHeader();
                    if (hdr == null) {
                        //empty logs
                        return dt.lastProcessedZxid;
                    }
                    // replay only transactions newer than what has already been applied; a backwards zxid is logged as an error
                    if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                        LOG.error("{}(highestZxid) > {}(next log) for type {}",
                                highestZxid, hdr.getZxid(), hdr.getType());
                    } else {
                        highestZxid = hdr.getZxid();
                    }
                    try {
                        processTransaction(hdr,dt,sessions, itr.getTxn());
                    } catch(KeeperException.NoNodeException e) {
                       throw new IOException("Failed to process transaction type: " +
                             hdr.getType() + " error: " + e.getMessage(), e);
                    }
                    listener.onTxnLoaded(hdr, itr.getTxn());
                    if (!itr.next())
                        break;
                }
            } finally {
                if (itr != null) {
                    itr.close();
                }
            }
            return highestZxid;
        }
    
        // org.apache.zookeeper.server.DataTree
        // deserializing the snapshot ends up calling DataTree.deserialize()
        public void deserialize(InputArchive ia, String tag) throws IOException {
            aclCache.deserialize(ia);
            nodes.clear();
            pTrie.clear();
            nodeDataSize.set(0);
            String path = ia.readString("path");
            while (!"/".equals(path)) {
                // records are read one by one into DataNodes
                DataNode node = new DataNode();
                ia.readRecord(node, "node");
                nodes.put(path, node);
                synchronized (node) {
                    aclCache.addUsage(node.acl);
                }
                int lastSlash = path.lastIndexOf('/');
                if (lastSlash == -1) {
                    root = node;
                } else {
                    String parentPath = path.substring(0, lastSlash);
                    DataNode parent = nodes.get(parentPath);
                    if (parent == null) {
                        throw new IOException("Invalid Datatree, unable to find " +
                                "parent " + parentPath + " of path " + path);
                    }
                    parent.addChild(path.substring(lastSlash + 1));
                    long eowner = node.stat.getEphemeralOwner();
                    EphemeralType ephemeralType = EphemeralType.get(eowner);
                    if (ephemeralType == EphemeralType.CONTAINER) {
                        containers.add(path);
                    } else if (ephemeralType == EphemeralType.TTL) {
                        ttls.add(path);
                    } else if (eowner != 0) {
                        HashSet<String> list = ephemerals.get(eowner);
                        if (list == null) {
                            list = new HashSet<String>();
                            ephemerals.put(eowner, list);
                        }
                        list.add(path);
                    }
                }
                path = ia.readString("path");
            }
            // finally, put the root node in place under "/"
            nodes.put("/", root);
    
            nodeDataSize.set(approximateDataSize());
    
            // we are done with deserializing the
            // the datatree
            // update the quotas - create path trie
            // and also update the stat nodes
            // rebuild the quota path trie and stat nodes, if present
            setupQuota();
            // purge unused data, i.e. entries in refers whose reference count is below 0
            aclCache.purgeUnused();
        }
    
        /**
         * this method sets up the path trie and sets up stats for quota nodes
         */
        private void setupQuota() {
            String quotaPath = Quotas.quotaZookeeper;
            DataNode node = getNode(quotaPath);
            if (node == null) {
                return;
            }
            traverseNode(quotaPath);
        }
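
    The reconstruction trick above, finding each node's parent by cutting the path at the last '/', is easy to see in isolation. A minimal sketch (simplified: real DataNodes, ACLs, ephemeral tracking, and the root alias are omitted):

        import java.util.*;

        public class TreeRebuildSketch {
            static Map<String, Set<String>> nodes = new HashMap<>();

            // paths arrive parent-before-child, exactly as in the snapshot stream
            static void put(String path) {
                nodes.put(path, new HashSet<>());
                int lastSlash = path.lastIndexOf('/');
                if (lastSlash > 0) {
                    String parentPath = path.substring(0, lastSlash);
                    nodes.get(parentPath).add(path.substring(lastSlash + 1)); // register the child name
                }
            }

            public static void main(String[] args) {
                put("/zookeeper");
                put("/zookeeper/quota");
                System.out.println(nodes.get("/zookeeper")); // prints [quota]
            }
        }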
        
        // right after loading the data from disk, immediately take a fresh snapshot
        // org.apache.zookeeper.server.ZooKeeperServer
        public void takeSnapshot() {
            takeSnapshot(false);
        }
    
        public void takeSnapshot(boolean syncSnap){
            long start = Time.currentElapsedTime();
            try {
                txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
            } catch (IOException e) {
                LOG.error("Severe unrecoverable error, exiting", e);
                // This is a severe error that we cannot recover from,
                // so we need to exit
                System.exit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
            }
            long elapsed = Time.currentElapsedTime() - start;
            LOG.info("Snapshot taken in " + elapsed + " ms");
            ServerMetrics.SNAPSHOT_TIME.add(elapsed);
        }
    
        /**
         * save the datatree and the sessions into a snapshot
         * @param dataTree the datatree to be serialized onto disk
         * @param sessionsWithTimeouts the session timeouts to be
         * serialized onto disk
         * @param syncSnap sync the snapshot immediately after write
         * @throws IOException
         */
        public void save(DataTree dataTree,
                         ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
                         boolean syncSnap)
            throws IOException {
            long lastZxid = dataTree.lastProcessedZxid;
            // snapshot files are named prefix + zxid:
            // FileSnap.SNAPSHOT_FILE_PREFIX + "." + Long.toHexString(zxid);
            File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
            LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
                    snapshotFile);
            try {
                // serialize according to the snapshot format and store it; reading reverses the process. This call is synchronous
                snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
            } catch (IOException e) {
                if (snapshotFile.length() == 0) {
                    /* This may be caused by a full disk. In such a case, the server
                     * will get stuck in a loop where it tries to write a snapshot
                     * out to disk, and ends up creating an empty file instead.
                     * Doing so will eventually result in valid snapshots being
                     * removed during cleanup. */
                    if (snapshotFile.delete()) {
                        LOG.info("Deleted empty snapshot file: " +
                                 snapshotFile.getAbsolutePath());
                    } else {
                        LOG.warn("Could not delete empty snapshot file: " +
                                 snapshotFile.getAbsolutePath());
                    }
                } else {
                    /* Something else went wrong when writing the snapshot out to
                     * disk. If this snapshot file is invalid, when restarting,
                     * ZooKeeper will skip it, and find the last known good snapshot
                     * instead. */
                }
                throw e;
            }
        }
        // org.apache.zookeeper.server.persistence.FileSnap
        /**
         * serialize the datatree and session into the file snapshot
         * @param dt the datatree to be serialized
         * @param sessions the sessions to be serialized
         * @param snapShot the file to store snapshot into
         * @param fsync sync the file immediately after write
         */
        public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync)
                throws IOException {
            if (!close) {
                try (CheckedOutputStream crcOut =
                             new CheckedOutputStream(new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) :
                                                                                      new FileOutputStream(snapShot)),
                                                     new Adler32())) {
                    //CheckedOutputStream cout = new CheckedOutputStream()
                    OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
                    FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
                    serialize(dt, sessions, oa, header);
                    long val = crcOut.getChecksum().getValue();
                    oa.writeLong(val, "val");
                    oa.writeString("/", "path");
                    crcOut.flush();
                }
            }
        }
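
    The checksum discipline is symmetric with the read side shown earlier: write the payload through a CheckedOutputStream, append the running Adler32 value, then verify it on read. A self-contained sketch of the pattern (an illustration only, not the actual snapshot format):

        import java.io.*;
        import java.util.zip.Adler32;
        import java.util.zip.CheckedInputStream;
        import java.util.zip.CheckedOutputStream;

        public class ChecksumRoundTrip {
            public static void main(String[] args) throws IOException {
                File f = File.createTempFile("snap", ".bin");

                try (CheckedOutputStream out = new CheckedOutputStream(
                        new BufferedOutputStream(new FileOutputStream(f)), new Adler32());
                     DataOutputStream data = new DataOutputStream(out)) {
                    data.writeUTF("payload");                 // the serialized tree would go here
                    long checksum = out.getChecksum().getValue();
                    data.writeLong(checksum);                 // trailing checksum, like oa.writeLong(val, "val")
                }

                try (CheckedInputStream in = new CheckedInputStream(
                        new BufferedInputStream(new FileInputStream(f)), new Adler32());
                     DataInputStream data = new DataInputStream(in)) {
                    data.readUTF();                           // consume payload, updating the running checksum
                    long expected = in.getChecksum().getValue();
                    long stored = data.readLong();
                    if (stored != expected) {
                        throw new IOException("CRC corruption in snapshot: " + f);
                    }
                }
            }
        }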
        

    With that, loadData() is complete! Next comes the actual startup:

      zks.startup();

        
        // org.apache.zookeeper.server.ZooKeeperServer
        public synchronized void startup() {
            if (sessionTracker == null) {
                // first create a sessionTracker: an asynchronous thread whose main job is handling session expiration
                createSessionTracker();
            }
            // start the tracker thread
            startSessionTracker();
            // set up the request processor chain; this is crucial
            setupRequestProcessors();
    
            // register with JMX
            registerJMX();
    
            // finally mark startup as complete: RUNNING
            setState(State.RUNNING);
            // wake up everything blocked on this monitor
            notifyAll();
        }
        
        // org.apache.zookeeper.server.SessionTrackerImpl
        public SessionTrackerImpl(SessionExpirer expirer,
                ConcurrentMap<Long, Integer> sessionsWithTimeout, int tickTime,
                long serverId, ZooKeeperServerListener listener)
        {
            super("SessionTracker", listener);
            this.expirer = expirer;
            this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
            this.sessionsWithTimeout = sessionsWithTimeout;
            this.nextSessionId.set(initializeNextSession(serverId));
            for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
                trackSession(e.getKey(), e.getValue());
            }
    
            EphemeralType.validateServerId(serverId);
        }
        @Override
        public void run() {
            try {
                while (running) {
                    long waitTime = sessionExpiryQueue.getWaitTime();
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                        continue;
                    }
                    // the main task: close sessions that have expired
                    for (SessionImpl s : sessionExpiryQueue.poll()) {
                        setSessionClosing(s.sessionId);
                        expirer.expire(s);
                    }
                }
            } catch (InterruptedException e) {
                handleException(this.getName(), e);
            }
            LOG.info("SessionTrackerImpl exited loop!");
        }
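
    The ExpiryQueue behind this loop buckets sessions by tick so that a whole batch expires together. A minimal sketch of the bucketing idea (simplified: the real queue also migrates a session out of its old bucket on touch):

        import java.util.*;
        import java.util.concurrent.*;

        public class ExpiryBuckets {
            private final long tickTime;
            private final ConcurrentMap<Long, Set<Long>> buckets = new ConcurrentHashMap<>();

            public ExpiryBuckets(long tickTime) { this.tickTime = tickTime; }

            public void touch(long sessionId, long timeoutMs) {
                // round the deadline up to the next tick boundary
                long bucket = ((System.currentTimeMillis() + timeoutMs) / tickTime + 1) * tickTime;
                buckets.computeIfAbsent(bucket, k -> ConcurrentHashMap.newKeySet()).add(sessionId);
            }

            public Set<Long> pollExpired() {
                long nowTick = (System.currentTimeMillis() / tickTime) * tickTime;
                Set<Long> expired = buckets.remove(nowTick);
                return expired == null ? Collections.emptySet() : expired;
            }
        }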
        
        // org.apache.zookeeper.server.ZooKeeperServer
        // build the processor chain, which determines how incoming requests are handled
        protected void setupRequestProcessors() {
            // a chain-of-responsibility wraps the processors:
            // PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
            // first prep the request, then persist it, then apply it
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                    finalProcessor);
            // SyncRequestProcessor is an asynchronous thread whose job is persisting request data to disk as it arrives
            ((SyncRequestProcessor)syncProcessor).start();
            firstProcessor = new PrepRequestProcessor(this, syncProcessor);
            ((PrepRequestProcessor)firstProcessor).start();
        }
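
    Stripped of the ZooKeeper types, the wiring is a textbook chain of responsibility; a minimal sketch:

        interface Processor { void process(String request); }

        class FinalStage implements Processor {
            public void process(String request) { System.out.println("apply: " + request); }
        }

        class SyncStage implements Processor {
            private final Processor next;
            SyncStage(Processor next) { this.next = next; }
            public void process(String request) {
                // persist to the transaction log first (elided), then hand on
                next.process(request);
            }
        }

        class PrepStage implements Processor {
            private final Processor next;
            PrepStage(Processor next) { this.next = next; }
            public void process(String request) {
                // validate and turn the request into a txn (elided), then hand on
                next.process(request);
            }
        }

        // wiring, innermost first, just like setupRequestProcessors():
        // Processor first = new PrepStage(new SyncStage(new FinalStage()));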
        
        // org.apache.zookeeper.server.PrepRequestProcessor, the first processor thread in the chain
        @Override
        public void run() {
            try {
                while (true) {
                    // submittedRequests is a LinkedBlockingQueue<Request>; take() blocks
                    Request request = submittedRequests.take();
                    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                    if (request.type == OpCode.ping) {
                        traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                    }
                    if (Request.requestOfDeath == request) {
                        break;
                    }
                    // process the request
                    pRequest(request);
                }
            } catch (RequestProcessorException e) {
                if (e.getCause() instanceof XidRolloverException) {
                    LOG.info(e.getCause().getMessage());
                }
                handleException(this.getName(), e);
            } catch (Exception e) {
                handleException(this.getName(), e);
            }
            LOG.info("PrepRequestProcessor exited loop!");
        }
        
        // org.apache.zookeeper.server.SyncRequestProcessor
        // when a request needs to be persisted, it is simply added to queuedRequests; this background thread flushes it to disk promptly
        @Override
        public void run() {
            try {
                int logCount = 0;
    
                // we do this in an attempt to ensure that not all of the servers
                // in the ensemble take a snapshot at the same time
                int randRoll = r.nextInt(snapCount/2);
                while (true) {
                    Request si = null;
                    // take from the queue, blocking only when there is nothing waiting to be flushed
                    if (toFlush.isEmpty()) {
                        si = queuedRequests.take();
                    } else {
                        si = queuedRequests.poll();
                        if (si == null) {
                            flush(toFlush);
                            continue;
                        }
                    }
                    // on the shutdown marker Request.requestOfDeath, exit immediately
                    if (si == requestOfDeath) {
                        break;
                    }
                    if (si != null) {
                        // track the number of records written to the log
                        if (zks.getZKDatabase().append(si)) {
                            logCount++;
                            if (logCount > (snapCount / 2 + randRoll)) {
                                randRoll = r.nextInt(snapCount/2);
                                // roll the log
                                zks.getZKDatabase().rollLog();
                                // take a snapshot
                                if (snapInProcess != null && snapInProcess.isAlive()) {
                                    LOG.warn("Too busy to snap, skipping");
                                } else {
                                    snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                            public void run() {
                                                try {
                                                    zks.takeSnapshot();
                                                } catch(Exception e) {
                                                    LOG.warn("Unexpected exception", e);
                                                }
                                            }
                                        };
                                    snapInProcess.start();
                                }
                                logCount = 0;
                            }
                        } else if (toFlush.isEmpty()) {
                            // optimization for read heavy workloads
                            // iff this is a read, and there are no pending
                            // flushes (writes), then just pass this to the next
                            // processor
                            if (nextProcessor != null) {
                            // since this runs as its own thread, it must invoke the next processor itself
                                nextProcessor.processRequest(si);
                                if (nextProcessor instanceof Flushable) {
                                    ((Flushable)nextProcessor).flush();
                                }
                            }
                            continue;
                        }
                        toFlush.add(si);
                        if (toFlush.size() > 1000) {
                            flush(toFlush);
                        }
                    }
                }
            } catch (Throwable t) {
                handleException(this.getName(), t);
            } finally{
                running = false;
            }
            LOG.info("SyncRequestProcessor exited!");
        }
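
    The take()-when-idle / poll()-when-batching trick above is a general group-commit pattern; a minimal sketch (an illustration, with the snapshot-rolling logic left out):

        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.LinkedBlockingQueue;

        public class BatchFlusher implements Runnable {
            private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
            private final List<String> toFlush = new ArrayList<>();

            public void submit(String request) { queue.add(request); }

            public void run() {
                try {
                    while (true) {
                        // block only when nothing is pending; otherwise drain without blocking
                        String item = toFlush.isEmpty() ? queue.take() : queue.poll();
                        if (item == null) {        // queue drained: flush what we batched
                            flush();
                            continue;
                        }
                        toFlush.add(item);
                        if (toFlush.size() > 1000) {
                            flush();               // cap batch size, as in SyncRequestProcessor
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            private void flush() { /* write the batch to disk, then: */ toFlush.clear(); }
        }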
        
        // dispatch each kind of request by its operation type
        /**
         * This method will be called inside the ProcessRequestThread, which is a
         * singleton, so there will be a single thread calling this code.
         *
         * @param request
         */
        protected void pRequest(Request request) throws RequestProcessorException {
            // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
            // request.type + " id = 0x" + Long.toHexString(request.sessionId));
            request.setHdr(null);
            request.setTxn(null);
    
            try {
                switch (request.type) {
                case OpCode.createContainer:
                case OpCode.create:
                case OpCode.create2:
                    CreateRequest create2Request = new CreateRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                    break;
                case OpCode.createTTL:
                    CreateTTLRequest createTtlRequest = new CreateTTLRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
                    break;
                case OpCode.deleteContainer:
                case OpCode.delete:
                    DeleteRequest deleteRequest = new DeleteRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
                    break;
                case OpCode.setData:
                    SetDataRequest setDataRequest = new SetDataRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                    break;
                case OpCode.reconfig:
                    ReconfigRequest reconfigRequest = new ReconfigRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
                    pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
                    break;
                case OpCode.setACL:
                    SetACLRequest setAclRequest = new SetACLRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
                    break;
                case OpCode.check:
                    CheckVersionRequest checkRequest = new CheckVersionRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
                    break;
                case OpCode.multi:
                    MultiTransactionRecord multiRequest = new MultiTransactionRecord();
                    try {
                        ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                    } catch(IOException e) {
                        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                                Time.currentWallTime(), OpCode.multi));
                        throw e;
                    }
                    List<Txn> txns = new ArrayList<Txn>();
                    //Each op in a multi-op must have the same zxid!
                    long zxid = zks.getNextZxid();
                    KeeperException ke = null;
    
                    //Store off current pending change records in case we need to rollback
                    Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
    
                    for(Op op: multiRequest) {
                        Record subrequest = op.toRequestRecord();
                        int type;
                        Record txn;
    
                        /* If we've already failed one of the ops, don't bother
                         * trying the rest as we know it's going to fail and it
                         * would be confusing in the logfiles.
                         */
                        if (ke != null) {
                            type = OpCode.error;
                            txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                        }
    
                        /* Prep the request and convert to a Txn */
                        else {
                            try {
                                pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                                type = request.getHdr().getType();
                                txn = request.getTxn();
                            } catch (KeeperException e) {
                                ke = e;
                                type = OpCode.error;
                                txn = new ErrorTxn(e.code().intValue());
    
                                if (e.code().intValue() > Code.APIERROR.intValue()) {
                                    LOG.info("Got user-level KeeperException when processing {} aborting" +
                                            " remaining multi ops. Error Path:{} Error:{}",
                                            request.toString(), e.getPath(), e.getMessage());
                                }
    
                                request.setException(e);
    
                                /* Rollback change records from failed multi-op */
                                rollbackPendingChanges(zxid, pendingChanges);
                            }
                        }
    
                        //FIXME: I don't want to have to serialize it here and then
                        //       immediately deserialize in next processor. But I'm
                        //       not sure how else to get the txn stored into our list.
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                        txn.serialize(boa, "request") ;
                        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
    
                        txns.add(new Txn(type, bb.array()));
                    }
    
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                            Time.currentWallTime(), request.type));
                    request.setTxn(new MultiTxn(txns));
    
                    break;
    
                //create.close session don't require request record
                case OpCode.createSession:
                case OpCode.closeSession:
                    if (!request.isLocalSession()) {
                        pRequest2Txn(request.type, zks.getNextZxid(), request,
                                     null, true);
                    }
                    break;
    
                //All the rest don't need to create a Txn - just verify session
                case OpCode.sync:
                case OpCode.exists:
                case OpCode.getData:
                case OpCode.getACL:
                case OpCode.getChildren:
                case OpCode.getChildren2:
                case OpCode.ping:
                case OpCode.setWatches:
                case OpCode.checkWatches:
                case OpCode.removeWatches:
                    zks.sessionTracker.checkSession(request.sessionId,
                            request.getOwner());
                    break;
                default:
                    LOG.warn("unknown type " + request.type);
                    break;
                }
            } catch (KeeperException e) {
                if (request.getHdr() != null) {
                    request.getHdr().setType(OpCode.error);
                    request.setTxn(new ErrorTxn(e.code().intValue()));
                }
    
                if (e.code().intValue() > Code.APIERROR.intValue()) {
                    LOG.info("Got user-level KeeperException when processing {} Error Path:{} Error:{}",
                            request.toString(), e.getPath(), e.getMessage());
                }
                request.setException(e);
            } catch (Exception e) {
                // log at error level as we are returning a marshalling
                // error to the user
                LOG.error("Failed to process " + request, e);
    
                StringBuilder sb = new StringBuilder();
                ByteBuffer bb = request.request;
                if(bb != null){
                    bb.rewind();
                    while (bb.hasRemaining()) {
                        sb.append(Integer.toHexString(bb.get() & 0xff));
                    }
                } else {
                    sb.append("request buffer is null");
                }
    
                LOG.error("Dumping request buffer: 0x" + sb.toString());
                if (request.getHdr() != null) {
                    request.getHdr().setType(OpCode.error);
                    request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
                }
            }
            request.zxid = zks.getZxid();
            // since this thread is the first processor, it hands control to the next one, SyncRequestProcessor, and then FinalRequestProcessor
            // of course, SyncRequestProcessor's processRequest() does nothing more than queuedRequests.add(request)
            nextProcessor.processRequest(request);
        }

      After the processors are started, JMX registration follows;

        // org.apache.zookeeper.server.ZooKeeperServer
        protected void registerJMX() {
            // register with JMX
            try {
                jmxServerBean = new ZooKeeperServerBean(this);
                MBeanRegistry.getInstance().register(jmxServerBean, null);
    
                try {
                    jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
                    MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxDataTreeBean = null;
                }
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
                jmxServerBean = null;
            }
        }
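
    Underneath MBeanRegistry, this is plain JMX; a minimal sketch of registering a bean with the platform MBeanServer (the ObjectName and bean are hypothetical):

        import java.lang.management.ManagementFactory;
        import javax.management.MBeanServer;
        import javax.management.ObjectName;

        public class JmxDemo {
            // standard JMX naming: the interface is <ClassName>MBean
            public interface CounterMBean { int getCount(); }
            public static class Counter implements CounterMBean {
                public int getCount() { return 42; }
            }

            public static void main(String[] args) throws Exception {
                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
                mbs.registerMBean(new Counter(), new ObjectName("org.example:type=Counter"));
                // the bean is now visible to jconsole and remote JMX clients
            }
        }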
        

      Finally, zk blocks at shutdownLatch.await(), waiting for the shutdown signal so it can shut down gracefully!

      zkServer startup is complete!

    To sum up the startup logic:

      1. Cluster and standalone modes share a single startup entry point;

      2. Only after the configuration file is parsed does the server discover it is running standalone, at which point it re-dispatches to the standalone startup path;

      3. The startup phase mainly consists of creating threads such as the selectors and the workerPool;

      4. Startup performs one round of data initialization or data recovery;

      5. ZKDatabase is the key storage structure running through all of zk's data handling;

      6. zkServer ends up blocked, waiting for the shutdown signal;

      7. Requests are handled step by step through a chain of responsibility;

      This pass skimmed the startup process, but the actual request-handling business logic has not been covered. (Which may be the proverbial: "well, that wasn't much use!")

      To find out what happens next, stay tuned for the next installment!
