zoukankan      html  css  js  c++  java
  • HDFS如何检测并删除多余副本块

    前言


    在HDFS中,每时每刻都在进行着大量block块的创建和删除操作,这些庞大的block块构建起了这套复杂的分布式系统.普通block的读写删除操作一般人都或多或少了解过一些,但是过量的副本清理机制是否有人知道呢,就是overReplicatedBlock的处理,针对过量的副本块,HDFS怎么处理,何时处理,处理的策略机制如何,本文就给大家分享HDFS在这方面的知识.

    过量副本块以及发生的场景


    过量副本块的意思通俗解释就是集群中有A副本3个,满足标准的3副本策略,但是此时发生了某种场景后,A副本块突然变为5个了,为了达到副本块的标准系数3个,系统就会进行多余2块副本的清除动作,而这个清除动作就是本文所要重点描述的.过量副本块的现象是比较好解释的,那么问题来了,到底有哪些潜在的原因或条件会触发多余副本块的发生呢(在此指的是HDFS中)?本人通过对HDFS源码的阅读,总结出一下3点

    • ReCommission节点重新上线.这类操作是运维操作引起的.节点下线操作会导致大量此节点的block块在集群中大量拷贝,一旦此节点取消下线,之前已拷贝的大量块必然会成为多余的副本块.

    • 人为重新设置block replication副本数.还是以A副本举例,A副本当前满足标准副本数3个,此时用户张三通过使用hdfs的API方法setReplication人为设置副本数为1.此时也会早A副本数多余2个的情况,即使说HDFS中的副本标准系数还是3个.

    • 新添加的block块记录在系统中被丢失.这种可能相对于前2种case的情况,是内部因素造成.这些新添加的丢失的block块记录会在BlockManager进行再次扫描检测,防止出现过量副本的现象.

    OK,以上3种情形就是可能发生过量副本块的原因.至于这3种情况是如何一步步的最终调用到处理多余副本块的过程在后面的描述中会再给出,先来看下多余副本块是如何被选出并处理掉的.

    OverReplication多余副本块处理


    多余副本块的处理分为2个子过程:

    • 多余副本块的选出
    • 选出的多余副本块的处理

    我们从源码中进行一一寻找原因,首先副本块的选出.

    多余副本块的选择


    进入blockManager的processOverReplicatedBlock方法,很显然,方法名已经表明了方法操作的本意了.

    /**
     * Find how many of the containing nodes are "extra",          if any.
     * If there are any extras, call chooseExcessReplicates() to
     * mark them in the excessReplicateMap.
     */
    private void processOverReplicatedBlock(final Block block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {

    此方法的注释的意思是找出存在”多余”的节点,如果他们是多余的,调用chooseExcessReplicates并标记他们,加入加入到excessReplicateMap中.下面进行细节的处理

    // 节点列表变量的声明
    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
    // 从corruptReplicas变量中获取是否存在坏的block所在的节点
    Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block);

    继续后面的处理

        // 遍历此过量副本块所在的节点列表
        for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
          final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
          ...
          LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
              .getDatanodeUuid());
          // 如果在当前过量副本图对象excessReplicateMap中不存在
          if (excessBlocks == null || !excessBlocks.contains(block)) {
            //并且所在节点不是已下线或下线中的节点
            if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
              // 并且这个副本块不是损坏的副本块
              // exclude corrupt replicas
              if (corruptNodes == null || !corruptNodes.contains(cur)) {
                // 将此过滤副本块的一个所在节点加入候选节点列表中
                nonExcess.add(storage);
              }
            }
          }
        }

    所以从这里看出nonExcess对象其实是一个候选节点的概念,将block副本块所在的节点列表进行多种条件的再判断和剔除.最后就调用到选择最终过量副本块节点的方法

    chooseExcessReplicates(nonExcess, block, replication, 
            addedNode, delNodeHint, blockplacement);

    进入chooseExcessReplicates方法

        // first form a rack to datanodes map and
        // 首先会形成机架对datanode节点的映射关系图
        BlockCollection bc = getBlockCollection(b);
        final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
        final List<StorageType> excessTypes = storagePolicy.chooseExcess(
            replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
    
        // 初始化机架->节点列表映射图对象
        final Map<String, List<DatanodeStorageInfo>> rackMap
            = new HashMap<String, List<DatanodeStorageInfo>>();
        // 超过1个副本数的节点列表
        final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
        // 恰好1个副本数的节点列表
        final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();

    为什么要划分不同节点列表的选择呢.因为在这里设计者做了优先选择,在同样拥有多余副本块的节点列表中,优先选择节点中副本数多于1个的,其次再是副本数恰好有1个的节点.这个设计很好理解,因为你上面的多余副本数更多嘛,我当然要先从多的开始删.

        // 节点划分成对应2个集合
        // split nodes into two sets
        // moreThanOne contains nodes on rack with more than one replica
        // exactlyOne contains the remaining nodes
        replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);

    进入划分方法

      public void splitNodesWithRack(
          final Iterable<DatanodeStorageInfo> storages,
          final Map<String, List<DatanodeStorageInfo>> rackMap,
          final List<DatanodeStorageInfo> moreThanOne,
          final List<DatanodeStorageInfo> exactlyOne) {
        // 遍历候选节点列表,形成机架->节点列表的对应关系
        for(DatanodeStorageInfo s: storages) {
          final String rackName = getRack(s.getDatanodeDescriptor());
          List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
          if (storageList == null) {
            storageList = new ArrayList<DatanodeStorageInfo>();
            rackMap.put(rackName, storageList);
          }
          storageList.add(s);
        }

    下面给出的划分算法

        // split nodes into two sets
        for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
          if (storageList.size() == 1) {
            // exactlyOne contains nodes on rack with only one replica
            // 如果机架中对应的节点数量只有1个,则节点上副本数就为1,否则就为多个
            exactlyOne.add(storageList.get(0));
          } else {
            // moreThanOne contains nodes on rack with more than one replica
            moreThanOne.addAll(storageList);
          }
        }

    上面划分的原理应该是与对应的block副本存放策略原理相关,这个我到没有仔细去了解原因,读者可以自行阅读相关BlockPlacementPolicy相关代码进行了解.于是在这段代码过后,节点组就被分为了2大类,exactlyOne和moreThanOne.至此chooseExcessReplicates的上半段代码执行完毕,接下来看下半段代码的执行过程

        // 选择一个待删除的节点会偏向delNodeHintStorage的节点
        // pick one node to delete that favors the delete hint
        // 否在会从节点列表中选出一个可用空间最小
        // otherwise pick one with least space from priSet if it is not empty
        // otherwise one node with least space from remains
        boolean firstOne = true;
        final DatanodeStorageInfo delNodeHintStorage
            = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
        final DatanodeStorageInfo addedNodeStorage
            = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);

    上面这3行注释传达出2个意思

    • 可以直接传入要删除的节点,如果可以,则优选选择传入的delHint节点
    • 在每个节点的内部列表中,优选会选择出可用空间最少的,这个也好理解,同样的副本数的节点列表中,当然要选择可用空间尽可能少的,以便释放出多的空间.
        // 如果目前过量副本所在节点数大于标准副本数,则进行循环移除
        while (nonExcess.size() - replication > 0) {
          final DatanodeStorageInfo cur;
          // 判断是否可以使用delNodeHintStorage节点进行代替
          if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
              moreThanOne, excessTypes)) {
            cur = delNodeHintStorage;
          } else { // regular excessive replica removal
            // 否则进行常规的节点选择
            cur = replicator.chooseReplicaToDelete(bc, b, replication,
                moreThanOne, exactlyOne, excessTypes);
          }

    判断是否可以使用delNodeHintStorage节点的判断逻辑这里就忽略了,主要看一下关键的chooseReplicaToDelete方法,这个分支处理才是最经常用到的处理方式.

        // 选择的节点要么是心跳时间最老的或者是可用空间最少的
        // Pick the node with the oldest heartbeat or with the least free space,
        // if all hearbeats are within the tolerable heartbeat interval
        for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
          if (!excessTypes.contains(storage.getStorageType())) {
            continue;
          }

    first和second的节点选择逻辑如下,非常的简单

      /**
       * Pick up replica node set for deleting replica as over-replicated. 
       * First set contains replica nodes on rack with more than one
       * replica while second set contains remaining replica nodes.
       * So pick up first set if not empty. If first is empty, then pick second.
       */
      protected Collection<DatanodeStorageInfo> pickupReplicaSet(
          Collection<DatanodeStorageInfo> first,
          Collection<DatanodeStorageInfo> second) {
        return first.isEmpty() ? second : first;
      }

    在节点列表的每次迭代循环中会进行下面2个指标的对比

          // 进行心跳时间的对比
          if (lastHeartbeat < oldestHeartbeat) {
            oldestHeartbeat = lastHeartbeat;
            oldestHeartbeatStorage = storage;
          }
          // 进行可用空间的对比
          if (minSpace > free) {
            minSpace = free;
            minSpaceStorage = storage;
          }

    然后进行选择,优先选择心跳时间最老的

        final DatanodeStorageInfo storage;
        if (oldestHeartbeatStorage != null) {
          storage = oldestHeartbeatStorage;
        } else if (minSpaceStorage != null) {
          storage = minSpaceStorage;
        } else {
          return null;
        }

    然后进行下面2个操作

          // 重新进行rackMap对象关系的调整
          // adjust rackmap, moreThanOne, and exactlyOne
          replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
              exactlyOne, cur);
          // 将选出的节点从候选节点列表中移除
          nonExcess.remove(cur);

    可以说,到了这里,多余副本块所在节点就被选出了.

    多余副本块的处理


    多余副本块的处理就显得很简单了,反正目标对象以及所在节点已经找到了,加入到相应的对象中即可.

          // 加入到excessReplicateMap对象中
          addToExcessReplicate(cur.getDatanodeDescriptor(), b);
    
          //
          // The 'excessblocks' tracks blocks until we get confirmation
          // that the datanode has deleted them; the only way we remove them
          // is when we get a "removeBlock" message.  
          //
          // The 'invalidate' list is used to inform the datanode the block 
          // should be deleted.  Items are removed from the invalidate list
          // upon giving instructions to the namenode.
          //
          // 将此节点上的b block加入到无效节点中
          addToInvalidates(b, cur.getDatanodeDescriptor());

    加入到invalidates无效block列表后不久,此block就将被清除.

    多余副本块清除的场景调用


    重新回到之前提到过的多余副本块的3大场景调用.有人可能会好奇我是怎么找到这3种使用场景的,通过查看chooseExcessReplicates哪里被调用就可以了,如下图所示
    这里写图片描述
    针对上述的5种调用情况,于是我总结了3种使用场景.下面一一进行对照.

    场景1: ReCommission重新上线过程


    在方法processOverReplicatedBlocksOnReCommission中调用了清除过量副本块的方法

      /**
       * Stop decommissioning the specified datanode. 
       * @param node
       */
      @VisibleForTesting
      public void stopDecommission(DatanodeDescriptor node) {
        if (node.isDecommissionInProgress() || node.isDecommissioned()) {
          // Update DN stats maintained by HeartbeatManager
          hbManager.stopDecommission(node);
          // Over-replicated blocks will be detected and processed when
          // the dead node comes back and send in its full block report.
          if (node.isAlive()) {
            blockManager.processOverReplicatedBlocksOnReCommission(node);
          }
          // Remove from tracking in DecommissionManager
          pendingNodes.remove(node);
          decomNodeBlocks.remove(node);
        } else {
          LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
              node, node.getAdminState());
        }
      }

    下线操作重新恢复,必然要停止正在下线的动作,所以会在这个方法中进行调用.

    场景2: SetReplication人为设置副本数


    人为设置副本数是一个主动因素,调用的直接方法如下:

      /** Set replication for the blocks. */
      public void setReplication(final short oldRepl, final short newRepl,
          final String src, final Block... blocks) {
        if (newRepl == oldRepl) {
          return;
        }
    
        // update needReplication priority queues
        for(Block b : blocks) {
          updateNeededReplications(b, 0, newRepl-oldRepl);
        }
        // 当设置的新的副本数值比原有的小的时候,需要进行过量副本的清除操作
        if (oldRepl > newRepl) {
          // old replication > the new one; need to remove copies
          LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
              + " for " + src);
          for(Block b : blocks) {
            processOverReplicatedBlock(b, newRepl, null, null);
          }
        } else { // replication factor is increased
          LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
              + " for " + src);
        }
      }

    这个API方法是可以被外面的Client端程序调用触发的.

    场景3: 丢失新添加的block记录信息


    丢失新添加的block信息导致集群中存在多余的副本.官方的解释是这样的:

      /*
       * Since the BlocksMapGset does not throw the ConcurrentModificationException
       * and supports further iteration after modification to list, there is a
       * chance of missing the newly added block while iterating. Since every
       * addition to blocksMap will check for mis-replication, missing mis-replication
       * check for new blocks will not be a problem.
       */

    因为存在丢失block信息的可能性,所以会开单独的线程去重新检测是否存在过量副本的现象.

      private void processMisReplicatesAsync() throws InterruptedException {
        ...
        while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
          int processed = 0;
          namesystem.writeLockInterruptibly();
          try {
            while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
              BlockInfoContiguous block = blocksItr.next();
              MisReplicationResult res = processMisReplicatedBlock(block);
              ...

    场景4: 其他场景的检测


    其他场景有的时候也会调用到processOverReplicatedBlock的方法,但不是外界的因素导致,而是出于一种谨慎性的考虑,比如在addStoredBlock,当新添加的块被加入到blockMap中时,会再次进行块的检测.还有1种是在文件最终写入完成的时候,也会调用一次checkReplication此方法,来确认集群中没有多余的相同块的情况.这2种情况的调用如上图所示,这里就不放出具体的代码了,可见,HDFS的设计者在细节方面的处理真的是很用心啊.

  • 相关阅读:
    A1051 Pop Sequence (25 分)
    A1060 Are They Equal (25分)
    A1063 Set Similarity (25分)
    A1037 Magic Coupon (25分)
    Mybatis获取插入记录的自增长ID
    压力测试工具ab的使用
    spring注解
    《spring技术内幕》读书笔记(1)——什么是POJO模式
    用HttpSessionListener统计在线用户或做账号在线人数管理
    (转)注释驱动的 Spring cache 缓存介绍
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183784.html
Copyright © 2011-2022 走看看