zoukankan      html  css  js  c++  java
  • HDFS源码分析数据块复制之PendingReplicationBlocks

    PendingReplicationBlocks实现了所有正在复制的数据块的记账工作。它实现以下三个主要功能:

            1、记录此时正在复制的块;

            2、一种对复制请求进行跟踪的粗粒度计时器;

            3、一个定期识别未执行复制请求的线程。

            我们先看下它内部有哪些成员变量,如下:

    [java] view plain copy
     
    1. // 块和正在进行的块复制信息的映射集合  
    2. private final Map<Block, PendingBlockInfo> pendingReplications;  
    3.   
    4. // 复制请求超时的块列表  
    5. private final ArrayList<Block> timedOutItems;  
    6.   
    7. // 后台工作线程  
    8. Daemon timerThread = null;  
    9.   
    10. // 文件系统是否正在运行的标志位  
    11. private volatile boolean fsRunning = true;  
    12.   
    13. //  
    14. // It might take anywhere between 5 to 10 minutes before  
    15. // a request is timed out.  
    16. // 在一个请求超时之前可能需要5到10分钟  
    17.   
    18. // 请求超时阈值,默认为5分钟  
    19. private long timeout = 5 * 60 * 1000;  
    20.   
    21. // 超时检查固定值:5分钟  
    22. private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000;  

            首先是pendingReplications,它是块和正在进行的块复制信息的映射集合,所有正在复制的数据块及其对应复制信息都会被加入到这个集合。数据块复制信息PendingBlockInfo是对数据块开始复制时间timeStamp、待复制的目标数据节点列表List<DatanodeDescriptor>实例targets的一个封装,代码如下:

    [java] view plain copy
     
    1.  /** 
    2.   * An object that contains information about a block that  
    3.   * is being replicated. It records the timestamp when the  
    4.   * system started replicating the most recent copy of this 
    5.   * block. It also records the list of Datanodes where the  
    6.   * replication requests are in progress. 
    7.   *  
    8.   * 正在被复制的块信息。它记录系统开始复制块最新副本的时间,也记录复制请求正在执行的数据节点列表。 
    9.   */  
    10.  static class PendingBlockInfo {  
    11.     
    12. // 时间戳  
    13.    private long timeStamp;  
    14.    // 待复制的目标数据节点列表  
    15.    private final List<DatanodeDescriptor> targets;  
    16.   
    17.    // 构造方法  
    18.    PendingBlockInfo(DatanodeDescriptor[] targets) {  
    19.      // 时间戳赋值为当前时间  
    20.      this.timeStamp = now();  
    21.      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()  
    22.          : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));  
    23.    }  
    24.   
    25.    long getTimeStamp() {  
    26.      return timeStamp;  
    27.    }  
    28.   
    29.    // 设置时间戳为当前时间  
    30.    void setTimeStamp() {  
    31.      timeStamp = now();  
    32.    }  
    33.   
    34.    // 增加复制数量,即增加目标数据节点  
    35.    void incrementReplicas(DatanodeDescriptor... newTargets) {  
    36.      if (newTargets != null) {  
    37.        for (DatanodeDescriptor dn : newTargets) {  
    38.          targets.add(dn);  
    39.        }  
    40.      }  
    41.    }  
    42.   
    43.    // 减少复制数量,即减少目标数据节点  
    44.    void decrementReplicas(DatanodeDescriptor dn) {  
    45.      targets.remove(dn);  
    46.    }  
    47.   
    48.    // 获取复制数量,即或许待复制的数据节点数目  
    49.    int getNumReplicas() {  
    50.      return targets.size();  
    51.    }  
    52.  }  

            它的构造方法中,即将时间戳timeStamp赋值为当前时间,并且提供了设置时间戳为当前时间的setTimeStamp()方法。同时提供了增加复制数量、减少复制数量、获取复制数量相关的三个方法,均是对待复制的目标数据节点列表的增加、减少与计数操作,上面注释很清楚,不再详述!

            另外两个比较重要的变量就是复制请求超时的块列表timedOutItems和后台工作线程timerThread。由后台工作线程周期性的检查pendingReplications列表中的待复制数据块,看看其是否超时,如果超时的话,将其加入timedOutItems列表。后台工作线程timerThread的初始化如下:

    [java] view plain copy
     
    1. // 启动块复制监控线程  
    2. void start() {  
    3.   timerThread = new Daemon(new PendingReplicationMonitor());  
    4.   timerThread.start();  
    5. }  

            它实际上是借助PendingReplicationMonitor来完成的。PendingReplicationMonitor实现了Runnable接口,是一个周期性工作的线程,用于浏览从未完成它们复制请求的数据块,这个从未完成实际上就是在规定时间内还未完成的数据块复制信息。PendingReplicationMonitor的实现如下:

    [java] view plain copy
     
    1. /* 
    2.  * A periodic thread that scans for blocks that never finished 
    3.  * their replication request. 
    4.  * 一个周期性线程,用于浏览从未完成它们复制请求的数据块 
    5.  */  
    6. class PendingReplicationMonitor implements Runnable {  
    7.   @Override  
    8.   public void run() {  
    9.       
    10.     // 如果标志位fsRunning为true,即文件系统正常运行,则while循环一直进行  
    11.     while (fsRunning) {  
    12.     // 检查周期:取timeout,最高为5分钟  
    13.       long period = Math.min(DEFAULT_RECHECK_INTERVAL, timeout);  
    14.       try {  
    15.           
    16.         // 检查方法  
    17.         pendingReplicationCheck();  
    18.           
    19.         // 线程休眠period  
    20.         Thread.sleep(period);  
    21.       } catch (InterruptedException ie) {  
    22.         if(LOG.isDebugEnabled()) {  
    23.           LOG.debug("PendingReplicationMonitor thread is interrupted.", ie);  
    24.         }  
    25.       }  
    26.     }  
    27.   }  
    28.   
    29.   /** 
    30.    * Iterate through all items and detect timed-out items 
    31.    * 通过所有项目迭代检测超时项目 
    32.    */  
    33.   void pendingReplicationCheck() {  
    34.       
    35.     // 使用synchronized关键字对pendingReplications进行同步  
    36.     synchronized (pendingReplications) {  
    37.         
    38.     // 获取集合pendingReplications的迭代器  
    39.       Iterator<Map.Entry<Block, PendingBlockInfo>> iter =  
    40.                                   pendingReplications.entrySet().iterator();  
    41.         
    42.       // 记录当前时间now  
    43.       long now = now();  
    44.       if(LOG.isDebugEnabled()) {  
    45.         LOG.debug("PendingReplicationMonitor checking Q");  
    46.       }  
    47.         
    48.       // 遍历pendingReplications集合中的每个元素  
    49.       while (iter.hasNext()) {  
    50.           
    51.         // 取出每个<Block, PendingBlockInfo>条目  
    52.         Map.Entry<Block, PendingBlockInfo> entry = iter.next();  
    53.           
    54.         // 取出Block对应的PendingBlockInfo实例pendingBlock  
    55.         PendingBlockInfo pendingBlock = entry.getValue();  
    56.           
    57.         // 判断pendingBlock自其生成时的timeStamp以来到现在,是否已超过timeout时间  
    58.         if (now > pendingBlock.getTimeStamp() + timeout) {  
    59.             
    60.         // 超过的话,  
    61.         // 取出timeout实例block  
    62.           Block block = entry.getKey();  
    63.             
    64.           // 使用synchronized关键字对timedOutItems进行同步  
    65.           synchronized (timedOutItems) {  
    66.               
    67.             // 将block添加入复制请求超时的块列表timedOutItems  
    68.             timedOutItems.add(block);  
    69.           }  
    70.           LOG.warn("PendingReplicationMonitor timed out " + block);  
    71.             
    72.           // 从迭代器中移除该条目  
    73.           iter.remove();  
    74.         }  
    75.       }  
    76.     }  
    77.   }  
    78. }  

            在它的run()方法内,如果标志位fsRunning为true,即文件系统正常运行,则while循环一直进行,然后在while循环内:

            1、先取检查周期period:取timeout,最高为5分钟;

            2、调用pendingReplicationCheck()方法进行检查;

            3、线程休眠period时间,再次进入while循环。

            pendingReplicationCheck的实现逻辑也很简单,如下:

            使用synchronized关键字对pendingReplications进行同步:

            1、获取集合pendingReplications的迭代器iter;

            2、记录当前时间now;

            3、遍历pendingReplications集合中的每个元素:

                  3.1、取出每个<Block, PendingBlockInfo>条目;

                  3.2、取出Block对应的PendingBlockInfo实例pendingBlock;

                  3.3、判断pendingBlock自其生成时的timeStamp以来到现在,是否已超过timeout时间,超过的话:

                           3.3.1、取出timeout实例block;

                           3.3.2、使用synchronized关键字对timedOutItems进行同步,使用synchronized关键字对timedOutItems进行同步;

                           3.3.3、从迭代器中移除该条目。

            PendingReplicationBlocks还提供了获取复制超时块数组的getTimedOutBlocks()方法,代码如下:

    [java] view plain copy
     
    1. /** 
    2.  * Returns a list of blocks that have timed out their  
    3.  * replication requests. Returns null if no blocks have 
    4.  * timed out. 
    5.  * 返回一个其复制请求已超时的数据块列表,如果没有则返回null 
    6.  */  
    7. Block[] getTimedOutBlocks() {  
    8.    
    9. / 使用synchronized关键字对timedOutItems进行同步  
    10.   synchronized (timedOutItems) {  
    11.       
    12.     // 如果timedOutItems中没有数据,则直接返回null  
    13.     if (timedOutItems.size() <= 0) {  
    14.       return null;  
    15.     }  
    16.       
    17.     // 将Block列表timedOutItems转换成Block数组  
    18.     Block[] blockList = timedOutItems.toArray(  
    19.         new Block[timedOutItems.size()]);  
    20.       
    21.     // 清空Block列表timedOutItems  
    22.     timedOutItems.clear();  
    23.       
    24.     // 返回Block数组  
    25.     return blockList;  
    26.   }  
    27. }  

            PendingReplicationBlocks另外还提供了增加一个块到正在进行的块复制信息列表中的increment()方法和减少正在复制请求的数量的decrement()方法,代码如下:

    [java] view plain copy
     
    1.  /** 
    2.   * Add a block to the list of pending Replications 
    3.   * 增加一个块到正在进行的块复制信息列表中 
    4.   *  
    5.   * @param block The corresponding block 
    6.   * @param targets The DataNodes where replicas of the block should be placed 
    7.   */  
    8.  void increment(Block block, DatanodeDescriptor[] targets) {  
    9.      
    10. // 使用synchronized关键字对pendingReplications进行同步  
    11. synchronized (pendingReplications) {  
    12.       
    13.   // 根据Block实例block先从集合pendingReplications中查找  
    14.      PendingBlockInfo found = pendingReplications.get(block);  
    15.      if (found == null) {  
    16.     // 如果没有找到,直接put进去,利用DatanodeDescriptor[]的实例targets构造PendingBlockInfo对象  
    17.        pendingReplications.put(block, new PendingBlockInfo(targets));  
    18.      } else {  
    19.     // 如果之前存在,增加复制数量,即增加目标数据节点  
    20.        found.incrementReplicas(targets);  
    21.        // 设置时间戳为当前时间  
    22.        found.setTimeStamp();  
    23.      }  
    24.    }  
    25.  }  
    26.   
    27.  /** 
    28.   * One replication request for this block has finished. 
    29.   * Decrement the number of pending replication requests 
    30.   * for this block. 
    31.   * 针对给定数据块的一个复制请求已完成。针对该数据块,减少正在复制请求的数量。 
    32.   *  
    33.   * @param The DataNode that finishes the replication 
    34.   */  
    35.  void decrement(Block block, DatanodeDescriptor dn) {  
    36.      
    37. // 使用synchronized关键字对pendingReplications进行同步  
    38. synchronized (pendingReplications) {  
    39.       
    40.   // 根据Block实例block先从集合pendingReplications中查找  
    41.      PendingBlockInfo found = pendingReplications.get(block);  
    42.      if (found != null) {  
    43.        if(LOG.isDebugEnabled()) {  
    44.          LOG.debug("Removing pending replication for " + block);  
    45.        }  
    46.          
    47.        // 减少复制数量,即减少目标数据节点  
    48.        found.decrementReplicas(dn);  
    49.          
    50.        // 如果数据块对应的复制数量总数小于等于0,复制工作完成,  
    51.        // 直接从pendingReplications集合中移除该数据块及其对应信息  
    52.        if (found.getNumReplicas() <= 0) {  
    53.          pendingReplications.remove(block);  
    54.        }  
    55.      }  
    56.    }  
    57.  }  

            以及统计块数量和块复制数量的方法,如下:

    [java] view plain copy
     
    1. /** 
    2.  * The total number of blocks that are undergoing replication 
    3.  * 正在被复制的块的总数 
    4.  */  
    5. int size() {  
    6.   return pendingReplications.size();  
    7. }   
    8.   
    9. /** 
    10.  * How many copies of this block is pending replication? 
    11.  * 块复制的总量 
    12.  */  
    13. int getNumReplicas(Block block) {  
    14.   synchronized (pendingReplications) {  
    15.     PendingBlockInfo found = pendingReplications.get(block);  
    16.     if (found != null) {  
    17.       return found.getNumReplicas();  
    18.     }  
    19.   }  
    20.   return 0;  
    21. }  

            上述方法代码逻辑都很简单,而且注释也很详细,此处不再过多赘述!

  • 相关阅读:
    nyoj67三角形面积
    hduoj1097A hard puzzle
    nyoj168房间安排
    nyoj73 比大小
    hduoj1021 Fibonacci Again
    hduoj1018 Big Number
    hduoj1108最小公倍数
    nyoj312 20岁生日
    hduoj1019 Least Common Multiple
    nyoj144小珂的苦恼
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556245.html
Copyright © 2011-2022 走看看