zoukankan      html  css  js  c++  java
  • zookeeper启动入口

      最近正在研究zookeeper,一些心得记录一下,如有错误,还请大神指正。

    zookeeper下载地址:http://zookeeper.apache.org/releases.html,百度一下就能找到,不过还是在这里列一下。

    我认为学习一个东西,首先要理出一个头绪,否则会感觉无从下手。这里我从启动过程开始研究,即从zkServer.sh入手。

    # Pick the value of ZOOMAIN (JVM options plus the main class name).
    # An empty or unset JMXDISABLE selects the JMX-enabled variant.
    if [ "x$JMXDISABLE" = "x" ]
    then
        echo "JMX enabled by default" >&2
        # for some reason these two options are necessary on jdk6 on Ubuntu
        #   accord to the docs they are not necessary, but otw jconsole cannot
        #   do a local attach
        # JMXLOCALONLY is not set in this excerpt; it must come from elsewhere in the script/environment.
        ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
    else
        # Any non-empty JMXDISABLE: ZOOMAIN is only the main class, without the jmxremote properties.
        echo "JMX disabled by user request" >&2
        ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
    fi
    

      从zkServer.sh可以看出,启动入口在QuorumPeerMain类中,源码如下:

      // Entry point: creates a QuorumPeerMain and delegates to initializeAndRun.
    public static void main(String[] args)
      {
        QuorumPeerMain main = new QuorumPeerMain();
       // ... 1. Initialize and run (the code surrounding this call is elided in the post)
          main.initializeAndRun(args);
       // ... (rest of the method is elided in the post)
      }
    
      protected void initializeAndRun(String[] args)
        throws QuorumPeerConfig.ConfigException, IOException
      {
        // 2、加载配置文件
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
    // 解析配置文件 config.parse(args[0]); } if ((args.length == 1) && (config.servers.size() > 0)) {
        // 配置文件的信息加载至QuorumPeer runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running in standalone mode"); ZooKeeperServerMain.main(args); } } public void runFromConfig(QuorumPeerConfig config) throws IOException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer"); try { NIOServerCnxn.Factory cnxnFactory = new NIOServerCnxn.Factory(config.getClientPortAddress(), config.getMaxClientCnxns()); // 3、启动QuorumPeer this.quorumPeer = new QuorumPeer(); this.quorumPeer.setClientPortAddress(config.getClientPortAddress()); //...加载各种配置信息 this.quorumPeer.start(); this.quorumPeer.join(); } catch (InterruptedException e) { LOG.warn("Quorum Peer interrupted", e); } }

      可以看出,配置文件的解析由QuorumPeerConfig 类完成,其部分源码如下:

      /**
       * Reads the properties file at {@code path} and passes its contents to
       * {@code parseProperties}.
       *
       * @param path location of the configuration file
       * @throws QuorumPeerConfig.ConfigException if the file does not exist, cannot
       *         be read, or an {@code IllegalArgumentException} is raised while
       *         processing it; the underlying exception is kept as the cause
       */
      public void parse(String path)
        throws QuorumPeerConfig.ConfigException
      {
        final File source = new File(path);
        LOG.info("Reading configuration from: " + source);

        try {
          if (!source.exists()) {
            // Thrown inside the try block so it is wrapped by the handler below.
            throw new IllegalArgumentException(source + " file is missing");
          }

          // Load the configuration into a Properties object.
          final Properties loaded = new Properties();
          final FileInputStream stream = new FileInputStream(source);
          try {
            loaded.load(stream);
          }
          finally {
            stream.close();
          }

          parseProperties(loaded);
        }
        catch (IllegalArgumentException badInput) {
          throw new ConfigException("Error processing " + path, badInput);
        }
        catch (IOException ioFailure) {
          throw new ConfigException("Error processing " + path, ioFailure);
        }
      }
    
      /**
       * Copies recognised keys from {@code zkProp} into this configuration object.
       * Keys and values are trimmed before use; numeric settings are parsed with
       * {@code Integer.parseInt}.
       *
       * <p>The post shows only the first part of this method; the rest is elided.
       *
       * @param zkProp properties loaded from the configuration file
       * @throws IOException declared by the original signature
       * @throws QuorumPeerConfig.ConfigException if {@code peerType} is neither
       *         "observer" nor "participant" (case-insensitive)
       */
      public void parseProperties(Properties zkProp)
        throws IOException, QuorumPeerConfig.ConfigException
      {
        int clientPort = 0;
        String clientPortAddress = null;
        // Walk every entry of the configuration file.
        for (Map.Entry entry : zkProp.entrySet()) {
          String key = entry.getKey().toString().trim();
          String value = entry.getValue().toString().trim();
          if (key.equals("dataDir")) {
            this.dataDir = value;
          } else if (key.equals("dataLogDir")) {
            this.dataLogDir = value;
          } else if (key.equals("clientPort")) {
            // Port that clients connect to.
            clientPort = Integer.parseInt(value);
          } else if (key.equals("clientPortAddress")) {
            clientPortAddress = value.trim();
          } else if (key.equals("tickTime")) {
            // Heartbeat (tick) time.
            this.tickTime = Integer.parseInt(value);
          } else if (key.equals("maxClientCnxns")) {
            this.maxClientCnxns = Integer.parseInt(value);
          } else if (key.equals("minSessionTimeout")) {
            this.minSessionTimeout = Integer.parseInt(value);
          } else if (key.equals("maxSessionTimeout")) {
            this.maxSessionTimeout = Integer.parseInt(value);
          } else if (key.equals("initLimit")) {
            this.initLimit = Integer.parseInt(value);
          } else if (key.equals("syncLimit")) {
            this.syncLimit = Integer.parseInt(value);
          } else if (key.equals("electionAlg")) {
            // Election algorithm type (per the post, the default is FastLeaderElection).
            this.electionAlg = Integer.parseInt(value);
          } else if (key.equals("peerType")) {
            if (value.toLowerCase().equals("observer"))
              this.peerType = QuorumPeer.LearnerType.OBSERVER;
            else if (value.toLowerCase().equals("participant")) {
              this.peerType = QuorumPeer.LearnerType.PARTICIPANT;
            } else
              throw new ConfigException("Unrecognised peertype: " + value);
          }
          // ... (the remaining keys and the rest of the method are elided in the post)

      回到QuorumPeerMain类的runFromConfig方法。此方法中,会将配置信息加载至QuorumPeer,并调用其start方法:

      /**
       * Starts this peer. The steps run in this order: load the on-disk database,
       * start the connection factory, start leader election, then call
       * {@code super.start()}.
       *
       * @throws RuntimeException if loading the database raises an IOException
       */
      public synchronized void start()
      {
        try {
          // Load the database from disk before anything else is started.
          this.zkDb.loadDataBase();
        } catch (IOException ie) {
          // A load failure is logged as fatal and rethrown unchecked with the cause attached.
          LOG.fatal("Unable to load database on disk", ie);
          throw new RuntimeException("Unable to run quorum server ", ie);
        }
        this.cnxnFactory.start();
        startLeaderElection();
        super.start();
      }
    

      在start方法中,会先加载硬盘中的数据,

     即调用this.zkDb.loadDataBase(),该方法定义在ZKDatabase中:
      /**
       * Restores the data tree and session table through {@code snapLog.restore}
       * and marks this database as initialized.
       *
       * <p>Each transaction reported by the restore is wrapped in a {@code Request}
       * and passed to {@code addCommittedProposal}.
       *
       * @return the zxid returned by {@code snapLog.restore}
       * @throws IOException declared by the original signature
       */
      public long loadDataBase()
        throws IOException
      {
        FileTxnSnapLog.PlayBackListener replayListener = new FileTxnSnapLog.PlayBackListener() {
          public void onTxnLoaded(TxnHeader header, Record record) {
            // Rebuild a request from the logged header, then attach header, body and zxid.
            Request replayed =
              new Request(null, 0L, header.getCxid(), header.getType(), null, null);
            replayed.zxid = header.getZxid();
            replayed.hdr = header;
            replayed.txn = record;

            ZKDatabase.this.addCommittedProposal(replayed);
          }
        };

        final long restoredZxid =
          this.snapLog.restore(this.dataTree, this.sessionsWithTimeouts, replayListener);
        this.initialized = true;
        return restoredZxid;
      }
    

      然后确定选举类型,即调用startLeaderElection:

      /**
       * Prepares this peer for leader election: records an initial vote, looks up
       * this peer's own quorum address in the current view, and creates the election
       * algorithm for the configured election type.
       *
       * <p>When {@code electionType} is 0, a datagram socket is also opened on the
       * quorum address port and a {@code ResponderThread} is started.
       *
       * @throws RuntimeException if this peer's id is not in the view, or if the
       *         datagram socket cannot be opened
       */
      public synchronized void startLeaderElection() {
        this.currentVote = new Vote(this.myid, getLastLoggedZxid());

        // Find this peer's own entry in the view; stop at the first match.
        for (QuorumServer peer : getView().values()) {
          if (peer.id != this.myid) {
            continue;
          }
          this.myQuorumAddr = peer.addr;
          break;
        }
        if (this.myQuorumAddr == null) {
          throw new RuntimeException("My id " + this.myid + " not in the peer list");
        }

        if (this.electionType == 0) {
          try {
            this.udpSocket = new DatagramSocket(this.myQuorumAddr.getPort());
            this.responder = new ResponderThread();
            this.responder.start();
          }
          catch (SocketException socketFailure) {
            throw new RuntimeException(socketFailure);
          }
        }

        // Create the election algorithm that matches the configured type.
        this.electionAlg = createElectionAlgorithm(this.electionType);
      }

      最后调用super.start(),启动run方法。

  • 相关阅读:
    线程池
    单例设计模式
    String,StringBuffer,StringBuilder
    马踏棋盘算法
    最短路径问题 (迪杰斯特拉算法,弗洛伊德算法)
    最小生成树 修路问题(普里姆算法,克鲁斯卡尔算法)
    贪心算法 求解集合覆盖问题
    Stream 数组转换
    unittest与pytest对比
    条件编译
  • 原文地址:https://www.cnblogs.com/shaohz2014/p/5281563.html
Copyright © 2011-2022 走看看