zoukankan      html  css  js  c++  java
  • hadoop2.5.2学习及实践笔记(四)—— namenode启动过程源码概览

    对namenode启动时的相关操作及相关类有一个大体了解,后续深入研究时,再对本文进行补充

    >实现类

    HDFS启动脚本为$HADOOP_HOME/sbin/start-dfs.sh,查看start-dfs.sh可以看出,namenode是通过bin/hdfs命令来启动

    $ vi  start-dfs.sh
    
    # namenodes
    NAMENODES=$($HADOOP_PREFIX/bin/hdfs getconf -namenodes)
    echo "Starting namenodes on [$NAMENODES]"
    "$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
      --config "$HADOOP_CONF_DIR" \
      --hostnames "$NAMENODES" \
      --script "$bin/hdfs" start namenode $nameStartOpt
    #---------------------------------------------------------

    查看$HADOOP_HOME/bin/hdfs,可以找到namenode启动所调用的java类。

    $ vi bin/hdfs:
    
    if [ "$COMMAND" = "namenode" ] ; then
      CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
      HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"

    >源码查看

    按照前文hadoop2.5.2学习及实践笔记(二)—— 编译源代码及导入源码至eclipse步骤,源码已经导入到eclipse中,快捷键ctrl+shift+R搜索并打开NameNode.java查看源码

    NameNode类中有一个静态代码块,表示在加载器加载NameNode类过程中的准备阶段,就会执行代码块中的代码。HdfsConfiguration的init()方法的方法体是空的,这里的作用是触发对HdfsConfiguration的主动调用,从而保证在执行NameNode类相关调用时,如果HdfsConfiguration类没有被加载和初始化,先触发HdfsConfiguration的初始化过程。

    //org.apache.hadoop.hdfs.server.namenode.NameNode.java
    
    static{
        // HdfsConfiguration.init() has an empty body; invoking it here merely
        // forces the HdfsConfiguration class to be loaded and initialized
        // before any NameNode code runs.
        HdfsConfiguration.init();
    }

    查看其main方法,可以看出namenode相关操作的主要入口方法是createNameNode(String argv[], Configuration conf)方法。

    //org.apache.hadoop.hdfs.server.namenode.NameNode.java
    
    // Entry point of the NameNode daemon: handle help arguments, log the
    // startup banner, create the NameNode via createNameNode(), then block in
    // join() until the RPC server shuts down.
    public static void main(String argv[]) throws Exception {
        if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
          System.exit(0);
        }
    
        try {
          // Log the namenode startup/shutdown message.
          StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
          // Main entry for all namenode startup operations.
          NameNode namenode = createNameNode(argv, null);
          if (namenode != null) {
            // Serve RPC to clients and datanodes until the RPC server stops.
            namenode.join();
          }
        } catch (Throwable e) {
          LOG.fatal("Exception in namenode join", e);
          terminate(1, e);
        }
      }

    createNameNode方法中通过一个switch语句对不同的命令执行不同的操作。比如搭建环境时格式化文件系统时的操作,可以查看FORMAT分支。

    //org.apache.hadoop.hdfs.server.namenode.NameNode.java
    
    // Dispatch on the parsed startup option: most options (format, rollback,
    // bootstrapStandby, ...) run a one-shot action and terminate; BACKUP /
    // CHECKPOINT / the default branch return a running node instance.
    public static NameNode createNameNode(String argv[], Configuration conf)
          throws IOException {
        LOG.info("createNameNode " + Arrays.asList(argv));
        if (conf == null)
          conf = new HdfsConfiguration();
        // With no arguments the parsed option defaults to -regular.
        StartupOption startOpt = parseArguments(argv);
        if (startOpt == null) {
          printUsage(System.err);
          return null;
        }
        setStartupOption(conf, startOpt);
        
        switch (startOpt) {
          case FORMAT: { // Format the file system ("namenode -format").
            boolean aborted = format(conf, startOpt.getForceFormat(),
                startOpt.getInteractiveFormat());
            terminate(aborted ? 1 : 0);
            return null; // avoid javac warning
          }
          case GENCLUSTERID: {
            System.err.println("Generating new cluster id:");
            System.out.println(NNStorage.newClusterID());
            terminate(0);
            return null;
          }
          case FINALIZE: {
            System.err.println("Use of the argument '" + StartupOption.FINALIZE +
                "' is no longer supported. To finalize an upgrade, start the NN " +
                " and then run `hdfs dfsadmin -finalizeUpgrade'");
            terminate(1);
            return null; // avoid javac warning
          }
          case ROLLBACK: {
            boolean aborted = doRollback(conf, true);
            terminate(aborted ? 1 : 0);
            return null; // avoid warning
          }
          case BOOTSTRAPSTANDBY: {
            String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
            int rc = BootstrapStandby.run(toolArgs, conf);
            terminate(rc);
            return null; // avoid warning
          }
          case INITIALIZESHAREDEDITS: {
            boolean aborted = initializeSharedEdits(conf,
                startOpt.getForceFormat(),
                startOpt.getInteractiveFormat());
            terminate(aborted ? 1 : 0);
            return null; // avoid warning
          }
          case BACKUP:
          case CHECKPOINT: { // Start a backupnode or checkpointnode.
            NamenodeRole role = startOpt.toNodeRole();
            DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
            // BackupNode extends NameNode, so this still ends up running the
            // NameNode constructor.
            return new BackupNode(conf, role);
          }
          case RECOVER: {
            NameNode.doRecovery(startOpt, conf);
            return null;
          }
          case METADATAVERSION: {
            printMetadataVersion(conf);
            terminate(0);
            return null; // avoid javac warning
          }
          default: {
            DefaultMetricsSystem.initialize("NameNode");
            // A regular start ("-regular") takes this branch and returns a
            // new NameNode instance built by the constructor.
            return new NameNode(conf);
          }
        }
      }

    namenode的构造方法

    //org.apache.hadoop.hdfs.server.namenode.NameNode.java
    
     // Convenience constructor: start as an ordinary namenode (role NAMENODE).
     public NameNode(Configuration conf) throws IOException {
        this(conf, NamenodeRole.NAMENODE);
      }
     // Full constructor: resolves addresses and HA configuration, runs
     // initialize(conf), then enters the initial HA state. On failure the
     // node is stopped and the exception rethrown.
     protected NameNode(Configuration conf, NamenodeRole role) 
          throws IOException { 
        this.conf = conf;
        this.role = role;
        // Read fs.defaultFS and record the client-facing namenode address.
        setClientNamenodeAddress(conf);
        String nsId = getNameServiceId(conf);
        String namenodeId = HAUtil.getNameNodeId(conf, nsId);
        // Whether HA is enabled for this nameservice.
        this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
        // Initial HA state (active/standby), derived from the startup option.
        state = createHAState(getStartupOption(conf));
        // dfs.ha.allow.stale.reads: whether reads are allowed while in the
        // standby state; defaults to false.
        this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
        this.haContext = createHAContext();
        try {
          // In a federated setup, resolve the generic keys for the group of
          // namenodes sharing one logical nameservice id.
          initializeGenericKeys(conf, nsId, namenodeId);
          // Namenode initialization (metrics, http server, namespace, RPC).
        initialize(conf);
          try {
            haContext.writeLock();
            state.prepareToEnterState(haContext);
            // Enter the corresponding state: active/backup/standby.
           state.enterState(haContext);
          } finally {
            haContext.writeUnlock();
          }
        } catch (IOException e) {
          this.stop();
          throw e;
        } catch (HadoopIllegalArgumentException e) {
          this.stop();
          throw e;
        }
      }

    namenode初始化方法代码

    //org.apache.hadoop.hdfs.server.namenode.NameNode.java
    
    // Bring up the namenode services: security login, metrics, HTTP server,
    // namespace loading, RPC server, JVM pause monitor, and the common
    // active/standby services.
    protected void initialize(Configuration conf) throws IOException {
        if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
          String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
          if (intervals != null) {
            conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
              intervals);
          }
        }
        // Configure security: hadoop.security.authentication selects the
        // authentication method and rules.
        UserGroupInformation.setConfiguration(conf);
        // Login: returns immediately when the method is "simple"; otherwise
        // calls UserGroupInformation.loginUserFromKeytab, using
        // dfs.namenode.kerberos.principal as the principal name.
        loginAsNameNodeUser(conf);
    
        // Initialize the metrics system used to monitor namenode state.
        NameNode.initMetrics(conf, this.getRole());
        StartupProgressMetrics.register(startupProgress);
    
        if (NamenodeRole.NAMENODE == role) {
          // Start the HTTP server.
          startHttpServer(conf);
        }
        // Act on the startup option; for a regular start this loads the local
        // namespace image and replays the edit log, rebuilding the namespace
        // in memory (see FSNamesystem.loadFromDisk).
        loadNamesystem(conf);
    
        // Create the RPC server.
        rpcServer = createRpcServer(conf);
        if (clientNamenodeAddress == null) {
          // This is expected for MiniDFSCluster. Set it now using 
          // the RPC server's bind address.
          clientNamenodeAddress = 
              NetUtils.getHostPortString(rpcServer.getRpcAddress());
          LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
              + " this namenode/service.");
        }
        if (NamenodeRole.NAMENODE == role) {
          httpServer.setNameNodeAddress(getNameNodeAddress());
          httpServer.setFSImage(getFSImage());
        }
        
        pauseMonitor = new JvmPauseMonitor(conf);
        pauseMonitor.start();
        metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
        
        // Start the services common to the active and standby states: the
        // RPC server and any configured namenode plugins.
        startCommonServices(conf);
      }

    loadNamesystem(Configuration conf)方法调用FSNamesystem类的loadFromDisk(Configuration conf)。前文提到的,namenode启动时从本地文件系统加载镜像并重做编辑日志,都在此方法中实现。

    //org.apache.hadoop.hdfs.server.namenode.FSNamesystem.java
    
    static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
        //必须的编辑日志目录检查
        checkConfiguration(conf);
        //设在NNStorage,并初始化编辑日志目录。NNStorage主要功能是管理namenode使用的存储目录
        FSImage fsImage = new FSImage(conf,
            FSNamesystem.getNamespaceDirs(conf),
            FSNamesystem.getNamespaceEditsDirs(conf));
        //根据指定的镜像创建FSNamesystem对象
        FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
        StartupOption startOpt = NameNode.getStartupOption(conf);
    if (startOpt == StartupOption.RECOVER) { namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); } long loadStart = now(); try { //加载镜像、重做编辑日志,并打开一个新编辑文件都在此方法中 namesystem.loadFSImage(startOpt); } catch (IOException ioe) { LOG.warn("Encountered exception loading fsimage", ioe); fsImage.close(); throw ioe; } long timeTakenToLoadFSImage = now() - loadStart; LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs"); NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics(); if (nnMetrics != null) { nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage); } return namesystem; } private void loadFSImage(StartupOption startOpt) throws IOException { final FSImage fsImage = getFSImage(); // format before starting up if requested if (startOpt == StartupOption.FORMAT) { fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id startOpt = StartupOption.REGULAR; } boolean success = false; writeLock(); try { // We shouldn't be calling saveNamespace if we've come up in standby state. MetaRecoveryContext recovery = startOpt.createRecoveryContext(); final boolean staleImage = fsImage.recoverTransitionRead(startOpt, this, recovery); if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt)) { rollingUpgradeInfo = null; } final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade(); LOG.info("Need to save fs image? " + needToSave + " (staleImage=" + staleImage + ", haEnabled=" + haEnabled + ", isRollingUpgrade=" + isRollingUpgrade() + ")"); if (needToSave) { fsImage.saveNamespace(this); } else { // No need to save, so mark the phase done. 
StartupProgress prog = NameNode.getStartupProgress(); prog.beginPhase(Phase.SAVING_CHECKPOINT); prog.endPhase(Phase.SAVING_CHECKPOINT); } // This will start a new log segment and write to the seen_txid file, so // we shouldn't do it when coming up in standby state if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) { fsImage.openEditLogForWrite(); } success = true; } finally { if (!success) { fsImage.close(); } writeUnlock(); } imageLoadComplete(); }
  • 相关阅读:
    [Swift]LeetCode241. 为运算表达式设计优先级 | Different Ways to Add Parentheses
    [Swift]LeetCode240. 搜索二维矩阵 II | Search a 2D Matrix II
    使用ADO.NET对SQL Server数据库进行訪问
    JavaScript中面向对象那点事
    总结文件操作函数(二)-C语言
    UVa
    深入研究java.lang.Object类
    TCP/IP具体解释--TCP/IP可靠的原理 滑动窗体 拥塞窗体
    W5500EVB UDP模式的測试与理解
    仿新浪首页、主题、详情页,纯html静态页面
  • 原文地址:https://www.cnblogs.com/zhaosk/p/4379548.html
Copyright © 2011-2022 走看看