zoukankan      html  css  js  c++  java
  • HDFS源码分析之UnderReplicatedBlocks(二)

     UnderReplicatedBlocks还提供了一个数据块迭代器BlockIterator,用于遍历其中的数据块。它是UnderReplicatedBlocks的内部类,有三个成员变量,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 当前迭代级别  
    2.    private int level;  
    3.      
    4.    // 标志位:是否为特定复制优先级的迭代器  
    5.    private boolean isIteratorForLevel = false;  
    6.      
    7.    // 数据块Block迭代器Iterator列表,存储各级别数据块迭代器  
    8.    private final List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();  

            其中,level代表了迭代器当前处于的迭代级别,表示正在哪个块复制级别迭代数据块;isIteratorForLevel是一个标志位,是否为特定复制优先级的迭代器的标志位,也就意味着只在特定级别进行迭代;而iterators则是一个数据块Block迭代器Iterator列表,由前往后、由高到低的存储各级别数据块迭代器。
            BlockIterator提供了两个构造函数,一个是无参构造函数:生成所有级别的数据块迭代器,另外一个是有参构造函数:生成指定级别的数据块迭代器,代码分别如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Construct an iterator over all queues. 
    3.  * 无参构造函数:生成所有级别的数据块迭代器 
    4.  */  
    5. private BlockIterator() {  
    6.   // 当前迭代级别level设置为0  
    7.   level=0;  
    8.   // iterators中添加全部级别的数据块迭代器  
    9.   for(int i=0; i<LEVEL; i++) {  
    10.     iterators.add(priorityQueues.get(i).iterator());  
    11.   }  
    12. }  
    13.   
    14. /** 
    15.  * Constrict an iterator for a single queue level 
    16.  * 有参构造函数:生成指定级别的数据块迭代器 
    17.  * @param l the priority level to iterate over 
    18.  */  
    19. private BlockIterator(int l) {  
    20.   // 当前迭代级别level设置为指定级别  
    21.   level = l;  
    22.   // 标志位:是否为特定复制优先级的迭代器设置为true  
    23.   isIteratorForLevel = true;  
    24.   // iterators中添加指定级别的数据块迭代器  
    25.   iterators.add(priorityQueues.get(level).iterator());  
    26. }  

            注释很清晰,读者可自行阅读。另外,数据块是根据复制级别由高到低的顺序迭代的,当某一级别数据块迭代完毕,那么我们需要更新当前迭代级别,此时update()方法就完成这一个工作,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 如果需要,更新当前迭代级别(由高往低迭代)  
    2. private void update() {  
    3.       
    4.   // 标志位:是否为特定复制优先级的迭代器,为true的话,直接返回  
    5.   if (isIteratorForLevel) {  
    6.     return;  
    7.   }  
    8.     
    9.   // 当前迭代级别小于LEVEL-1(也就是还没到最低级别),并且当前迭代级别已遍历所有数据块  
    10.   while(level< LEVEL-1 && !iterators.get(level).hasNext()) {  
    11.     // 当前迭代级别level加1  
    12.     level++;  
    13.   }  
    14. }  

            它会先判断标志位isIteratorForLevel:是否为特定复制优先级的迭代器,为true的话,直接返回;否则当当前迭代级别小于LEVEL-1(也就是还没到最低级别),并且当前迭代级别已遍历所有数据块,当前迭代级别level加1。

            既然是一个迭代器,那么我们看下它最重要的两个方法,hasNext()和next(),分别如下:

            hasNext():判断是否还存在未迭代元素

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 判断是否还有下一个元素  
    2. @Override  
    3. public boolean hasNext() {  
    4.       
    5.   // 标志位:是否为特定复制优先级的迭代器,为true的话,直接返回iterators列表中第一个迭代器的判断下一个元素的结果  
    6.   if (isIteratorForLevel) {  
    7.     return iterators.get(0).hasNext();  
    8.   }  
    9.     
    10.   // 如果需要,更新当前迭代级别(由高往低迭代)  
    11.   update();  
    12.     
    13.   // 取当前级别的迭代器的判断是否存在下一个元素的结果  
    14.   return iterators.get(level).hasNext();  
    15. }  

            hasNext()方法,也是先判断标志位isIteratorForLevel:是否为特定复制优先级的迭代器,为true的话,直接返回iterators列表中第一个迭代器的判断下一个元素的结果,否则,如果需要,调用update()方法更新当前迭代级别(由高往低迭代),取当前级别的迭代器的判断是否存在下一个元素的结果。

            next()方法:跌倒下一个元素

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 取下一个元素  
    2. @Override  
    3. public Block next() {  
    4.       
    5.   // 标志位:是否为特定复制优先级的迭代器,为true的话,直接返回iterators列表中第一个迭代器的下一个元素  
    6.   if (isIteratorForLevel) {  
    7.     return iterators.get(0).next();  
    8.   }  
    9.     
    10.   // 如果需要,更新当前迭代级别(由高往低迭代)  
    11.   update();  
    12.     
    13.   // 取当前级别的迭代器的下一个元素  
    14.   return iterators.get(level).next();  
    15. }  

            处理逻辑与hasNext()方法一致,不再赘述。

            BlockIterator还提供了从迭代器中移除元素remove()方法及获取当前迭代级别的getPriority()方法,代码分别如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 移除  
    2. @Override  
    3. public void remove() {  
    4.       
    5.   // 标志位:是否为特定复制优先级的迭代器,为true的话,直接返回iterators列表中第一个迭代器的移除结果  
    6.   if (isIteratorForLevel) {  
    7.     iterators.get(0).remove();  
    8.   } else {  
    9.     // 取当前级别的迭代器的移除的结果  
    10.     iterators.get(level).remove();  
    11.   }  
    12. }  
    13.   
    14. // 获取当前迭代级别  
    15. int getPriority() {  
    16.   return level;  
    17. }  

            UnderReplicatedBlocks还提供了按照优先级由高到低的顺序,获取指定数目的待复制数据块的chooseUnderReplicatedBlocks()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  /** 
    2.   * Get a list of block lists to be replicated. The index of block lists 
    3.   * represents its replication priority. Replication index will be tracked for 
    4.   * each priority list separately in priorityToReplIdx map. Iterates through 
    5.   * all priority lists and find the elements after replication index. Once the 
    6.   * last priority lists reaches to end, all replication indexes will be set to 
    7.   * 0 and start from 1st priority list to fulfill the blockToProces count. 
    8.   *  
    9.   * @param blocksToProcess - number of blocks to fetch from underReplicated blocks. 
    10.   * @return Return a list of block lists to be replicated. The block list index 
    11.   *         represents its replication priority. 
    12.   */  
    13.  public synchronized List<List<Block>> chooseUnderReplicatedBlocks(  
    14.      int blocksToProcess) {  
    15.      
    16. // initialize data structure for the return value  
    17. // 初始化做为返回值的数据结构,LEVEL大小的一个块列表的列表  
    18.    List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);  
    19.     
    20.    // 每种优先级都添加一个空的数据块ArrayList列表  
    21.    for (int i = 0; i < LEVEL; i++) {  
    22.      blocksToReplicate.add(new ArrayList<Block>());  
    23.    }  
    24.   
    25.    // 如果不存在需要复制的数据块,直接返回空的列表blocksToReplicate  
    26.    if (size() == 0) { // There are no blocks to collect.  
    27.      return blocksToReplicate;  
    28.    }  
    29.      
    30.    int blockCount = 0;  
    31.      
    32.    // 按照优先级从高到低处理  
    33.    for (int priority = 0; priority < LEVEL; priority++) {   
    34.        
    35.      // Go through all blocks that need replications with current priority.  
    36.      // 构造指定级别priority的数据块迭代器BlockIterator实例neededReplicationsIterator  
    37.      BlockIterator neededReplicationsIterator = iterator(priority);  
    38.        
    39.      // 根据指定级别priority从priorityToReplIdx中取之前已经处理到的位置索引replIndex  
    40.      Integer replIndex = priorityToReplIdx.get(priority);  
    41.        
    42.      // skip to the first unprocessed block, which is at replIndex  
    43.      // 利用replIndex,数据块迭代器跳过之前已处理的数据块,指向下一个该处理的正确位置  
    44.      for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {  
    45.        neededReplicationsIterator.next();  
    46.      }  
    47.   
    48.      // blocksToProcess的值不能超过所有待复制数据块总数  
    49.      blocksToProcess = Math.min(blocksToProcess, size());  
    50.        
    51.      // 如果已获取到足够的数据块,即blockCount等于blocksToProcess,直接跳出循环  
    52.      if (blockCount == blocksToProcess) {  
    53.        break;  // break if already expected blocks are obtained  
    54.      }  
    55.        
    56.      // Loop through all remaining blocks in the list.  
    57.      // 通过迭代器neededReplicationsIterator迭代数据块,添加入返回集合blocksToReplicate中,  
    58.      // 并累加索引位置replIndex  
    59.        
    60.      // 判断条件为当前已取数据块blockCount还未达到要求的blocksToProcess,  
    61.      // 同时数据块迭代器neededReplicationsIterator还有下一个元素  
    62.      while (blockCount < blocksToProcess  
    63.          && neededReplicationsIterator.hasNext()) {  
    64.          
    65.     // 通过数据块迭代器neededReplicationsIterator取下一个数据块  
    66.     Block block = neededReplicationsIterator.next();  
    67.       
    68.     // 将数据块添加到优先级priority对应的列表  
    69.        blocksToReplicate.get(priority).add(block);  
    70.          
    71.        // 累加索引位置replIndex  
    72.        replIndex++;  
    73.          
    74.        // 累加已获取数据块数目blockCount  
    75.        blockCount++;  
    76.      }  
    77.        
    78.      // 如果迭代器中已没有元素,且已处理到最高级别,重置位置索引priorityToReplIdx为0,跳出循环  
    79.      if (!neededReplicationsIterator.hasNext()  
    80.          && neededReplicationsIterator.getPriority() == LEVEL - 1) {  
    81.        // reset all priorities replication index to 0 because there is no  
    82.        // recently added blocks in any list.  
    83.        for (int i = 0; i < LEVEL; i++) {  
    84.          priorityToReplIdx.put(i, 0);  
    85.        }  
    86.        break;  
    87.      }  
    88.        
    89.      // 记录索引位置replIndex  
    90.      priorityToReplIdx.put(priority, replIndex);   
    91.    }  
    92.      
    93.    // 返回获取到的数据块列表  
    94.    return blocksToReplicate;  
    95.  }  

            其处理逻辑大体如下:

            1、初始化做为返回值的数据结构,LEVEL大小的一个块列表的列表blocksToReplicate;

            2、每种优先级都添加一个空的数据块ArrayList列表;

            3、如果不存在需要复制的数据块,直接返回空的列表blocksToReplicate;

            4、按照优先级从高到低处理:

                  4.1、构造指定级别priority的数据块迭代器BlockIterator实例neededReplicationsIterator;

                  4.2、根据指定级别priority从priorityToReplIdx中取之前已经处理到的位置索引replIndex;

                  4.3、利用replIndex,数据块迭代器跳过之前已处理的数据块,指向下一个该处理的正确位置;

                  4.4、blocksToProcess的值不能超过所有待复制数据块总数;

                  4.5、如果已获取到足够的数据块,即blockCount等于blocksToProcess,直接跳出循环;

                  4.6、通过迭代器neededReplicationsIterator迭代数据块,添加入返回集合blocksToReplicate中,并累加索引位置replIndex,判断条件为当前已取数据块blockCount还未达到要求的blocksToProcess,同时数据块迭代器neededReplicationsIterator还有下一个元素:

                           4.6.1、通过数据块迭代器neededReplicationsIterator取下一个数据块;

                           4.6.2、将数据块添加到优先级priority对应的列表;

                           4.6.3、累加索引位置replIndex;

                           4.6.4、累加已获取数据块数目blockCount;

                  4.7、如果迭代器中已没有元素,且已处理到最高级别,重置位置索引priorityToReplIdx为0,跳出循环;

                  4.8、priorityToReplIdx中记录优先级priority对应索引位置replIndex;

            5、返回获取到的数据块列表。

            未完待续,敬请期待《HDFS源码分析之UnderReplicatedBlocks(二)》

  • 相关阅读:
    hashlib加密算法
    gc 模块常用函数
    functools函数中的partial函数及wraps函数
    ctime使用及datetime简单使用
    __new__方法理解
    __getattribute__小例子
    == 和 is 的区别
    线程_可能发生的问题
    线程_进程池
    【网站】 简单通用微信QQ跳转浏览器打开代码
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556275.html
Copyright © 2011-2022 走看看