zoukankan      html  css  js  c++  java
  • HDFS源码分析数据块复制选取复制源节点

    数据块的复制当然需要一个源数据节点,从其上拷贝数据块至目标数据节点。那么数据块复制是如何选取复制源节点的呢?本文我们将针对这一问题进行研究。

            在BlockManager中,chooseSourceDatanode()方法就是用来选取数据块复制时的源节点的,它负责解析数据块所属数据节点列表,并选择一个,用它作为数据块的复制源。其核心逻辑如下:

            我们优先选择正处于退役过程中的数据节点而不是其他节点,因为前者没有写数据传输量因此相对不是很繁忙。我们不使用已退役节点作为数据源。否则我们从它们之中随机选择一个数据节点,其复制工作量还没有达到阈值,然而,如果一个复制是最高优先级的复制的话,我们会随机选择一个数据节点,而不管复制阈值的限制。

            chooseSourceDatanode()方法代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.   @VisibleForTesting  
    2.   DatanodeDescriptor chooseSourceDatanode(Block block,  
    3.       List<DatanodeDescriptor> containingNodes,  
    4.       List<DatanodeStorageInfo>  nodesContainingLiveReplicas,  
    5.       NumberReplicas numReplicas,  
    6.       int priority) {  
    7.      
    8. // 清空containingNodes列表  
    9. // 包含指定block的节点列表  
    10.    containingNodes.clear();  
    11.      
    12.    // 清空nodesContainingLiveReplicas列表  
    13.    // 包含指定block活跃副本的节点列表  
    14.    nodesContainingLiveReplicas.clear();  
    15.      
    16.    DatanodeDescriptor srcNode = null;  
    17.    int live = 0;  
    18.    int decommissioned = 0;  
    19.    int corrupt = 0;  
    20.    int excess = 0;  
    21.      
    22.    // 根据Block实例block从corruptReplicas中获取坏块副本所在数据节点集合nodesCorrupt  
    23.    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);  
    24.      
    25.    // 根据Block实例block从blocksMap中获取其对应的数据节点存储DatanodeStorageInfo实例storage  
    26.    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {  
    27.        
    28.      // 从数据节点存储DatanodeStorageInfo实例storage中获取数据节点描述信息node  
    29.      final DatanodeDescriptor node = storage.getDatanodeDescriptor();  
    30.        
    31.      // 从excessReplicateMap集合中获取数据块集合excessBlocks,  
    32.      // 这些块对数据节点来说是多余的。我们最终会将这些多余的块删除。  
    33.      LightWeightLinkedSet<Block> excessBlocks =  
    34.        excessReplicateMap.get(node.getDatanodeUuid());  
    35.        
    36.      // 根据数据节点的存储状态确定其是否为可用副本  
    37.      int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;   
    38.        
    39.      // 如果坏块节点集合nodesCorrupt中包含该节点,坏块数corrupt累加  
    40.      if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))  
    41.        corrupt += countableReplica;  
    42.        
    43.      // 如果节点正在退役或者已经退役,退役数decommissioned累加  
    44.      else if (node.isDecommissionInProgress() || node.isDecommissioned())  
    45.        decommissioned += countableReplica;  
    46.        
    47.      // 如果多余数据块集合中包含该数据块,则多余数excess累加  
    48.      else if (excessBlocks != null && excessBlocks.contains(block)) {  
    49.        excess += countableReplica;  
    50.          
    51.      // 其他情况下  
    52.      } else {  
    53.         
    54.     // 将该存储添加到nodesContainingLiveReplicas集合  
    55.        nodesContainingLiveReplicas.add(storage);  
    56.          
    57.        // 累加活跃副本数live  
    58.        live += countableReplica;  
    59.      }  
    60.        
    61.      // 将该节点添加到containingNodes集合  
    62.      containingNodes.add(node);  
    63.        
    64.      // Check if this replica is corrupt  
    65.      // If so, do not select the node as src node  
    66.        
    67.      // 如果为坏块,跳过  
    68.      if ((nodesCorrupt != null) && nodesCorrupt.contains(node))  
    69.        continue;  
    70.        
    71.      // 如果复制级别不是最高级别,且数据节点正在复制的数据块数目大于等于最大复制块数maxReplicationStreams,跳过  
    72.      if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY  
    73.          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)  
    74.      {  
    75.        continue; // already reached replication limit  
    76.      }  
    77.        
    78.      // 如果数据节点getNumberOfBlocksToBeReplicated大于等于复制块数上线replicationStreamsHardLimit,跳过  
    79.      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)  
    80.      {  
    81.        continue;  
    82.      }  
    83.        
    84.      // the block must not be scheduled for removal on srcNode  
    85.      // 如果数据块为多余的数据块,直接跳过  
    86.      if(excessBlocks != null && excessBlocks.contains(block))  
    87.        continue;  
    88.        
    89.      // never use already decommissioned nodes  
    90.      // 如果数据节点为已退役节点,跳过  
    91.      if(node.isDecommissioned())  
    92.        continue;  
    93.      // we prefer nodes that are in DECOMMISSION_INPROGRESS state  
    94.      // 如果数据节点正在退役,且srcNode还未选中,那么选择该数据节点为srcNode,并跳过  
    95.      if(node.isDecommissionInProgress() || srcNode == null) {  
    96.        srcNode = node;  
    97.        continue;  
    98.      }  
    99.        
    100.      // 如果源数据节点srcNode正在退役,则跳过  
    101.      if(srcNode.isDecommissionInProgress())  
    102.        continue;  
    103.      // switch to a different node randomly  
    104.      // this to prevent from deterministically selecting the same node even  
    105.      // if the node failed to replicate the block on previous iterations  
    106.      if(DFSUtil.getRandom().nextBoolean())  
    107.        srcNode = node;  
    108.    }  
    109.      
    110.    // 初始化数据块副本复制统计对象numReplicas  
    111.    if(numReplicas != null)  
    112.      numReplicas.initialize(live, decommissioned, corrupt, excess, 0);  
    113.      
    114.    // 返回srcNode  
    115.    return srcNode;  
    116.  }  

            chooseSourceDatanode()方法的整体逻辑如下:

            1、清空containingNodes列表:containingNodes为包含指定block的节点描述信息DatanodeDescriptor列表;

            2、清空nodesContainingLiveReplicas列表:nodesContainingLiveReplicas为包含指定block活跃副本的节点存储DatanodeStorageInfo列表;

            3、根据Block实例block从corruptReplicas中获取坏块副本所在数据节点集合nodesCorrupt;

            4、根据Block实例block从blocksMap中获取其对应的数据节点存储DatanodeStorageInfo集合,并遍历每一个数据节点存储DatanodeStorageInfo实例storage:

                  4.1、从数据节点存储DatanodeStorageInfo实例storage中获取数据节点描述信息node;

                  4.2、从excessReplicateMap集合中获取数据块集合excessBlocks:这些块对数据节点来说是多余的,我们最终会将这些多余的块删除;

                  4.3、根据数据节点的存储状态确定其是否为可用副本countableReplica;

                  以下为统计数据块副本情况:

                  4.4、如果坏块节点集合nodesCorrupt中包含该节点,坏块数corrupt累加;

                  4.5、如果节点正在退役或者已经退役,退役数decommissioned累加;

                  4.6、如果多余数据块集合中包含该数据块,则多余数excess累加;

                  4.7、其他情况下:

                          4.7.1、将该存储添加到nodesContainingLiveReplicas集合;

                          4.7.2、累加活跃副本数live;

                  4.8、将该节点添加到containingNodes集合;

                  4.9、如果为坏块,跳过;

                  4.10、如果复制级别不是最高级别,且节点正在复制的数据块数目大于等于最大复制块数maxReplicationStreams,跳过;

                  4.11、如果数据节点getNumberOfBlocksToBeReplicated大于等于复制块数上线replicationStreamsHardLimit,跳过;

                  4.12、如果数据块为多余的数据块,直接跳过;

                  4.13、如果数据节点为已退役节点,跳过;

                  4.14、如果数据节点正在退役,且srcNode还未选中,那么选择该数据节点为srcNode,并跳过;

                  4.15、如果源数据节点srcNode正在退役,则跳过;

                  4.16、随机选择源数据节点;

            5、初始化数据块副本复制统计对象numReplicas;

            6、返回块复制源数据节点srcNode。

            其中,有两个阈值需要单独说下,如下:

            1、maxReplicationStreams:一个给定节点除最高优先级复制外复制流的最大数目,取参数dfs.namenode.replication.max-streams,参数未配置默认为2;

            2、replicationStreamsHardLimit:一个给定节点全部优先级复制复制流的最大数目,取参数dfs.namenode.replication.max-streams-hard-limit,参数未配置默认为4。

            

            从上述整理流程中,大致总结如下:

            根据block从blocksMap中取数据块所在数据节点存储实例集合并遍历,统计数据块副本情况,包括损坏副本、多余副本、退役副本、活跃副本等,然后损坏副本、多余副本、退役节点直接跳过,这三种情况不能被选中为复制源数据节点,并且还有两种情况,一是如果复制级别不是最高级别,且数据节点正在复制的数据块数目大于等于最大复制块数maxReplicationStreams,二是如果数据节点正在复制的数据块数目大于等于复制块数上线replicationStreamsHardLimit,这两种情况也直接跳过,不能被选中为复制源数据节点,剩下的,则是随机选择源数据节点,并且其最喜欢选择正在退役的数据节点,这个最喜欢的意思是,选择的方式是随机选择,但是一旦正在退役节点被选中,则源节点不会再做变更,否则还是要通过随机选择来变更的。

  • 相关阅读:
    【转】C#字符串替换_无视大小写
    如何安装inf文件
    IDisposable模式
    VS2005 模板的制作方法
    ASP.NET Client Side State Management
    微软提供正确卸载IE7的方法并恢复IE6!!!!
    Access JavaScript variables on PostBack using ASP.NET Code
    Windows Mobile 5.0 SDK R2 for Pocket PC安装错误解决方案
    Oracle日期函数与应用
    XP下安装IIS6.0的办法
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556281.html
Copyright © 2011-2022 走看看