zoukankan      html  css  js  c++  java
  • HDFS源码分析(四)-----节点Decommission机制

    前言

    在Hadoop集群中,按照集群规模来划分,规模可大可小,大的例如百度,据说有4000台规模大小的Hadoop集群,小的话,几十台机器组成的集群也都是存在的。但是不论说是大型的集群以及小规模的集群,都免不了出现节点故障的情况,尤其是超大型的集群,节点故障几乎天天发生,因此如何做到正确,稳妥的故障情况处理,就显得很重要了,这里提供一个在Hadoop集群中可以想到的办法,就是Decommission操作,节点下线操作,一般的情况是故障节点已经是一个dead节点,或是出现异常情况的节点。此时如若不处理,或许会影响到整个集群的性能。所以在这里分享一下Hadoop中的Decommision机制。


    Hadoop节点Decommision操作

    在分析相关源码之前,有必要了解一下,让一个数据节点下线的物理操作,操作步骤其实很简单,在以前老版本的Hadoop中,好像是可以通过hadoop dfsadmin带参数的形式执行,但是在最近新版的Hadoop中好像这类命令失效了,于是我在做测试的时候,用了一个更通用的办法来触发这一行为,就是把目标下线节点加入execlude文件中,就是拒绝接入Hadoop集群的节点名单,姑且可以理解为黑名单列表,对应的是include名单,默认这2个名单都没配,所以数据节点一启动,就会注册到namenode节点上。然后是再执行Hadoop 的refreshnode命令,此命令就会从对应的此配置文件中读取最新的数据节点信息,然后开始decommission操作,在50070的ui界面上就可以看到待下线节点的状态会从active状态变为decommision in progress,此时此数据节点的block将会被逐步的拷贝出去,最后随着操作的完成,最终状态就会被变为decommissioned,此时就可以正式下线此节点,用hadoop-demons.sh namenode stop即可。在执行过程的前后,可以执行hadoop fsck的方法观察block块的路径,判断block拷贝情况。


    相关涉及类

    为什么花了这么多的篇幅介绍,decommision操作呢,因为操作的顺序与实际代码的运行流程基本吻合,有很强的关联性。在下面具体的分析过程中将会逐步体现出来。下面简要列出相关的2个类。

    1.DecommissionManager--decommission操作管理类,里面包含了decommission操作状态监控。

    2.FSNamesystem--这是一个大的操作类,内部包含了许多模块的工作,包括之前介绍过的副本相关操作也是部分在此类中进行中转,与decommission主要的方法refreshNode()方法包含于此。


    Decommision代码跟踪分析

    在物理操作中,decommision操作的触发是因为添加了execlude文件,然后再输入refreshNode命令开始的,与此就会对应到了FSNamesystem的同名方法

    /**
       * Rereads the config to get hosts and exclude list file names.
       * Rereads the files to update the hosts and exclude lists.  It
       * checks if any of the hosts have changed states:
       * 1. Added to hosts  --> no further work needed here.
       * 2. Removed from hosts --> mark AdminState as decommissioned. 
       * 3. Added to exclude --> start decommission.
       * 4. Removed from exclude --> stop decommission.
       * 重新从配置中读取节点列表,移除掉准备下线的列表等
       */
      public void refreshNodes(Configuration conf) throws IOException {
        checkSuperuserPrivilege();
        // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
        // Update the file names and refresh internal includes and excludes list
        if (conf == null)
          conf = new Configuration();
        //重新读取配置文件中的dfs.hosts以及dfs.hosts.exclude属性
        hostsReader.updateFileNames(conf.get("dfs.hosts",""), 
                                    conf.get("dfs.hosts.exclude", ""));
        hostsReader.refresh();
    ....
    果然在这里会重新去读exclude,include文件数据,然后这里会遍历当前的数据节点,与配置中新增的节点进行匹配

    synchronized (this) {
          //遍历数据节点
          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
               it.hasNext();) {
            DatanodeDescriptor node = it.next();
            // Check if not include.
            //判断数据节点是否在允许的主机列表内
            if (!inHostsList(node, null)) {
              //如果不是,则把此节点的状态设为Decommissioned,代表着此节点准备下线
              node.setDecommissioned();  // case 2.
            } else {
    inHost系列的判断方法如下,inExclude与此方法同,不附上代码

    /** 
       * Keeps track of which datanodes/ipaddress are allowed to connect to the namenode.
       * 如何判断节点是否包含在允许接入列表中的判断方法,exclude列表是同样的道理
       */
      private boolean inHostsList(DatanodeID node, String ipAddr) {
        //从hostReader中读取最新的host列表
        Set<String> hostsList = hostsReader.getHosts();
        //利用主机名去判断
        return (hostsList.isEmpty() || 
                (ipAddr != null && hostsList.contains(ipAddr)) ||
                hostsList.contains(node.getHost()) ||
                hostsList.contains(node.getName()) || 
                ((node instanceof DatanodeInfo) && 
                 hostsList.contains(((DatanodeInfo)node).getHostName())));
      }
    判断完毕之后,会进行逻辑判断,如果节点在exclude名单中,代表准备下线,则修改其状态,如果是正在下线的节点,则无须操作。完整逻辑如下

    /**
       * Rereads the config to get hosts and exclude list file names.
       * Rereads the files to update the hosts and exclude lists.  It
       * checks if any of the hosts have changed states:
       * 1. Added to hosts  --> no further work needed here.
       * 2. Removed from hosts --> mark AdminState as decommissioned. 
       * 3. Added to exclude --> start decommission.
       * 4. Removed from exclude --> stop decommission.
       * 重新从配置中读取节点列表,移除掉准备下线的列表等
       */
      public void refreshNodes(Configuration conf) throws IOException {
        checkSuperuserPrivilege();
        // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
        // Update the file names and refresh internal includes and excludes list
        if (conf == null)
          conf = new Configuration();
        //重新读取配置文件中的dfs.hosts以及dfs.hosts.exclude属性
        hostsReader.updateFileNames(conf.get("dfs.hosts",""), 
                                    conf.get("dfs.hosts.exclude", ""));
        hostsReader.refresh();
        synchronized (this) {
          //遍历数据节点
          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
               it.hasNext();) {
            DatanodeDescriptor node = it.next();
            // Check if not include.
            //判断数据节点是否在允许的主机列表内
            if (!inHostsList(node, null)) {
              //如果不是,则把此节点的状态设为Decommissioned,代表着此节点准备下线
              node.setDecommissioned();  // case 2.
            } else {
               //入如果此节点是包含在不允许接入的列表名单中时
              if (inExcludedHostsList(node, null)) {
                //判断此时状态是否为还没开始下线操作,如果是开始decommission
                if (!node.isDecommissionInProgress() && 
                    !node.isDecommissioned()) {
                  startDecommission(node);   // case 3.
                }
              } else {
                //如果是其他的情况,如果节点处于decommsion操作,则停止操作
                if (node.isDecommissionInProgress() || 
                    node.isDecommissioned()) {
                  stopDecommission(node);   // case 4.
                } 
              }
            }
          }
        } 
          
      }
    然后开始沿着decommision操作继续往里走,就是startDecommision方法

    /**
       * Start decommissioning the specified datanode. 
       * 对指定节点开始进行decommission操作
       */
      private void startDecommission (DatanodeDescriptor node) 
        throws IOException {
    
        if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
          LOG.info("Start Decommissioning node " + node.getName());
          node.startDecommission();
          //设置节点decommison开始时间
          node.decommissioningStatus.setStartTime(now());
          //
          // all the blocks that reside on this node have to be 
          // replicated.
          //检查此时的decommission操作状态
          checkDecommissionStateInternal(node);
        }
      }

    在这个方法主要操作还是在于让节点启动相应的decommision下线操作,开始一波副本的拷贝工作了,在这里感觉第一次启动,就在这里判断decommision状态,个人感觉没有必要,一般操作不会很快结束,一般的decomission监控会有额外的线程周期性的监控此类操作。而这个线程进行检查的函数也是checkDecommissionStateInternal方法,他是如何进行检查判断的呢

    /**
       * Change, if appropriate, the admin state of a datanode to 
       * decommission completed. Return true if decommission is complete.
       * decommision的状态检测是根据其上的副本量来衡量的
       */
      boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
        //
        // Check to see if all blocks in this decommissioned
        // node has reached their target replication factor.
        //
        if (node.isDecommissionInProgress()) {
          //调用副本进度判断函数
          if (!isReplicationInProgress(node)) {
            node.setDecommissioned();
            LOG.info("Decommission complete for node " + node.getName());
          }
        }
        if (node.isDecommissioned()) {
          return true;
        }
        return false;
      }
    答案就在上面,通过剩余副本的拷贝情况,如果isReplicationInPogress返回FALSE代表了,已经全部完成拷贝工作了,状态就可以修改为decomissioned结束状态了,下面仔细看看这个isReplicationInProgress方法

    /**
       * Return true if there are any blocks on this node that have not
       * yet reached their replication factor. Otherwise returns false.
       * 如果当前数据节点block块的副本系数还没有满足期望的副本数值值,则表明还要添加复制请求
       */
      private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
        boolean status = false;
        int underReplicatedBlocks = 0;
        int decommissionOnlyReplicas = 0;
        int underReplicatedInOpenFiles = 0;
        
        //遍历此节点上的所有数据块
        for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
          final Block block = i.next();
          INode fileINode = blocksMap.getINode(block);
    
          if (fileINode != null) {
            NumberReplicas num = countNodes(block);
            //获取此数据块当前的副本数
            int curReplicas = num.liveReplicas();
            //获取此副本块的期望副本块数
            int curExpectedReplicas = getReplication(block);
            //如果期望副本块数大于当前副本块数,表明block还需要复制
            if (curExpectedReplicas > curReplicas) {
              // Log info about one block for this node which needs replication
              if (!status) {
                //做状态的修改,表明block还需要复制
                status = true;
                logBlockReplicationInfo(block, srcNode, num);
              }
              underReplicatedBlocks++;
              if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
                decommissionOnlyReplicas++;
              }
              if (fileINode.isUnderConstruction()) {
                underReplicatedInOpenFiles++;
              }
    
              if (!neededReplications.contains(block) &&
                pendingReplications.getNumReplicas(block) == 0) {
                //
                // These blocks have been reported from the datanode
                // after the startDecommission method has been executed. These
                // blocks were in flight when the decommission was started.
                //
                //添加新的副本复制请求
                neededReplications.add(block, 
                                       curReplicas,
                                       num.decommissionedReplicas(),
                                       curExpectedReplicas);
              }
            }
          }
        }
        srcNode.decommissioningStatus.set(underReplicatedBlocks,
            decommissionOnlyReplicas, underReplicatedInOpenFiles);
    
        return status;
      }
    原理很简单,对于待撤销数据节点上的每个block块,判断当前副本与期望副本数之间的差,如果不满足,就增强复制请求,至此,decommision这个核心流程就走通了.下面看看一个常驻的监控线程,毕竟他才是主要做监控进度这项任务的.


    DecommissionManager

    decommisionMannager,decommision操作管理器,所包含的变量很少

    /**
     * Manage node decommissioning.
     * 节点Decommission操作状态管理器
     */
    class DecommissionManager {
      static final Log LOG = LogFactory.getLog(DecommissionManager.class);
      
      //名字空间系统
      private final FSNamesystem fsnamesystem;
    
      DecommissionManager(FSNamesystem namesystem) {
        this.fsnamesystem = namesystem;
      }
    
      /** Periodically check decommission status. */
      //监控方法
      class Monitor implements Runnable {
    ...
    }

    关键看他内部的核心monitor runnable

    /** Periodically check decommission status. */
      //监控方法
      class Monitor implements Runnable {
        /** recheckInterval is how often namenode checks
         *  if a node has finished decommission
         * 定期检查周期
         */
        private final long recheckInterval;
        /** The number of decommission nodes to check for each interval */
        private final int numNodesPerCheck;
        /** firstkey can be initialized to anything. */
        private String firstkey = "";
    
        Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
          this.recheckInterval = recheckIntervalInSecond * 1000L;
          this.numNodesPerCheck = numNodesPerCheck;
        }
    
        /**
         * Check decommission status of numNodesPerCheck nodes
         * for every recheckInterval milliseconds.
         */
        public void run() {
          for(; fsnamesystem.isRunning(); ) {
            synchronized(fsnamesystem) {
              //调用check()方法
              check();
            }
      
            try {
              Thread.sleep(recheckInterval);
            } catch (InterruptedException ie) {
              LOG.info("Interrupted " + this.getClass().getSimpleName(), ie);
            }
          }
        }
    for循环内持周期性的续调check()方法,如果系统没有结束的话,docheck方法又会调用之前提到的checkDecommission的方法

    private void check() {
          int count = 0;
          //遍历每个数据节点
          for(Map.Entry<String, DatanodeDescriptor> entry
              : new CyclicIteration<String, DatanodeDescriptor>(
                  fsnamesystem.datanodeMap, firstkey)) {
            final DatanodeDescriptor d = entry.getValue();
            firstkey = entry.getKey();
     
            //如果数据节点正处于decommison操作的话,则做检查
            if (d.isDecommissionInProgress()) {
              try {
                //调用fsnamesystem的checkDecommissionStateInternal方法,此方法内部又会调用isReplicationInProgress进行副本的
                //情况判断
                fsnamesystem.checkDecommissionStateInternal(d);
              } catch(Exception e) {
                LOG.warn("entry=" + entry, e);
              }
              if (++count == numNodesPerCheck) {
                return;
              }
            }
          }
        }
      }
    他是每个数据节点遍历着判断.OK,希望通过我的分析,大家对Decommission有更多的了解,有所收获.

    全部代码的分析请点击链接https://github.com/linyiqun/hadoop-hdfs,后续将会继续更新HDFS其他方面的代码分析。


    参考文献

    《Hadoop技术内部–HDFS结构设计与实现原理》.蔡斌等





  • 相关阅读:
    2、Qt Project之鼠标事件监控
    1、Qt Project之基本文件打开与保存
    Qt界面设计基础
    基于Keil软件的MCU环境搭建
    一次性将word中的数字和字母全部改为“Times New Roman”字体
    PAT 1004 Counting Leaves
    PAT 1003 Emergency
    DevC++ 控制台项目初始代码修改方法
    Win7在命令提示符(cmd.exe)中如何进行复制、粘贴工作
    VMware虚拟机如何在后台运行,后台运行怎么设置其在电脑右下角显示托盘图标
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183879.html
Copyright © 2011-2022 走看看