zoukankan      html  css  js  c++  java
  • HDFS的块Topology位置重分布

    前言


    最近一段时间笔者在工作过程中遇到了HDFS块Topology不准确导致的一系列潜在问题,需要重新更新DataNode的Topology位置,使其上的块数据保持有高可用性。以下是笔者对此想到的一些操作方案和需要注意的点,或许对同领域的小伙伴有所帮助。

    DataNode/块 Topology不准确问题


    这里我们说的Topology位置并不是指的是物理位置的概念,而是逻辑上的。意思是说在逻辑空间上,我们将各个DataNode进行位置划分。在HDFS的Topology中,它是一树型结构,Switch和Router是其中的父亲节点,叶子节点代表的是实际的DN节点。

    在HDFS中,Topology的一个关键作用是用来保证数据的rack awareness属性的。通俗地理解,就是数据副本需要分布在不同的rack下,以此防止同一个rack崩溃导致数据不可用的情况。因此从这里,我们可以得出下面一个Topology位置和物理真实位置的一个关系:

    不同逻辑rack位置对应的物理rack位置必然是物理隔离的,不同物理位置rack的节点它们的逻辑rack可能是相同的。

    本小节标题所述的Topology不准确问题指的就是逻辑rack位置对应实际物理rack位置不完全隔离的问题。这种情况往往发生在集群管理员在初始设定集群Topology位置划分时没有考虑完全。当然,最简单的方案就是完全按照真实物理rack划分,逻辑rack名就按照物理rack名来。

    数据Topology位置不准确不仅仅会带来数据丢失的可能性,还有其它方面的影响。比如当我们利用逻辑位置来减少机房跨楼层间的带宽使用,我们只保证3副本中的1个副本在不同的楼层,2个在同个楼层。如果此时按照楼层所划分的大location位置不准确的话,就可能引起大量的楼层间的数据传输。

    因此笔者这里想重点强调的是,数据的placement在实际的运用场景中是十分重要的。

    HDFS的块位置重分布


    当我们发现自己的HDFS集群数据的Topology位置违背了rack awareness的原则时,我们需要尽快的纠正它,以免其在后续引发一系列的问题。

    这里笔者简单聊聊在HDFS中,有哪些方式可以让block块按照新的Topology规则进行分布。

    依赖现成Balancer工具做块迁移

    首先是第一种方法,利用现有Balancer移动块的逻辑进行块的重分布。HDFS Balancer工具在除了用于正常的块数据平衡之外,它也有一定的数据打散重分布的作用。在数据重新move的过程中 ,Balancer会根据当前最新的Topology位置来进行placement policy的满足。于是乎,旧的Topology位置块就能够被挪至新Topology下了。

    但是过度依赖于Balancer的块迁移动作往往覆盖不全所有的块,因为Balancer受限制于节点的存储使用空间进行源,目标块的选择。换句话说,如果一些节点存储使用空间都在平均值阈值附近,它上面的块就会很少得到被移动的机会了。

    NameNode的failover行为触发块的重分布


    HDFS在每次切换Active服务时,会做一次初始全量块的检查,里面检查的步骤包含以下几点:

    • 块副本数是否达到预期值,就是我们所说的mis-replica的block。如果replica少了,则发送一次replication请求。
    • 块副本数满足条件之后,再进行block placement的检查,是否满足rack awareness,意为是否副本分布于多个rack。如果全部位于一个rack下,则需要进行一次重分布。这里的重分布行为和Balancer就不一样了,它是通过发起一次replica replication请求,然后由系统再进行over replica清理老的replica来达到副本位置的更新的。

    相关代码如下,首先是切换Active时的replica检查方法:

      /**
       * Start services required in active state
       * @throws IOException
       */
      void startActiveServices() throws IOException {
        startingActiveService = true;
        LOG.info("Starting services required for active state");
        writeLock();
        ...
            blockManager.setPostponeBlocksFromFuture(false);
            blockManager.getDatanodeManager().markAllDatanodesStale();
            blockManager.clearQueues();
            blockManager.processAllPendingDNMessages();
    
            // Only need to re-process the queue, If not in SafeMode.
            if (!isInSafeMode()) {
              LOG.info("Reprocessing replication and invalidation queues");
              // BlockManager初始化副本队列
              blockManager.initializeReplQueues();
            }
    	...
    }
    

    这里进入BlockManager的initializeReplQueues方法:

      /**
       * Initialize replication queues.
       */
      public void initializeReplQueues() {
        LOG.info("initializing replication queues");
        processMisReplicatedBlocks();
        initializedReplQueues = true;
      }
    

    可以看到,上面processMisReplicatedBlocks方法就是做miss副本块的检查的,此方法最终会进入到processMisReplicatedBlock方法:

      /**
       * A block needs reconstruction if the number of redundancies is less than
       * expected or if it does not have enough racks.
       */
      boolean isNeededReconstruction(BlockInfo storedBlock,
          NumberReplicas numberReplicas, int pending) {
        return storedBlock.isComplete() &&
            !hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);
      }
    
    ...
    
       // Check if the number of live + pending replicas satisfies
      // the expected redundancy.
      boolean hasEnoughEffectiveReplicas(BlockInfo block,
          NumberReplicas numReplicas, int pendingReplicaNum) {
        int required = getExpectedLiveRedundancyNum(block, numReplicas);
        int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
        return (numEffectiveReplicas >= required) &&
            (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
      }
    

    当BlockManager检测出当前块placement不正确的时候,会返回一个UNDER_REPLICATE的处理结果同时将此block加入待复制块列表中。

    ...
        // add to low redundancy queue if need to be
        if (isNeededReconstruction(block, num)) {
          if (neededReconstruction.add(block, numCurrentReplica,
              num.readOnlyReplicas(), num.outOfServiceReplicas(),
              expectedRedundancy)) {
            return MisReplicationResult.UNDER_REPLICATED;
          }
        }
    ...
    

    因为BlockManager在此过程中是对全量块做检查的,为了避免同时触发大量块复制所引发的性能问题,这里是将此过程拆分为小的iteration,每次iteration还会有睡眠间隔时间。

    同时为了避免block副本队列初始化过程实际太长,此过程已改为异步进行,不会阻后续服务的初始化。综上所述,通过failover行为方式触发的block块重分布是一个可行且高效的办法,但是需要我们控制好相关速率参数,相关参数如下:

    <property>
      <name>dfs.block.misreplication.processing.limit</name>
      <value>10000</value>
      <description>
        Maximum number of blocks to process for initializing replication queues.
      </description>
    </property>
    

    局部数据文件块位置迁移


    上面小节介绍的是集群全量块级别的块Topology位置迁移,但在有的使用场景中,我们可能只需要对极个别文件目录或是具体一些DN节点上的数据做块检查迁移,以此缩短Topology变更带来的影响。

    这个功能是在前段时间不久被合入的,HDFS-14053: Provide ability for NN to re-replicate based on topology changes.

    这个JIRA的的本质核心还是利用了上文讲述的BlockManager的process miss replica的方法。不过此时它传入的不是全量块,而是收集了指定目录下的块列表,作为参数传入。此功能命令被集成进了fsck命令来方便用户的使用。

    核心patch改动如下:

       }
    @@ -683,6 +694,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
         StringBuilder report = new StringBuilder();
         int blockNumber = 0;
         final LocatedBlock lastBlock = blocks.getLastLocatedBlock();
    +    List<BlockInfo> misReplicatedBlocks = new LinkedList<>();
         for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
           ExtendedBlock block = lBlk.getBlock();
           if (!blocks.isLastBlockComplete() && lastBlock != null &&
    @@ -791,6 +803,9 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
             }
             out.println(" Replica placement policy is violated for " +
                         block + ". " + blockPlacementStatus.getErrorDescription());
    +        if (doReplicate) {
    + 		   //如果需要replica重新replication,则收集fsck扫描的块结果
    +          misReplicatedBlocks.add(storedBlock);
    +        }
           }
     
           // count storage summary
    @@ -888,6 +903,19 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
             out.print(report + "
    ");
           }
         }
    +
    +    if (doReplicate && !misReplicatedBlocks.isEmpty()) {
    +      // 将收集好的列表传入BlockManager的processMisReplicatedBlocks的方法
    +      int processedBlocks = this.blockManager.processMisReplicatedBlocks(
    +              misReplicatedBlocks);
    +      if (processedBlocks < misReplicatedBlocks.size()) {
    +        LOG.warn("Fsck: Block manager is able to process only " +
    +                processedBlocks +
    +                " mis-replicated blocks (Total count : " +
    +                misReplicatedBlocks.size() +
    +                " ) for path " + path);
    +      }
    +      res.numBlocksQueuedForReplication += processedBlocks;
    +    }
       }
    

    如果是按照单台DN来的话,相似的道理,我们需要去收集目标DN上的所有块数据,g感兴趣尝试的同学可以参考BlockManager现有的移除节点的方法,下面的toRemove就是DN上的当前块汇总。

      /** Remove the blocks associated to the given datanode. */
      void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
        providedStorageMap.removeDatanode(node);
        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
          final Iterator<BlockInfo> it = storage.getBlockIterator();
          //add the BlockInfos to a new collection as the
          //returned iterator is not modifiable.
          Collection<BlockInfo> toRemove = new ArrayList<>();
          while (it.hasNext()) {
            toRemove.add(it.next());
          }
    
          for (BlockInfo b : toRemove) {
            removeStoredBlock(b, node);
          }
        }
        // Remove all pending DN messages referencing this DN.
        pendingDNMessages.removeAllMessagesForDatanode(node);
    
        node.resetBlocks();
        invalidateBlocks.remove(node);
      }
    

    以上就是笔者今天要阐述的所有内容了,希望对大家有所帮助。

    引言


    [1].https://issues.apache.org/jira/browse/HDFS-14053: Provide ability for NN to re-replicate based on topology changes

  • 相关阅读:
    php 调试
    php 格式
    php 函数 将数组转换成标量变量:extract()
    jQuery 方法
    php echo字符串的连接格式
    wampserver php 设置时间
    TableView使用CATransform3D特效动画
    苹果手机制作gif图片
    全局修改Lable/Button字体
    关于 presentViewController 时机
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183532.html
Copyright © 2011-2022 走看看