  • HDFS Replica Placement Policy

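      Before a client writes a block to a DataNode, it first contacts the NameNode, which selects the requested number of DataNodes to hold the replicas. The replica selection policy is defined by the abstract class BlockPlacementPolicy, and its default implementation is the subclass BlockPlacementPolicyDefault. That class contains several overloads of chooseTarget(), all of which eventually call the method below: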

     1 /**
     2    * This is not part of the public API but is used by the unit tests.
     3    */
     4   DatanodeDescriptor[] chooseTarget(int numOfReplicas,
     5                                     DatanodeDescriptor writer,
     6                                     List<DatanodeDescriptor> chosenNodes,
     7                                     HashMap<Node, Node> excludedNodes,
     8                                     long blocksize) {
     9       //numOfReplicas: number of replicas to choose
    10       //clusterMap.getNumOfLeaves(): number of DNs in the whole cluster
    11     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
    12       return new DatanodeDescriptor[0];
    13     }
    14       
    15     //excludedNodes: DNs to exclude (some DNs have already been chosen, so they must not be chosen again)
    16     if (excludedNodes == null) {
    17       excludedNodes = new HashMap<Node, Node>();
    18     }
    19      
    20     int clusterSize = clusterMap.getNumOfLeaves();
    21     //total replicas = already chosen + newly requested
    22     int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
    23     if (totalNumOfReplicas > clusterSize) {    //total replicas exceed the number of DNs in the cluster
    24       numOfReplicas -= (totalNumOfReplicas-clusterSize);
    25       totalNumOfReplicas = clusterSize;
    26     }
    27       
    28     //maximum number of DNs that may be chosen on any single rack
    29     int maxNodesPerRack = 
    30       (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
    31       
    32     List<DatanodeDescriptor> results = 
    33       new ArrayList<DatanodeDescriptor>(chosenNodes);
    34     for (DatanodeDescriptor node:chosenNodes) {
    35       // add localMachine and related nodes to excludedNodes
    36       addToExcludedNodes(node, excludedNodes);
    37       adjustExcludedNodes(excludedNodes, node);
    38     }
    39       
    40     //the writer (client) is not a DN in the cluster
    41     if (!clusterMap.contains(writer)) {
    42       writer=null;
    43     }
    44       
    45     boolean avoidStaleNodes = (stats != null && stats
    46         .shouldAvoidStaleDataNodesForWrite());
    47     
    48     //choose numOfReplicas DNs and return the local DN
    49     DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
    50         excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
    51       
    52     results.removeAll(chosenNodes);
    53       
    54     // sorting nodes to form a pipeline
    55     //arrange the chosen DNs (the elements of results) into a pipeline
    56     return getPipeline((writer==null)?localNode:writer,
    57                        results.toArray(new DatanodeDescriptor[results.size()]));
    58   }

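      The method does roughly what its comments say, but pay attention to what each variable means. At lines 48 to 50 it calls another chooseTarget() overload, which chooses the requested number of DNs (storing them in results) and returns one DN as the local DN. That method is analyzed next.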
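      The two pieces of arithmetic in the listing above (capping the request at the cluster size, and the per-rack limit) are easy to misread, so here is a minimal standalone sketch of them. The class and method names are hypothetical; only the formulas are taken from the listing.

```java
// Standalone sketch of the arithmetic in the first listing.
// ReplicaBudget and its method names are hypothetical, not Hadoop API.
final class ReplicaBudget {

    /** Number of replicas still to choose once the total is capped at the cluster size. */
    static int cappedNumOfReplicas(int numOfReplicas, int alreadyChosen, int clusterSize) {
        int total = alreadyChosen + numOfReplicas;
        if (total > clusterSize) {
            // Same adjustment as lines 23 to 26 of the listing.
            numOfReplicas -= (total - clusterSize);
        }
        return numOfReplicas;
    }

    /** Per-rack limit, same formula as lines 29 and 30 of the listing (integer division). */
    static int maxNodesPerRack(int totalNumOfReplicas, int numOfRacks) {
        return (totalNumOfReplicas - 1) / numOfRacks + 2;
    }

    public static void main(String[] args) {
        // 1 DN already chosen, 3 more requested, but only 3 DNs exist: 2 can still be chosen.
        System.out.println(cappedNumOfReplicas(3, 1, 3)); // 2
        // 3 replicas over 2 racks: (3 - 1) / 2 + 2 = 3
        System.out.println(maxNodesPerRack(3, 2));        // 3
        // 3 replicas over 3 racks: (3 - 1) / 3 + 2 = 2
        System.out.println(maxNodesPerRack(3, 3));        // 2
    }
}
```

      With the common setup of 3 replicas and at least 3 racks, the limit works out to 2 DNs per rack, which is why a single rack can hold at most two replicas of the same block.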

     1 /* choose <i>numOfReplicas</i> from all data nodes */
     2   private DatanodeDescriptor chooseTarget(int numOfReplicas,
     3       DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
     4       long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results,
     5       boolean avoidStaleNodes) {
     6       
     7     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
     8       return writer;
     9     }
    10     int totalReplicasExpected = numOfReplicas + results.size();
    11       
    12     int numOfResults = results.size();
    13     boolean newBlock = (numOfResults==0);
    14     if (writer == null && !newBlock) {
    15       writer = results.get(0);
    16     }
    17         
    18     // Keep a copy of original excludedNodes
    19     final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ? 
    20         new HashMap<Node, Node>(excludedNodes) : null;
    21     
    22     try {
    23       if (numOfResults == 0) {    //choose the local DN
    24         writer = chooseLocalNode(writer, excludedNodes, blocksize,
    25             maxNodesPerRack, results, avoidStaleNodes);
    26         if (--numOfReplicas == 0) {
    27           return writer;
    28         }
    29       }
    30       if (numOfResults <= 1) {    //choose a DN on a remote rack
    31         chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
    32             maxNodesPerRack, results, avoidStaleNodes);
    33         if (--numOfReplicas == 0) {
    34           return writer;
    35         }
    36       }
    37       if (numOfResults <= 2) {
    38         if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {  //the first two DNs are on the same rack
    39             //so choose a DN that is not on the first DN's rack
    40           chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
    41               maxNodesPerRack, results, avoidStaleNodes);
    42         } else if (newBlock){
    43             //choose a DN on the same rack as the second DN
    44           chooseLocalRack(results.get(1), excludedNodes, blocksize,
    45               maxNodesPerRack, results, avoidStaleNodes);
    46         } else {
    47             //choose a DN on the same rack as writer
    48           chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
    49               results, avoidStaleNodes);
    50         }
    51         if (--numOfReplicas == 0) {
    52           return writer;
    53         }
    54       }
    55       //choose the remaining DNs at random from the whole cluster
    56       chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
    57           maxNodesPerRack, results, avoidStaleNodes);
    58     } catch (NotEnoughReplicasException e) {
    59       FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
    60                + (totalReplicasExpected - results.size()) + " to reach "
    61                + totalReplicasExpected + "\n"
    62                + e.getMessage());
    63       if (avoidStaleNodes) {
    64         // Retry chooseTarget again, this time not avoiding stale nodes.
    65 
    66         // excludedNodes contains the initial excludedNodes and nodes that were
    67         // not chosen because they were stale, decommissioned, etc.
    68         // We need to additionally exclude the nodes that were added to the 
    69         // result list in the successful calls to choose*() above.
    70         for (Node node : results) {
    71           oldExcludedNodes.put(node, node);
    72         }
    73         // Set numOfReplicas, since it can get out of sync with the result list
    74         // if the NotEnoughReplicasException was thrown in chooseRandom().
    75         numOfReplicas = totalReplicasExpected - results.size();
    76         return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
    77             maxNodesPerRack, results, false);
    78       }
    79     }
    80     return writer;
    81   }

       The following sections walk through how each of these three DNs is chosen, in order.

    1. Choosing the local DN: chooseLocalNode()

     1  /* choose <i>localMachine</i> as the target.
     2    * if <i>localMachine</i> is not available, 
     3    * choose a node on the same rack
     4    * @return the chosen node
     5    */
     6   protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
     7       HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
     8       List<DatanodeDescriptor> results, boolean avoidStaleNodes)
     9     throws NotEnoughReplicasException {
    10     // if no local machine, randomly choose one node
    11     if (localMachine == null)    //there is no DN on the client machine
    12         //pick a DN at random from the whole cluster to act as the local DN
    13       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
    14           maxNodesPerRack, results, avoidStaleNodes);
    15       
    16     // otherwise try local machine first
    17     Node oldNode = excludedNodes.put(localMachine, localMachine);
    18     if (oldNode == null) { // was not in the excluded list
    19         //the client's DN has not been chosen yet, so check whether it is a good target (e.g. not overloaded)
    20       if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
    21           results, avoidStaleNodes)) {
    22         results.add(localMachine);
    23         // add localMachine and related nodes to excludedNode
    24         addToExcludedNodes(localMachine, excludedNodes);
    25         return localMachine;
    26       }
    27     } 
    28       
    29     // try a node on local rack
    30     //choose a DN on the same rack as the client
    31     return chooseLocalRack(localMachine, excludedNodes, blocksize,
    32         maxNodesPerRack, results, avoidStaleNodes);
    33   }

       Choosing the local DN takes three steps:

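      1.1) If there is no DN on the client machine, pick a DN at random from the whole cluster (the chooseRandom() method) and check whether it is a good target (same checks as in 1.2); if it is not, pick another one at random, and so on until a suitable DN is found.

      1.2) If the client machine does host a DN, check whether that DN is a good target (the isGoodTarget() method). The checks are: whether the node is available, whether it is in the "stale" state, whether it has enough remaining capacity, how heavy its current traffic is, and whether its rack already holds too many replicas of this block;

      1.3) If the client's DN cannot be used (it is already excluded, or it fails the checks in 1.2), choose a DN on the same rack as the client (the chooseLocalRack() method) as the local node:

      a) pick a DN at random from the client's rack (as in 1.1);

      b) if that fails, pick a DN at random from the whole cluster (as in 1.1).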

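      These two steps deserve some explanation: both follow the same procedure as 1.1, so how can they produce different results? The answer lies in the first argument passed to chooseRandom(). If it is NodeBase.ROOT, the root of the topology tree ("/"), the scope is the whole cluster; if it is localMachine.getNetworkLocation(), the scope is the rack that localMachine is on. The first argument therefore defines the range to choose from. The NetworkTopology class defines the relationship between DNs and racks, and rack awareness is built on top of it.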
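      The fallback chain in steps 1.1 to 1.3 can be sketched with a toy model. Everything here is hypothetical (ToyNode, chooseFirst, the isGood predicate), and for determinism it takes the first acceptable node in list order where the real code picks at random; it only illustrates the order "client's own DN, then client's rack, then whole cluster".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Toy model of the local-node fallback chain; not Hadoop code.
final class LocalNodeSketch {

    /** Hypothetical stand-in for a DataNode: a name plus its rack path. */
    static final class ToyNode {
        final String name;
        final String rack;
        ToyNode(String name, String rack) { this.name = name; this.rack = rack; }
    }

    /** First non-excluded, acceptable node inside the scope ("/" means whole cluster); null if none. */
    static ToyNode chooseFirst(String scope, List<ToyNode> cluster,
                               List<ToyNode> excluded, Predicate<ToyNode> isGood) {
        for (ToyNode n : cluster) {
            boolean inScope = scope.equals("/") || n.rack.equals(scope);
            if (inScope && !excluded.contains(n) && isGood.test(n)) {
                excluded.add(n); // a chosen node must not be chosen again
                return n;
            }
        }
        return null;
    }

    static ToyNode chooseLocal(ToyNode writer, List<ToyNode> cluster,
                               List<ToyNode> excluded, Predicate<ToyNode> isGood) {
        if (writer == null) {                        // 1.1: no DN on the client
            return chooseFirst("/", cluster, excluded, isGood);
        }
        if (!excluded.contains(writer)) {            // 1.2: try the client's own DN
            excluded.add(writer);
            if (isGood.test(writer)) {
                return writer;
            }
        }
        // 1.3 a): same rack as the client; 1.3 b): whole cluster
        ToyNode sameRack = chooseFirst(writer.rack, cluster, excluded, isGood);
        return sameRack != null ? sameRack : chooseFirst("/", cluster, excluded, isGood);
    }

    public static void main(String[] args) {
        List<ToyNode> cluster = Arrays.asList(
            new ToyNode("a", "/r1"), new ToyNode("b", "/r1"), new ToyNode("c", "/r2"));
        // Pretend the client's own DN "a" is overloaded: the choice falls back to its rack.
        ToyNode chosen = chooseLocal(cluster.get(0), cluster, new ArrayList<>(),
                                     n -> !n.name.equals("a"));
        System.out.println(chosen.name); // b
    }
}
```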

    2. Choosing a DN on a remote rack: chooseRemoteRack()

     1 /* choose <i>numOfReplicas</i> nodes from the racks 
     2    * that <i>localMachine</i> is NOT on.
     3    * if not enough nodes are available, choose the remaining ones 
     4    * from the local rack
     5    */
     6   protected void chooseRemoteRack(int numOfReplicas,
     7                                 DatanodeDescriptor localMachine,
     8                                 HashMap<Node, Node> excludedNodes,
     9                                 long blocksize,
    10                                 int maxReplicasPerRack,
    11                                 List<DatanodeDescriptor> results,
    12                                 boolean avoidStaleNodes)
    13     throws NotEnoughReplicasException {
    14     int oldNumOfReplicas = results.size();
    15     // randomly choose one node from remote racks
    16     try {
    17         //choose a DN that is not on the same rack as localMachine
    18       chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
    19           excludedNodes, blocksize, maxReplicasPerRack, results,
    20           avoidStaleNodes);
    21     } catch (NotEnoughReplicasException e) {
    22         //fall back to a DN on the same rack as localMachine
    23       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
    24                    localMachine.getNetworkLocation(), excludedNodes, blocksize, 
    25                    maxReplicasPerRack, results, avoidStaleNodes);
    26     }
    27   }

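      Choosing the remote DN takes two steps:

      2.1) choose a DN from a rack other than the local rack (as in 1.1);

      2.2) if that fails, choose a DN from the local rack (as in 1.1);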

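      Again, both steps reuse chooseRandom(). In 2.1 the argument is "~" + localMachine.getNetworkLocation(), which means choosing a DN from anywhere in the cluster except the rack that localMachine is on (the "~" prefix means exclusion). In 2.2 the argument is localMachine.getNetworkLocation(), which means choosing a DN from localMachine's own rack. This point matters: the second DN is not guaranteed to be on a different rack from the first.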
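      To make the three kinds of scope argument concrete (the whole cluster, one rack, and everything except one rack), here is a minimal sketch of how such a scope string could be matched against a node's rack path. ScopeSketch and its methods are hypothetical helpers written for illustration, not the NetworkTopology implementation.

```java
// Hypothetical scope matcher illustrating "/", "/rack" and "~/rack"; not Hadoop code.
final class ScopeSketch {

    /** True if a node whose rack path is nodeRack falls inside the given scope. */
    static boolean inScope(String nodeRack, String scope) {
        if (scope.startsWith("~")) {
            // "~" prefix: everything except the named subtree
            return !isUnder(nodeRack, scope.substring(1));
        }
        return isUnder(nodeRack, scope);
    }

    /** True if nodeRack equals subtree or lies below it; the root contains everything. */
    static boolean isUnder(String nodeRack, String subtree) {
        if (subtree.isEmpty() || subtree.equals("/")) {
            return true;
        }
        // Append "/" before the prefix test so "/dc1/rack1" does not match "/dc1/rack10".
        return nodeRack.equals(subtree) || nodeRack.startsWith(subtree + "/");
    }

    public static void main(String[] args) {
        System.out.println(inScope("/dc1/rack1", "/"));           // true
        System.out.println(inScope("/dc1/rack2", "/dc1/rack1"));  // false
        System.out.println(inScope("/dc1/rack2", "~/dc1/rack1")); // true
        System.out.println(inScope("/dc1/rack1", "~/dc1/rack1")); // false
    }
}
```

      Under this model, step 2.1 draws from the nodes for which inScope(rack, "~" + localRack) is true, and step 2.2 from those for which inScope(rack, localRack) is true.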

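    3. Choosing the third DN

      The code is at lines 37 to 50 of the second listing above. The steps are:

      3.1) If the first two DNs are on the same rack, choose a DN that is not on their rack, as in step 2;

      3.2) Otherwise, if newBlock is true, choose a DN on the same rack as the second DN, as in 1.3;

      3.3) Otherwise, choose a DN on the same rack as writer (which is the first chosen DN when the client is not itself a DN), as in 1.3;

    4. Choose DNs for the remaining replicas from the whole cluster, as in 1.1. (The code is at line 56 of the second listing above.)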

      Finally, control returns to the last part of the first listing, which arranges the chosen DNs into a pipeline.

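      This analysis also settles a common question. Some sources say the third DN is on the same rack as the second DN, while others say it is on the same rack as the first. Which is right? It hinges on where the second DN ended up: as noted above, the second DN may be on a different rack from the first, but it may also be on the same rack, depending on the state of the cluster at that moment. So neither statement should be taken as an absolute rule.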
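      The case analysis for the third DN can be written down as a small decision function. This is a sketch under simplifying assumptions: racks are plain strings, ThirdReplicaSketch and thirdReplicaScope are hypothetical names, and the function returns only the scope that is tried first, ignoring the fallbacks that apply when that scope has no usable DN.

```java
// Hypothetical decision function mirroring lines 37 to 50 of the second listing; not Hadoop code.
final class ThirdReplicaSketch {

    /** Scope the third replica is drawn from first; a "~" prefix means "any rack except this one". */
    static String thirdReplicaScope(String firstRack, String secondRack,
                                    boolean newBlock, String writerRack) {
        if (firstRack.equals(secondRack)) {
            return "~" + firstRack;   // first two share a rack: go to a different rack
        } else if (newBlock) {
            return secondRack;        // new block: same rack as the second DN
        } else {
            return writerRack;        // existing block: same rack as writer
        }
    }

    public static void main(String[] args) {
        System.out.println(thirdReplicaScope("/r1", "/r1", true, "/r1"));  // ~/r1
        System.out.println(thirdReplicaScope("/r1", "/r2", true, "/r1"));  // /r2
        System.out.println(thirdReplicaScope("/r1", "/r2", false, "/r1")); // /r1
    }
}
```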

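      This article is based on Hadoop 1.2.1.

      If you spot any mistakes, corrections are welcome.

      Reference: http://blog.csdn.net/xhh198781/article/details/7109764

      Please credit the source when reposting: http://www.cnblogs.com/gwgyk/p/4137060.html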
