zoukankan      html  css  js  c++  java
  • NameNode处理上报block块逻辑分析

    前言

    在hadoop集群中,一个datanode执行启动操作后,会在namenode中进行节点的注册,然后namenode会与这个新注册的datanode通过心跳的形式,进行信息的传输,一方面datanode将会汇报自身的block块的情况,另一方面然后namenode接受到这些块后,进行一段分析,然后返回datanode相应的反馈命令.同时这个操作也用来判断,节点是否已经是dead状态了.但是这个过程只是宏观层面的一个过程描述,了解这点背景知识其实远远不够,一旦HDFS中出现了block块异常的情况,比如突然在某个时间点underReplicated blocks突然变多了,或者说pengdingDeleted blocks变多了,这个时候该怎么办,你需要了解这些块是如何被添加到这些操作对应的block列表里的,只有了解了hdfs中这些细节的处理,你才能够有根据的发现原因.本篇博文给大家分享的是namenode对与datanode上报块的处理过程,里面的很多东西还是非常有必要留意的.


    ProcessReport的block块处理5大分支

    在前言中提到,dn的block块上报是在心跳的过程中进行的,同样在上一篇我的文章中Hadoop中止下线操作后大量剩余复制块的解决方案也略微提过,在下面这段代码中执行的:

    /**
       * Main loop for each BP thread. Run until shutdown,
       * forever calling remote NameNode functions.
       */
      private void offerService() throws Exception {
        ...
        //
        // Now loop for a long time....
        //
        while (shouldRun()) {
          try {
            ...
    
            List<DatanodeCommand> cmds = blockReport();
            processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
            ...
    在blockReport块的调用过程中,最后会调用到最核心的BlockManager#processReport方法.这个方法中的注释很好的概况了这个方法所做的事情:

    private Collection<Block> processReport(
          final DatanodeStorageInfo storageInfo,
          final BlockListAsLongs report) throws IOException {
        // Normal case:
        // Modify the (block-->datanode) map, according to the difference
        // between the old and new block report.
        //
        ...
      }
    意思就是说根据新汇报上来的block块报告,进行适当的对应关系的修改.假设你没有看过这部分的代码,可能马上会联想到的上报的block的类型就2种,一个新添加的块addedBlock,另一个就是需要删除的块,removedBlock或deletedBlock.显然,hdfs在设计的时候不会这么简单,在processReport的头几行,就告诉了我们到底有多少种类型的block块列表.

    private Collection<Block> processReport(
          final DatanodeStorageInfo storageInfo,
          final BlockListAsLongs report) throws IOException {
        // Normal case:
        // Modify the (block-->datanode) map, according to the difference
        // between the old and new block report.
        //
        // 新添加的块
        Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
        // 待移除的块
        Collection<Block> toRemove = new TreeSet<Block>();
        // 无效的块
        Collection<Block> toInvalidate = new LinkedList<Block>();
        // 损坏的块
        Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
        // 正在复制中的块
        Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
        ...
    在这5大类型的block块中,第一个和最后一个还是比较好理解的(最后一个的toUc是toUnderConstruction的缩写),而中间3个则会让人有点混淆的感觉,toRemove,toInvalidate和toCorrput不都表示这些块是待删除的意思吗,这个我会在后面一一进行分析,这里我做了一张分类图:



    ToAdd: 新添加的块

    新添加的块,指的是那些新的replicated的block块,而replicated的block块的特征是他的ReplicateState的状态是FINALIZED的.这些块是那些在过去的心跳时间间隔内,完成block块的写操作,并执行完finalized确认动作的block块.但是这里会有一个问题,datanode在上报replicatedBlock块的时候,是不区分新老block的,只要是存在于节点上并且完成的块,都上报.所以这部分的比较就自然的被移到了namenode这边处理,而比较的方法就是通过新旧的report报告.我们需要进入processReport接下来的内部处理逻辑中:

    private Collection<Block> processReport(
          final DatanodeStorageInfo storageInfo,
          final BlockListAsLongs report) throws IOException {
        // Normal case:
        // Modify the (block-->datanode) map, according to the difference
        // between the old and new block report.
        //
        Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
        Collection<Block> toRemove = new TreeSet<Block>();
        Collection<Block> toInvalidate = new LinkedList<Block>();
        Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
        Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
        reportDiff(storageInfo, report,
            toAdd, toRemove, toInvalidate, toCorrupt, toUC);
        ...
    reportDiff方法,有点像git diff命令,"找不同"的意思,在这个方法中,这些列表都以参数的方法传入,reportDiff方法结束后,这些变量都将会被赋值.进入reportDiff方法:

      private void reportDiff(DatanodeStorageInfo storageInfo, 
          BlockListAsLongs newReport, 
          Collection<BlockInfoContiguous> toAdd,              // add to DatanodeDescriptor
          Collection<Block> toRemove,           // remove from DatanodeDescriptor
          Collection<Block> toInvalidate,       // should be removed from DN
          Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
          Collection<StatefulBlockInfo> toUC) { // add to under-construction list
        ...
        // scan the report and process newly reported blocks
        for (BlockReportReplica iblk : newReport) {
          ReplicaState iState = iblk.getState();
          BlockInfoContiguous storedBlock = processReportedBlock(storageInfo,
              iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
    
          // move block to the head of the list
          if (storedBlock != null &&
              (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
            headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
          }
        }
        ...
    这上面这个for循环中,会进行新block块的扫描,processReportedBlock中的新的block块添加可以用一张流程图的方式进行直观的展现:


    最后会到达添加toAdd块的逻辑,如果在上述图中的某个判断框中满足条件了,将会提前return,使之无法成为新的block块.绝大多数块会在第一个分支中被处理并进行return操作.

    if (shouldPostponeBlocksFromFuture &&
            namesystem.isGenStampInFuture(block)) {
          queueReportedBlock(storageInfo, block, reportedState,
              QUEUE_REASON_FUTURE_GENSTAMP);
          return null;
        }
    经过这个方法处理之后,toAdd列表中就有了许多的新block块,然后方法返回reportDiff所在的processReport方法:

    private Collection<Block> processReport(
          final DatanodeStorageInfo storageInfo,
          final BlockListAsLongs report) throws IOException {
        // Normal case:
        // Modify the (block-->datanode) map, according to the difference
        // between the old and new block report.
        //
        Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
        Collection<Block> toRemove = new TreeSet<Block>();
        Collection<Block> toInvalidate = new LinkedList<Block>();
        Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
        Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
        reportDiff(storageInfo, report,
            toAdd, toRemove, toInvalidate, toCorrupt, toUC);
        ...
        for (BlockInfoContiguous b : toAdd) {
          addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
          numBlocksLogged++;
        }    
        ...
      }
    会调用到addStoredBlock方法,这个方法在我的上篇文章中已经提到过,他会做一个很关键的动作,就是重新判断一下needReplications待复制块中是否存在此块,如果有则进行移除,及时更新目前的待复制块列表.

    /**
       * Modify (block-->datanode) map. Remove block from set of
       * needed replications if this takes care of the problem.
       * @return the block that is stored in blockMap.
       */
      private Block addStoredBlock(final BlockInfoContiguous block,
                                   DatanodeStorageInfo storageInfo,
                                   DatanodeDescriptor delNodeHint,
                                   boolean logEveryBlock)
      throws IOException {
        ...
        // handle underReplication/overReplication
        short fileReplication = bc.getBlockReplication();
        if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
          neededReplications.remove(storedBlock, numCurrentReplica,
              num.decommissionedReplicas(), fileReplication);
        }
        ...

    toAdd方面的block块添加的逻辑就是上述所描述的,中间略过了一些操作细节,感兴趣的同学可自行阅读源码进行学习.


    ToRemove: 待移除的块

    在日常生活中,我们可能会觉得,移除的意思差不多就是删除的意思,但是在processReport的块处理方法中,二者并不等同,在移除和删除操作中之间还隔了1步操作.在namenode这边判断1个块是否是待移除的块的标准是他有没有在这一轮被上报上来,如果没有表明datanode已经把这个块删了.方法中的定义是这样的:

    // collect blocks that have not been reported
    // all of them are next to the delimiter
    delimiter的意思是分隔符,这个方法的设计宗旨是让汇报上来的块和未汇报的块分别位于分隔符的2侧.

    // place a delimiter in the list which separates blocks 
    // that have been reported from those that have not

    而整个汇报上来的块和未汇上来的块的处理逻辑用下面一幅图形象的展现:


    几句话简单概况一下这个过程:

    1.首先会将分隔符block块插入到链表头部,这样所有的块默认是未汇报过的.

    2.其次遍历block块,将汇报过的块移到链表头部,间隔地从分隔符的右侧挪到了左侧.

    3.于是最后本轮没有汇报上来的块就全都在分隔符块的右侧了,应该说是next to delimeter.

    代码如下,大家进行对照着看:

    // place a delimiter in the list which separates blocks 
        // that have been reported from those that have not
        BlockInfoContiguous delimiter = new BlockInfoContiguous(new Block(), (short) 1);
        AddBlockResult result = storageInfo.addBlock(delimiter);
        ...
        // scan the report and process newly reported blocks
        for (BlockReportReplica iblk : newReport) {
          ...
          // move block to the head of the list
          if (storedBlock != null &&
              (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
            headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
          }
        }
    
        // collect blocks that have not been reported
        // all of them are next to the delimiter
        Iterator<BlockInfoContiguous> it =
            storageInfo.new BlockIterator(delimiter.getNext(0));
        while(it.hasNext())
          toRemove.add(it.next());
        storageInfo.removeBlock(delimiter);
    经过了这个方法的处理之后,toRemove方法就被赋值了,然后重新跳回到processReport方法,观察toRemove被如何处理的.

    for (Block b : toRemove) {
          removeStoredBlock(b, node);
        }
    进入removeStoredBlock方法:

    /**
       * Modify (block-->datanode) map. Possibly generate replication tasks, if the
       * removed block is still valid.
       */
      public void removeStoredBlock(Block block, DatanodeDescriptor node) {
        blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
          ...
          if (!blocksMap.removeNode(block, node)) {
            blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
                " removed from node {}", block, node);
            return;
          }
          ...
    在这个方法做的一步很重要的操作就是将toRemove中的块从blocksMap映射关系中移除掉.OK,到了这里,终于知道remove的具体含义了,remove指的是blocksMap的remove动作.从blocksMap中移除掉block块之后,还触发到其他的行为,并不是就一个简简单单的remove动作.


    ToInvalidate: 无效的块

    在HDFS中,我觉得无效的块在某种程度上来说是更接近于待删除的块的意思.在processReportedBlock的方法中,toInvalidate无效块最根本的来源是源自blocksMap中不存在的块.

    // find block by blockId
        BlockInfoContiguous storedBlock = blocksMap.getStoredBlock(block);
        if(storedBlock == null) {
          // If blocksMap does not contain reported block id,
          // the replica should be removed from the data-node.
          toInvalidate.add(new Block(block));
          return null;
        }
    所有合法的块的信息都会记录在blocksMap中.所以能让blocksMap对象中不存在某些块的信息的场景无外乎2种:

    1.第一种,就是刚刚toRemove中块信息,使得blocksMap移除了对应的块信息.

    2.第二种,新汇报上来的块信息,datanode自身有这些块信息,而namenode自身的blocksMap中没有,也会被认为是无效块.

    第一种场景好理解,原本就是准备删除的块,就应该移到无效块中.但是这里要特别注意第二种场景,这个场景我们经历过一次,这个场景挺奇怪的,什么情况下datanode有block信息,而namenode又没有呢,答案是namenode用比较早些时间的fsimage做启动操作.因为时间早,fsimage当然不会有后面的元数据信息.所以这个时候如果启动了datanode,简直就是灾难.你会看到大量的块被移到toInvalidate.那我们什么时候会无意间用了早期fsimage启动namenode呢,比如下面这个条件:

    1.namenode是ha模式的.

    2.某台namenode1进程1个月前挂了,此时另外1台namenode2切换到了active状态继续提供服务,但是集群维护者并不知道.

    3.1个月后namenode2需要更改配置或别的原因需要重启集群,因为集群维护者并不知道另外1个namenode1已经挂了1个多月了,意外的先启动了namenode1,而namenode1的fsimage已经落后一个多月了.

    4.此时再次重启所有的datanode,就会发生上述的场景.

    所以这里给集群的运维人员一个建议,对于关键进程需要进行报警监控,其次每次启动集群时要特别留意一下是否出现异常现象,如大量的无效块,大量的删除操作等.在任何情况下,数据的安全性永远是最重要的.下面是简化的流程图场景展示:


    切回到刚刚的话题,加入到toInvalidate块后,toInvalidate中的块会被加入到invalidateBlocks中:

    for (Block b : toInvalidate) {
          addToInvalidates(b, node);
        }
    /**
       * Adds block to list of blocks which will be invalidated on specified
       * datanode and log the operation
       */
      void addToInvalidates(final Block block, final DatanodeInfo datanode) {
        if (!namesystem.isPopulatingReplQueues()) {
          return;
        }
        invalidateBlocks.add(block, datanode, true);
      }

    之后就会触发到删除操作了.这部分的操作将会在ReplicationMonitor监控线程中被调用:

      /**
       * Periodically calls computeReplicationWork().
       */
      private class ReplicationMonitor implements Runnable {
    
        @Override
        public void run() {
          while (namesystem.isRunning()) {
            try {
              // Process replication work only when active NN is out of safe mode.
              if (namesystem.isPopulatingReplQueues()) {
                computeDatanodeWork();
                processPendingReplications();
                rescanPostponedMisreplicatedBlocks();
              }
              Thread.sleep(replicationRecheckInterval);
              ...
    在computerDatanodeWork方法中会进行这些操作,方法的做了很好的解释:

    /**
       * Compute block replication and block invalidation work that can be scheduled
       * on data-nodes. The datanode will be informed of this work at the next
       * heartbeat.
       * 
       * @return number of blocks scheduled for replication or removal.
       */
      int computeDatanodeWork() {
    这些无效块会各自加入到对应的datanode描述符信息中,并会在接下来的操作中通过心跳通知节点执行删除操作:

    int blockCnt = 0;
        for (DatanodeInfo dnInfo : nodes) {
          int blocks = invalidateWorkForOneNode(dnInfo);
          if (blocks > 0) {
            blockCnt += blocks;
            if (--nodesToProcess == 0) {
              break;
            }
          }
        }

    在namenode页面中的pendingDeletionBlock计数其实就是invalidateBlocks的大小


    相关代码如下:

    @Override
      @Metric
      public long getPendingDeletionBlocks() {
        return blockManager.getPendingDeletionBlocksCount();
      }
    /** Used by metrics */
      public long getPendingDeletionBlocksCount() {
        return invalidateBlocks.numBlocks();
      }

    toInvalidate无效块还是非常重要的,知道这些无效块的缘由和去向对我们处理问题来说还是非常有帮助的.


    ToCorrupt: 损坏的块

    损坏的块一般发生于非系统内部的损坏操作导致,如人工的误删除.之后,就会在namenode的50070页面的上方区域显示出来,如下:


    在processReportedBlock的处理逻辑中,对损坏块的判断逻辑主要在checkReplicaCorrupt

    ...
        BlockToMarkCorrupt c = checkReplicaCorrupt(
            block, reportedState, storedBlock, ucState, dn);
        if (c != null) {
          if (shouldPostponeBlocksFromFuture) {
            // If the block is an out-of-date generation stamp or state,
            // but we're the standby, we shouldn't treat it as corrupt,
            // but instead just queue it for later processing.
            // TODO: Pretty confident this should be s/storedBlock/block below,
            // since we should be postponing the info of the reported block, not
            // the stored block. See HDFS-6289 for more context.
            queueReportedBlock(storageInfo, storedBlock, reportedState,
                QUEUE_REASON_CORRUPT_STATE);
          } else {
            toCorrupt.add(c);
          }
          return storedBlock;
        }
        ...
    进入此方法,你会看到判断一个块是否损坏的主要2大因素:

    private BlockToMarkCorrupt checkReplicaCorrupt(
          Block reported, ReplicaState reportedState, 
          BlockInfoContiguous storedBlock, BlockUCState ucState,
          DatanodeDescriptor dn) {
        switch(reportedState) {
        case FINALIZED:
          switch(ucState) {
          case COMPLETE:
          case COMMITTED:
            if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
              final long reportedGS = reported.getGenerationStamp();
              return new BlockToMarkCorrupt(storedBlock, reportedGS,
                  "block is " + ucState + " and reported genstamp " + reportedGS
                  + " does not match genstamp in block map "
                  + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
            } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
              return new BlockToMarkCorrupt(storedBlock,
                  "block is " + ucState + " and reported length " +
                  reported.getNumBytes() + " does not match " +
                  "length in block map " + storedBlock.getNumBytes(),
                  Reason.SIZE_MISMATCH);
            } else {
              return null; // not corrupt
            }
          ...
    1个是长度大小不匹配,另1个是记录时间不匹配,这些标准值都保存于blocksMap中.判断完是否为损坏的块之后,在processReport中的markBlockAsCorrput中,会将block块加入到corruptReplicas对象中

      private void markBlockAsCorrupt(BlockToMarkCorrupt b,
          DatanodeStorageInfo storageInfo,
          DatanodeDescriptor node) throws IOException {
    
        ...
    
        // Add this replica to corruptReplicas Map
        corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
            b.reasonCode);
        ...

    这个对象中的某些方法信息,会在fsck命令中用到.仔细留意过namenode页面的corrupt信息的同学,肯定会发现,这个损坏块的信息每次打开信息都在,并不会消失,的确是这样的,除非你执行了fsck的-delete或-move操作,可以将损坏的块彻底删除掉了,元信息也将会删除掉.在namenode页面上所表示的损坏的块的个数就是corruptReplicas的大小.

    /** Returns number of blocks with corrupt replicas */
      @Metric({"CorruptBlocks", "Number of blocks with corrupt replicas"})
      public long getCorruptReplicaBlocks() {
        return blockManager.getCorruptReplicaBlocksCount();
      }
    void updateState() {
        ...
        corruptReplicaBlocksCount = corruptReplicas.size();
      }


    ToUc: 正在处理中的块

    Uc就是underConstruction,正在构建的意思,表明此block块正在写动作.判断是否是正在构建中的块的逻辑如下:

    if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
          toUC.add(new StatefulBlockInfo(
              (BlockInfoContiguousUnderConstruction) storedBlock,
              new Block(block), reportedState));
          return storedBlock;
        }
    private boolean isBlockUnderConstruction(BlockInfoContiguous storedBlock,
          BlockUCState ucState, ReplicaState reportedState) {
        switch(reportedState) {
        case FINALIZED:
          switch(ucState) {
          case UNDER_CONSTRUCTION:
          case UNDER_RECOVERY:
            return true;
          default:
            return false;
          }
        case RBW:
        case RWR:
          return (!storedBlock.isComplete());
        case RUR:       // should not be reported                                                                                             
        case TEMPORARY: // should not be reported                                                                                             
        default:
          return false;
        }
      }
    根据传入的ReplicaState进行判断.所有UC的blocks添加完毕之后,进入addStoreBlockUnderConstruction

    // Process the blocks on each queue
        for (StatefulBlockInfo b : toUC) { 
          addStoredBlockUnderConstruction(b, storageInfo);
        }
    void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
          DatanodeStorageInfo storageInfo) throws IOException {
        BlockInfoContiguousUnderConstruction block = ucBlock.storedBlock;
        block.addReplicaIfNotPresent(
            storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
        ...
    在addRelicaIfNotPresent方法最后会加入到relicas列表中

    void addReplicaIfNotPresent(DatanodeStorageInfo storage,
                         Block block,
                         ReplicaState rState) {
        Iterator<ReplicaUnderConstruction> it = replicas.iterator();
        ...
        replicas.add(new ReplicaUnderConstruction(block, storage, rState));
      }
    replicas的定义如下:

    /**
       * Block replicas as assigned when the block was allocated.
       * This defines the pipeline order.
       */
      private List<ReplicaUnderConstruction> replicas;

    toUc的相关块处理比较简单,主要过程就是上面所描述的.


    总结

    以上就是本文所讲的上报block块处理5大分支,每个分支处理都尽量地把与其相关的操作也纳入进来了,本文在解读此方面的代码时,其实也略过了一些细节,如果读者想要了解更加详细的过程,请阅读BlockManager中的processReport以及内部的reportDiff方法.


  • 相关阅读:
    大二(上期)学期末个人学习总结
    《梦断代码》阅读笔记01
    软件工程概论课程评价
    03《构建之法》阅读笔记第三篇(终结篇)
    02《构建之法》阅读笔记第二篇
    个人简评——2345王牌拼音输入法
    《人件集》阅读笔记第一篇
    个人学习进度条
    AcWing ST算法(区间求最值)打卡
    AcWing 101. 最高的牛 (差分) 打卡
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183797.html
Copyright © 2011-2022 走看看