  HDFS Source Code Analysis (Part 4): The Node Decommission Mechanism

    Preface

    Hadoop clusters come in all sizes: Baidu reportedly runs a Hadoop cluster of around 4,000 nodes, while clusters of a few dozen machines are common as well. Whatever the scale, node failures are unavoidable, and on very large clusters they happen almost daily, so handling them correctly and safely matters a great deal. Hadoop's answer is the decommission operation, which takes a node out of service; the typical candidate is a node that is already dead or misbehaving, and leaving such a node in place can drag down the performance of the whole cluster. This post walks through Hadoop's decommission mechanism.


    The Hadoop Node Decommission Operation

    Before analyzing the source, it is worth knowing how a datanode is physically taken offline; the steps are simple. Older Hadoop versions could apparently trigger this through hadoop dfsadmin with the right arguments, but in recent versions those options no longer seem to work, so in my tests I used the more general approach: add the target node to the exclude file, the list of hosts refused access to the cluster, effectively a blacklist (its counterpart is the include file; neither is configured by default, which is why any datanode that starts up registers with the namenode). Then run the hadoop dfsadmin -refreshNodes command, which rereads the node lists from the configured files and starts the decommission. On the web UI at port 50070 the node's state changes from Active to Decommission In Progress while its blocks are gradually copied to other nodes; when the copying completes, the state becomes Decommissioned and the node can be shut down for good with hadoop-daemon.sh stop datanode. Before and after the operation, hadoop fsck can be used to inspect block locations and verify how the copying went.
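    To make those steps concrete, here is a minimal sketch of the whole procedure; the file path and hostname are made up for illustration, while the dfs.hosts.exclude property and the dfsadmin/fsck commands are the standard ones from this era of Hadoop:

    # hdfs-site.xml on the namenode: dfs.hosts.exclude points at a plain
    # text file listing one hostname per line (path is illustrative):
    #   <property>
    #     <name>dfs.hosts.exclude</name>
    #     <value>/etc/hadoop/conf/excludes</value>
    #   </property>

    echo "dn3.example.com" >> /etc/hadoop/conf/excludes  # blacklist the node

    hadoop dfsadmin -refreshNodes       # namenode rereads include/exclude lists

    hadoop fsck / -files -blocks -locations  # watch the block replicas move

    # once the 50070 UI shows Decommissioned, stop the datanode on dn3:
    hadoop-daemon.sh stop datanode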


    Related Classes

    Why spend so much space on the decommission operation itself? Because the order of the manual steps closely mirrors the flow of the code, and that correspondence will surface step by step in the analysis below. Two classes are chiefly involved:

    1.DecommissionManager--the manager class for the decommission operation; it includes the monitoring of decommission progress.

    2.FSNamesystem--a large umbrella class covering many subsystems; some of the replica-related operations introduced earlier are also routed through it, and it contains refreshNodes(), the main entry point for decommission (see the sketch below for how the admin command reaches it).
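    For orientation: the refreshNodes command travels from the DFSAdmin command-line tool over a ClientProtocol RPC to the NameNode, which forwards it to FSNamesystem. The NameNode-side hop, as I recall it from the 1.x code (treat the exact shape as an assumption), is tiny:

    //NameNode.refreshNodes() (approximate, Hadoop 1.x era): reload the
    //dfs.hosts / dfs.hosts.exclude lists using a freshly created Configuration
    public void refreshNodes() throws IOException {
      namesystem.refreshNodes(new Configuration());
    }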


    Tracing the Decommission Code

    Physically, a decommission is triggered by adding nodes to the exclude file and then issuing the refreshNodes command, which maps straight onto the method of the same name in FSNamesystem:

    /**
       * Rereads the config to get hosts and exclude list file names.
       * Rereads the files to update the hosts and exclude lists.  It
       * checks if any of the hosts have changed states:
       * 1. Added to hosts  --> no further work needed here.
       * 2. Removed from hosts --> mark AdminState as decommissioned. 
       * 3. Added to exclude --> start decommission.
       * 4. Removed from exclude --> stop decommission.
   * Reread the node lists from the configuration and handle the nodes slated to go offline.
       */
      public void refreshNodes(Configuration conf) throws IOException {
        checkSuperuserPrivilege();
        // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
        // Update the file names and refresh internal includes and excludes list
        if (conf == null)
          conf = new Configuration();
    //reread the dfs.hosts and dfs.hosts.exclude properties from the config file
        hostsReader.updateFileNames(conf.get("dfs.hosts",""), 
                                    conf.get("dfs.hosts.exclude", ""));
        hostsReader.refresh();
    ....
    Sure enough, this is where the exclude and include files are reread. Next the method iterates over the current datanodes and matches each against the refreshed lists:

    synchronized (this) {
          //iterate over the datanodes
          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
               it.hasNext();) {
            DatanodeDescriptor node = it.next();
            // Check if not include.
            //is this datanode in the list of permitted hosts?
            if (!inHostsList(node, null)) {
              //if not, mark it Decommissioned: the node is slated to go offline
              node.setDecommissioned();  // case 2.
            } else {
    ....
    The inHostsList check is shown below; inExcludedHostsList works the same way, so its code is omitted.

    /** 
       * Keeps track of which datanodes/ipaddress are allowed to connect to the namenode.
       * How a node is tested for membership in the allowed-hosts list; the exclude list works the same way.
       */
      private boolean inHostsList(DatanodeID node, String ipAddr) {
        //fetch the latest host list from hostsReader
        Set<String> hostsList = hostsReader.getHosts();
        //match by IP address, host, or registered name
        return (hostsList.isEmpty() || 
                (ipAddr != null && hostsList.contains(ipAddr)) ||
                hostsList.contains(node.getHost()) ||
                hostsList.contains(node.getName()) || 
                ((node instanceof DatanodeInfo) && 
                 hostsList.contains(((DatanodeInfo)node).getHostName())));
      }
    Once the membership tests are done comes the branching: a node in the exclude list is slated to go offline, so its state is changed; a node already mid-decommission needs no further action. The complete logic:

    /**
       * Rereads the config to get hosts and exclude list file names.
       * Rereads the files to update the hosts and exclude lists.  It
       * checks if any of the hosts have changed states:
       * 1. Added to hosts  --> no further work needed here.
       * 2. Removed from hosts --> mark AdminState as decommissioned. 
       * 3. Added to exclude --> start decommission.
       * 4. Removed from exclude --> stop decommission.
   * Reread the node lists from the configuration and handle the nodes slated to go offline.
       */
      public void refreshNodes(Configuration conf) throws IOException {
        checkSuperuserPrivilege();
        // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
        // Update the file names and refresh internal includes and excludes list
        if (conf == null)
          conf = new Configuration();
    //reread the dfs.hosts and dfs.hosts.exclude properties from the config file
        hostsReader.updateFileNames(conf.get("dfs.hosts",""), 
                                    conf.get("dfs.hosts.exclude", ""));
        hostsReader.refresh();
        synchronized (this) {
      //iterate over the datanodes
          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
               it.hasNext();) {
            DatanodeDescriptor node = it.next();
            // Check if not include.
        //is this datanode in the list of permitted hosts?
            if (!inHostsList(node, null)) {
          //if not, mark it Decommissioned: the node is slated to go offline
              node.setDecommissioned();  // case 2.
            } else {
           //if this node is in the excluded-hosts (deny) list
              if (inExcludedHostsList(node, null)) {
            //if decommission has not started on it yet, start it now
                if (!node.isDecommissionInProgress() && 
                    !node.isDecommissioned()) {
                  startDecommission(node);   // case 3.
                }
              } else {
            //otherwise: if the node is mid-decommission (or decommissioned), stop the decommission
                if (node.isDecommissionInProgress() || 
                    node.isDecommissioned()) {
                  stopDecommission(node);   // case 4.
                } 
              }
            }
          }
        } 
          
      }
    Following the decommission path inward, the next stop is the startDecommission method:

    /**
       * Start decommissioning the specified datanode. 
       * Begin the decommission operation on the given node.
       */
      private void startDecommission (DatanodeDescriptor node) 
        throws IOException {
    
        if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
          LOG.info("Start Decommissioning node " + node.getName());
          node.startDecommission();
      //record the start time of the node's decommission
          node.decommissioningStatus.setStartTime(now());
          //
          // all the blocks that reside on this node have to be 
          // replicated.
      //check the decommission state right away
          checkDecommissionStateInternal(node);
        }
      }

    The main job here is to flip the node into the decommissioning state, which starts the wave of replica copying. Note that the state check, checkDecommissionStateInternal, is invoked immediately at startup; personally I doubt this first check is necessary, since the operation rarely finishes that quickly, and decommission progress is normally watched by a separate thread that polls periodically. That monitor thread uses this very same checkDecommissionStateInternal method, so how does the check decide?
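    For context, the node-side calls used above (startDecommission, setDecommissioned, isDecommissionInProgress) are thin wrappers around an admin-state enum on DatanodeInfo. The sketch below is paraphrased from memory of the 1.x code, so treat the exact field and method shapes as approximate:

    //paraphrased sketch of DatanodeInfo's admin-state handling (Hadoop 1.x era)
    class AdminStateSketch {
      enum AdminStates { NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED }

      private AdminStates adminState = AdminStates.NORMAL;

      void startDecommission() { adminState = AdminStates.DECOMMISSION_INPROGRESS; }
      void setDecommissioned() { adminState = AdminStates.DECOMMISSIONED; }

      boolean isDecommissionInProgress() {
        return adminState == AdminStates.DECOMMISSION_INPROGRESS;
      }
      boolean isDecommissioned() {
        return adminState == AdminStates.DECOMMISSIONED;
      }
    }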

    /**
       * Change, if appropriate, the admin state of a datanode to 
       * decommission completed. Return true if decommission is complete.
       * Decommission progress is measured by the replica counts of the blocks on the node.
       */
      boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
        //
        // Check to see if all blocks in this decommissioned
        // node has reached their target replication factor.
        //
        if (node.isDecommissionInProgress()) {
      //delegate to the replication-progress check
          if (!isReplicationInProgress(node)) {
            node.setDecommissioned();
            LOG.info("Decommission complete for node " + node.getName());
          }
        }
        if (node.isDecommissioned()) {
          return true;
        }
        return false;
      }
    There is the answer: it comes down to how much replica copying remains. When isReplicationInProgress returns false, all the copying is done and the state can move to the terminal Decommissioned. Now let us look closely at isReplicationInProgress:

    /**
       * Return true if there are any blocks on this node that have not
       * yet reached their replication factor. Otherwise returns false.
       * If a block on this datanode still has fewer replicas than its expected replication factor, more replication requests must be added.
       */
      private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
        boolean status = false;
        int underReplicatedBlocks = 0;
        int decommissionOnlyReplicas = 0;
        int underReplicatedInOpenFiles = 0;
        
    //iterate over all the blocks on this node
        for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
          final Block block = i.next();
          INode fileINode = blocksMap.getINode(block);
    
          if (fileINode != null) {
            NumberReplicas num = countNodes(block);
        //the block's current live replica count
            int curReplicas = num.liveReplicas();
        //the block's expected replica count
            int curExpectedReplicas = getReplication(block);
        //if expected exceeds current, the block still needs to be copied
            if (curExpectedReplicas > curReplicas) {
              // Log info about one block for this node which needs replication
              if (!status) {
            //flip the status flag: at least one block still needs copying
                status = true;
                logBlockReplicationInfo(block, srcNode, num);
              }
              underReplicatedBlocks++;
              if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
                decommissionOnlyReplicas++;
              }
              if (fileINode.isUnderConstruction()) {
                underReplicatedInOpenFiles++;
              }
    
              if (!neededReplications.contains(block) &&
                pendingReplications.getNumReplicas(block) == 0) {
                //
                // These blocks have been reported from the datanode
                // after the startDecommission method has been executed. These
                // blocks were in flight when the decommission was started.
                //
            //add a new replica-replication request
                neededReplications.add(block, 
                                       curReplicas,
                                       num.decommissionedReplicas(),
                                       curExpectedReplicas);
              }
            }
          }
        }
        srcNode.decommissioningStatus.set(underReplicatedBlocks,
            decommissionOnlyReplicas, underReplicatedInOpenFiles);
    
        return status;
      }
    The principle is simple: for every block on the node being retired, compare its current replica count against its expected count, and queue a replication request for any shortfall. For instance, a block with replication factor 3 whose third replica lives only on the decommissioning node has 2 live copies, so one more copy is requested. With that, the core decommission flow is complete (a condensed restatement of the rule follows below). Next comes the resident monitor thread, since it is the component that actually tracks progress.
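    As a minimal, self-contained restatement of that per-block rule (a hypothetical model for illustration, not HDFS API):

    //condensed model of the test inside isReplicationInProgress(): a node's
    //decommission is still in progress while any of its blocks has fewer
    //live replicas than its expected replication factor
    class ReplicationCheckSketch {
      static boolean blockNeedsCopies(int liveReplicas, int expectedReplicas) {
        return expectedReplicas > liveReplicas;
      }

      //node-level view: done only when no block still needs copies
      static boolean decommissionComplete(int[][] liveAndExpectedPairs) {
        for (int[] pair : liveAndExpectedPairs) {
          if (blockNeedsCopies(pair[0], pair[1])) return false;
        }
        return true;
      }

      public static void main(String[] args) {
        //a block at 2 live replicas out of 3 keeps the node in progress
        System.out.println(decommissionComplete(new int[][] {{2, 3}, {3, 3}})); //false
        System.out.println(decommissionComplete(new int[][] {{3, 3}, {3, 3}})); //true
      }
    }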


    DecommissionManager

    DecommissionManager, the manager of decommission operations, holds very little state:

    /**
     * Manage node decommissioning.
     * Tracks the state of node decommission operations.
     */
    class DecommissionManager {
      static final Log LOG = LogFactory.getLog(DecommissionManager.class);
      
      //the namespace system
      private final FSNamesystem fsnamesystem;
    
      DecommissionManager(FSNamesystem namesystem) {
        this.fsnamesystem = namesystem;
      }
    
      /** Periodically check decommission status. */
      //the monitoring task
      class Monitor implements Runnable {
    ...
    }

    The key piece is its inner Monitor runnable:

    /** Periodically check decommission status. */
      //the monitoring task
      class Monitor implements Runnable {
        /** recheckInterval is how often namenode checks
         *  if a node has finished decommission
         * the periodic recheck interval
         */
        private final long recheckInterval;
        /** The number of decommission nodes to check for each interval */
        private final int numNodesPerCheck;
        /** firstkey can be initialized to anything. */
        private String firstkey = "";
    
        Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
          this.recheckInterval = recheckIntervalInSecond * 1000L;
          this.numNodesPerCheck = numNodesPerCheck;
        }
    
        /**
         * Check decommission status of numNodesPerCheck nodes
         * for every recheckInterval milliseconds.
         */
        public void run() {
          for(; fsnamesystem.isRunning(); ) {
            synchronized(fsnamesystem) {
          //invoke check()
              check();
            }
      
            try {
              Thread.sleep(recheckInterval);
            } catch (InterruptedException ie) {
              LOG.info("Interrupted " + this.getClass().getSimpleName(), ie);
            }
          }
        }
    The for loop keeps invoking check() periodically for as long as the namesystem is running, and check() in turn calls the checkDecommissionStateInternal method mentioned earlier:

    private void check() {
          int count = 0;
      //cycle through the datanodes
          for(Map.Entry<String, DatanodeDescriptor> entry
              : new CyclicIteration<String, DatanodeDescriptor>(
                  fsnamesystem.datanodeMap, firstkey)) {
            final DatanodeDescriptor d = entry.getValue();
            firstkey = entry.getKey();
     
        //only a node that is mid-decommission gets checked
            if (d.isDecommissionInProgress()) {
              try {
            //call fsnamesystem's checkDecommissionStateInternal method, which
            //internally calls isReplicationInProgress to judge the replica situation
                fsnamesystem.checkDecommissionStateInternal(d);
              } catch(Exception e) {
                LOG.warn("entry=" + entry, e);
              }
              if (++count == numNodesPerCheck) {
                return;
              }
            }
          }
        }
      }
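    For completeness, this Monitor runs on a daemon thread started during FSNamesystem initialization, roughly as sketched below; the configuration key names and their defaults are my recollection of the 1.x code, so treat them as assumptions:

    //approximate wiring inside FSNamesystem initialization (Hadoop 1.x era):
    //read the two knobs, build the Monitor, run it on a daemon thread
    int recheckIntervalSeconds =
        conf.getInt("dfs.namenode.decommission.interval", 30);
    int nodesPerCheck =
        conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5);

    Daemon dnthread = new Daemon(
        new DecommissionManager(this).new Monitor(recheckIntervalSeconds,
                                                  nodesPerCheck));
    dnthread.start();

    The numNodesPerCheck bound means each wake-up examines at most a handful of decommissioning nodes, so a single pass never holds the namesystem lock for long.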
    So the monitor walks the datanodes one by one, resuming from firstkey on each pass. That wraps it up; I hope this analysis gives you a better understanding of decommission.

    The full annotated source is available at https://github.com/linyiqun/hadoop-hdfs; analyses of other parts of HDFS will follow.


    References

    《Hadoop技术内部–HDFS结构设计与实现原理》 (Hadoop Internals: HDFS Design and Implementation), Cai Bin et al.




