zoukankan      html  css  js  c++  java
  • HDFS源码分析数据块复制监控线程ReplicationMonitor(一)

     ReplicationMonitor是HDFS中关于数据块复制的监控线程,它的主要作用就是计算DataNode工作,并将复制请求超时的块重新加入到待调度队列。其定义及作为线程核心的run()方法如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Periodically calls computeReplicationWork(). 
    3.  * 周期性调用computeReplicationWork()方法 
    4.  */  
    5. private class ReplicationMonitor implements Runnable {  
    6.   
    7.   @Override  
    8.   public void run() {  
    9.       
    10.     // 如果namesystem持续运行,while循环一直进行  
    11.     while (namesystem.isRunning()) {  
    12.       try {  
    13.         // Process replication work only when active NN is out of safe mode.  
    14.         if (namesystem.isPopulatingReplQueues()) {  
    15.           // 计算数据节点工作  
    16.         computeDatanodeWork();  
    17.         // 将复制请求超时的块重新加入到待调度队列  
    18.           processPendingReplications();  
    19.         }  
    20.           
    21.         // 线程休眠replicationRecheckInterval时间  
    22.         Thread.sleep(replicationRecheckInterval);  
    23.       } catch (Throwable t) {  
    24.         if (!namesystem.isRunning()) {  
    25.           LOG.info("Stopping ReplicationMonitor.");  
    26.           if (!(t instanceof InterruptedException)) {  
    27.             LOG.info("ReplicationMonitor received an exception"  
    28.                 + " while shutting down.", t);  
    29.           }  
    30.           break;  
    31.         } else if (!checkNSRunning && t instanceof InterruptedException) {  
    32.           LOG.info("Stopping ReplicationMonitor for testing.");  
    33.           break;  
    34.         }  
    35.         LOG.fatal("ReplicationMonitor thread received Runtime exception. ", t);  
    36.         terminate(1, t);  
    37.       }  
    38.     }  
    39.   }  
    40. }  

            ReplicationMonitor线程的run()方法运行逻辑比较清晰,如果namesystem持续运行,while循环一直进行,在这个循环内,仅当活跃NN不在安全模式时才会进行复制工作:

            1、调用computeDatanodeWork()方法计算数据节点工作;

            2、调用processPendingReplications()方法将复制请求超时的块重新加入到待调度队列

            3、线程休眠replicationRecheckInterval时间后继续运行。

            首先说下这个replicationRecheckInterval,它是名字节点检查新的复制工作的时间间隔,其初始化在BlockManager的构造函数中,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. this.replicationRecheckInterval =   
    2.   conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,   
    3.               DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;  

            其取值取参数dfs.namenode.replication.interval,参数未配置的话,默认为3秒。

            再来看下计算数据节点工作的computeDatanodeWork()方法,它负责计算块复制、块无效工作可以被调度到数据节点的总数,数据节点将在接下来的心跳中被指派该工作,并返回被调度的复制或移除的块的数目,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Compute block replication and block invalidation work that can be scheduled 
    3.  * on data-nodes. The datanode will be informed of this work at the next 
    4.  * heartbeat. 
    5.  *  
    6.  * 计算块复制、块无效工作可以被调度到数据节点的总数。数据节点将在接下来的心跳中被指派该工作。 
    7.  * 返回被调度的复制或移除的块的数目 
    8.  *  
    9.  * @return number of blocks scheduled for replication or removal. 
    10.  */  
    11. int computeDatanodeWork() {  
    12.   // Blocks should not be replicated or removed if in safe mode.  
    13.   // It's OK to check safe mode here w/o holding lock, in the worst  
    14.   // case extra replications will be scheduled, and these will get  
    15.   // fixed up later.  
    16.    
    17. / 如果namesystem处于安全模式,直接返回0  
    18.   if (namesystem.isInSafeMode()) {  
    19.     return 0;  
    20.   }  
    21.   
    22.   // 通过心跳管理器heartbeatManager获取存活数据节点数  
    23.   final int numlive = heartbeatManager.getLiveDatanodeCount();  
    24.     
    25.   // blocksReplWorkMultiplier为集群每个周期每个DataNode平均待复制的数据块数量,  
    26.   // blocksToProcess为每个周期集群需要复制的数据块数量  
    27.   final int blocksToProcess = numlive  
    28.       * this.blocksReplWorkMultiplier;  
    29.     
    30.   // blocksInvalidateWorkPct为集群每个周期每个DataNode平均待删除的无效数据块百分比  
    31.   // nodesToProcess为集群每个周期待删除的无效数据块数量  
    32.   final int nodesToProcess = (int) Math.ceil(numlive  
    33.       * this.blocksInvalidateWorkPct);  
    34.   
    35.   // 计算复制工作量workFound  
    36.   int workFound = this.computeReplicationWork(blocksToProcess);  
    37.   
    38.   // Update counters  
    39.   // namesystem加写锁  
    40.   namesystem.writeLock();  
    41.   try {  
    42.       
    43.     // 调用updateState()方法更新相关状态  
    44.     this.updateState();  
    45.       
    46.     // 将计算得到的复制工作量workFound赋值给被调度复制的数据块数scheduledReplicationBlocksCount  
    47.     this.scheduledReplicationBlocksCount = workFound;  
    48.   } finally {  
    49.       
    50.     // namesystem释放写锁  
    51.     namesystem.writeUnlock();  
    52.   }  
    53.     
    54.   // 计算删除无效块工作量,并累加到workFound  
    55.   workFound += this.computeInvalidateWork(nodesToProcess);  
    56.     
    57.   // 返回总工作量workFound  
    58.   return workFound;  
    59. }  

           computeDatanodeWork()方法的处理逻辑大体如下:

            1、如果namesystem处于安全模式,直接返回0;

            2、通过心跳管理器heartbeatManager获取存活数据节点数numlive;

            3、计算每个周期集群需要复制的数据块数量blocksToProcess:存活数据节点数numlive乘以集群每个周期每个DataNode平均待复制的数据块数量blocksReplWorkMultiplier,blocksReplWorkMultiplier取参数dfs.namenode.replication.work.multiplier.per.iteration,参数未配置的话默认为2;

            4、计算集群每个周期待删除的无效数据块数量nodesToProcess:存活数据节点数numlive乘以集群每个周期每个DataNode平均待删除的无效数据块百分比blocksInvalidateWorkPct,blocksInvalidateWorkPct取参数dfs.namenode.invalidate.work.pct.per.iteration,参数未配置的话默认为0.32f,计算结果向上取整;

            5、调用computeReplicationWork()方法,传入blocksToProcess,计算复制工作量workFound;

            6、namesystem加写锁;

            7、调用updateState()方法更新相关状态;

            8、将计算得到的复制工作量workFound赋值给被调度复制的数据块数scheduledReplicationBlocksCount;

            9、namesystem释放写锁;

            10、调用computeInvalidateWork()方法,传入nodesToProcess(),计算删除无效块工作量,并累加到workFound;

            11、返回总工作量workFound。

            下面,我们看下计算复制工作量的computeReplicationWork()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Scan blocks in {@link #neededReplications} and assign replication 
    3.  * work to data-nodes they belong to. 
    4.  * 
    5.  * The number of process blocks equals either twice the number of live 
    6.  * data-nodes or the number of under-replicated blocks whichever is less. 
    7.  * 
    8.  * @return number of blocks scheduled for replication during this iteration. 
    9.  */  
    10. int computeReplicationWork(int blocksToProcess) {  
    11.   List<List<Block>> blocksToReplicate = null;  
    12.     
    13.   // namesystem加写锁  
    14.   namesystem.writeLock();  
    15.   try {  
    16.     // Choose the blocks to be replicated  
    17.     // 通过neededReplications的chooseUnderReplicatedBlocks()方法,  
    18.     // 选取blocksToProcess个待复制的数据块,放入blocksToReplicate列表,  
    19.     // blocksToReplicate是一个数据块列表的列表,外层的位置索引代表数据块复制的优先级  
    20.     blocksToReplicate = neededReplications  
    21.         .chooseUnderReplicatedBlocks(blocksToProcess);  
    22.   } finally {  
    23.       
    24.     // namesystem释放写锁  
    25.     namesystem.writeUnlock();  
    26.   }  
    27.     
    28.   // 调用computeReplicationWorkForBlocks()方法,进行实际数据块复制操作,传入待复制数据块列表的列表,位置索引代表复制的优先级  
    29.   return computeReplicationWorkForBlocks(blocksToReplicate);  
    30. }  

            computeReplicationWork()方法比较短,逻辑也很清晰,如下:

            1、namesystem加写锁;

            2、通过neededReplications的chooseUnderReplicatedBlocks()方法,选取blocksToProcess个待复制的数据块,放入blocksToReplicate列表,blocksToReplicate是一个数据块列表的列表,外层的位置索引代表数据块复制的优先级:

            关于如何通过neededReplications的chooseUnderReplicatedBlocks()方法选取blocksToProcess个待复制的数据块,请参考《HDFS源码分析之UnderReplicatedBlocks(二)》一文;

            3、namesystem释放写锁;

            4、调用computeReplicationWorkForBlocks()方法,进行实际数据块复制操作,传入待复制数据块列表的列表,位置索引代表复制的优先级。

  • 相关阅读:
    网络流24题题解
    NOIP2018游记
    AGC016题解
    雅礼集训总结
    数学相关【真·NOIP】
    NOIP2018系列
    洛咕P4180 严格次小生成树
    矩阵乘法学习笔记
    一些比较神奇的思路
    点分治复习记
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556277.html
Copyright © 2011-2022 走看看