  • [Original] Big Data Basics: HDFS (1) How HDFS Assigns Datanodes to a Newly Created File

    A File in HDFS is made up of Blocks: one File contains one or more Blocks. When a File is created, a Block is created for it, and then, based on the configured replication factor (default 3), 3 Datanodes are requested to store that Block.

    The hdfs fsck command shows the Block, Datanode, and Rack details of a specific file, for example:

    hdfs fsck /tmp/test.sql -files -blocks -locations -racks
    Connecting to namenode via http://name_node:50070
    FSCK started by hadoop (auth:SIMPLE) from /client for path /tmp/test.sql at Thu Dec 13 15:44:12 CST 2018
    /tmp/test.sql 16 bytes, 1 block(s): OK
    0. BP-436366437-name_node-1493982655699:blk_1449692331_378721485 len=16 repl=3 [/DEFAULT/server111:50010, /DEFAULT/server121:50010, /DEFAULT/server43:50010]

    Status: HEALTHY
    Total size: 16 B
    Total dirs: 0
    Total files: 1
    Total symlinks: 0
    Total blocks (validated): 1 (avg. block size 16 B)
    Minimally replicated blocks: 1 (100.0 %)
    Over-replicated blocks: 0 (0.0 %)
    Under-replicated blocks: 0 (0.0 %)
    Mis-replicated blocks: 0 (0.0 %)
    Default replication factor: 3
    Average block replication: 3.0
    Corrupt blocks: 0
    Missing replicas: 0 (0.0 %)
    Number of data-nodes: 193
    Number of racks: 1
    FSCK ended at Thu Dec 13 15:44:12 CST 2018 in 1 milliseconds


    The filesystem under path '/tmp/test.sql' is HEALTHY

    How are those 3 Datanodes chosen? There is a priority order:

    1 the local rack (relative to the hdfs client)

    2 a remote rack (relative to the hdfs client)

    3 another rack

    4 fully random
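    The fallback order above can be sketched as a small simulation. This is only a model of the priority, not the HDFS implementation; the topology map and helper names (pick, place, rackOf) are made up for illustration:

    ```java
    import java.util.*;

    // Simplified sketch of the replica placement priority described above:
    // first replica on the client's rack, second on a remote rack, third on
    // the same rack as the second, then fully random. Hypothetical model,
    // not the HDFS BlockPlacementPolicyDefault API.
    public class PlacementSketch {

        static String rackOf(Map<String, List<String>> topology, String node) {
            for (Map.Entry<String, List<String>> e : topology.entrySet())
                if (e.getValue().contains(node)) return e.getKey();
            return null;
        }

        // Pick one unchosen node, either on `rack` (sameRack=true) or off it.
        static String pick(Map<String, List<String>> topology, String rack,
                           boolean sameRack, Set<String> chosen, Random rnd) {
            List<String> candidates = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : topology.entrySet()) {
                if (e.getKey().equals(rack) != sameRack) continue;
                for (String n : e.getValue())
                    if (!chosen.contains(n)) candidates.add(n);
            }
            return candidates.isEmpty() ? null
                    : candidates.get(rnd.nextInt(candidates.size()));
        }

        public static List<String> place(Map<String, List<String>> topology,
                                         String clientRack, long seed) {
            Random rnd = new Random(seed);
            Set<String> chosen = new LinkedHashSet<>();
            // 1) local rack
            String first = pick(topology, clientRack, true, chosen, rnd);
            if (first != null) chosen.add(first);
            // 2) remote rack
            String second = pick(topology, clientRack, false, chosen, rnd);
            if (second != null) chosen.add(second);
            // 3) same rack as the second replica
            String secondRack = second == null ? clientRack
                    : rackOf(topology, second);
            String third = pick(topology, secondRack, true, chosen, rnd);
            if (third != null) chosen.add(third);
            // 4) fully random fallback
            while (chosen.size() < 3) {
                List<String> all = new ArrayList<>();
                for (List<String> ns : topology.values())
                    for (String n : ns) if (!chosen.contains(n)) all.add(n);
                if (all.isEmpty()) break;
                chosen.add(all.get(rnd.nextInt(all.size())));
            }
            return new ArrayList<>(chosen);
        }
    }
    ```

    With two racks of two nodes each, this yields one replica on the client's rack and two on the other rack, matching the default policy's behavior.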

    How many Datanodes may be chosen from each rack (maxNodesPerRack) is determined by a formula; see the code below.

    org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer

    private int findNewDatanode(final DatanodeInfo[] original
        ) throws IOException {
      if (nodes.length != original.length + 1) {
        throw new IOException(
            new StringBuilder()
            .append("Failed to replace a bad datanode on the existing pipeline ")
            .append("due to no more good datanodes being available to try. ")
            .append("(Nodes: current=").append(Arrays.asList(nodes))
            .append(", original=").append(Arrays.asList(original)).append("). ")
            .append("The current failed datanode replacement policy is ")
            .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
            .append("a client may configure this via '")
            .append(DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY)
            .append("' in its configuration.")
            .toString());
      }

    Note: when no new datanode can be found, an exception is thrown, with an error like:

    Caused by: java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[server82:50010], original=[server.82:50010]).
    The current failed datanode replacement policy is ALWAYS, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
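    The policy mentioned in the error can be tuned on the client side, e.g. in hdfs-site.xml (the value shown is illustrative; the supported values are DEFAULT, ALWAYS, and NEVER):

    ```xml
    <!-- Client-side setting controlling whether a failed datanode in a
         write pipeline must be replaced before the write continues. -->
    <property>
      <name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
      <value>DEFAULT</value>
    </property>
    ```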

    private void addDatanode2ExistingPipeline() throws IOException {
    ...
      final DatanodeInfo[] original = nodes;
      final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
          src, fileId, block, nodes, storageIDs,
          failed.toArray(new DatanodeInfo[failed.size()]),
          1, dfsClient.clientName);
      setPipeline(lb);

      //find the new datanode
      final int d = findNewDatanode(original);

    Note: getAdditionalDatanode is called to obtain 1 new datanode; a long call stack is omitted here.


    org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault

      private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
                                        Node writer,
                                        List<DatanodeStorageInfo> chosenStorage,
                                        boolean returnChosenNodes,
                                        Set<Node> excludedNodes,
                                        long blocksize,
                                        final BlockStoragePolicy storagePolicy) {
    ...
        int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
        numOfReplicas = result[0];
        int maxNodesPerRack = result[1];
    ...
        final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
            blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
            EnumSet.noneOf(StorageType.class), results.isEmpty());

    Note: maxNodesPerRack here is the maximum number of datanodes that may be allocated per rack.

      private Node chooseTarget(int numOfReplicas,
                                Node writer,
                                final Set<Node> excludedNodes,
                                final long blocksize,
                                final int maxNodesPerRack,
                                final List<DatanodeStorageInfo> results,
                                final boolean avoidStaleNodes,
                                final BlockStoragePolicy storagePolicy,
                                final EnumSet<StorageType> unavailableStorages,
                                final boolean newBlock) {
    ...
          if (numOfResults <= 1) {
            chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
                results, avoidStaleNodes, storageTypes);
            if (--numOfReplicas == 0) {
              return writer;
            }
          }

    Note: this tries to obtain a new datanode from a remote rack (i.e. a rack different from the already chosen datanodes).


      protected void chooseRemoteRack(int numOfReplicas,
                                    DatanodeDescriptor localMachine,
                                    Set<Node> excludedNodes,
                                    long blocksize,
                                    int maxReplicasPerRack,
                                    List<DatanodeStorageInfo> results,
                                    boolean avoidStaleNodes,
                                    EnumMap<StorageType, Integer> storageTypes)
                                        throws NotEnoughReplicasException {
    ...
          chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
              excludedNodes, blocksize, maxReplicasPerRack, results,
              avoidStaleNodes, storageTypes);

    Note: this picks one datanode at random from all eligible datanodes; the scope "~" + networkLocation means "anywhere outside the given rack".


      protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
                                String scope,
                                Set<Node> excludedNodes,
                                long blocksize,
                                int maxNodesPerRack,
                                List<DatanodeStorageInfo> results,
                                boolean avoidStaleNodes,
                                EnumMap<StorageType, Integer> storageTypes)
                                throws NotEnoughReplicasException {
    ...
        int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
            scope, excludedNodes);
    ...
        if (numOfReplicas>0) {
          String detail = enableDebugLogging;
          if (LOG.isDebugEnabled()) {
            if (badTarget && builder != null) {
              detail = builder.toString();
              builder.setLength(0);
            } else {
              detail = "";
            }
          }
          throw new NotEnoughReplicasException(detail);
        }

    Note: if for some reason (e.g. full disks or decommissioned nodes) numOfAvailableNodes comes out as 0, a NotEnoughReplicasException is thrown.
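    The behavior can be illustrated with a minimal sketch: filter out excluded nodes, pick one of the remaining at random, and fail when nothing is left. IllegalStateException stands in for NotEnoughReplicasException here; the class and method names are hypothetical:

    ```java
    import java.util.*;

    // Sketch of the availability check that leads to NotEnoughReplicasException:
    // count candidate nodes in scope minus excluded ones, and fail when the
    // remaining pool is empty. Illustration only, not HDFS code.
    public class ChooseRandomSketch {
        public static String chooseOne(Collection<String> scope,
                                       Set<String> excluded, Random rnd) {
            List<String> available = new ArrayList<>();
            for (String n : scope)
                if (!excluded.contains(n)) available.add(n);
            if (available.isEmpty())   // mirrors NotEnoughReplicasException
                throw new IllegalStateException("not enough replicas");
            return available.get(rnd.nextInt(available.size()));
        }
    }
    ```

    If every node in scope is excluded (full, stale, or already chosen), the pool is empty and the exception propagates up, which is exactly the failure path described above.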

    The maxNodesPerRack calculation logic is as follows:

    org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault

      /**
       * Calculate the maximum number of replicas to allocate per rack. It also
       * limits the total number of replicas to the total number of nodes in the
       * cluster. Caller should adjust the replica count to the return value.
       *
       * @param numOfChosen The number of already chosen nodes.
       * @param numOfReplicas The number of additional nodes to allocate.
       * @return integer array. Index 0: The number of nodes allowed to allocate
       *         in addition to already chosen nodes.
       *         Index 1: The maximum allowed number of nodes per rack. This
       *         is independent of the number of chosen nodes, as it is calculated
       *         using the target number of replicas.
       */
      private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
        int clusterSize = clusterMap.getNumOfLeaves();
        int totalNumOfReplicas = numOfChosen + numOfReplicas;
        if (totalNumOfReplicas > clusterSize) {
          numOfReplicas -= (totalNumOfReplicas-clusterSize);
          totalNumOfReplicas = clusterSize;
        }
        // No calculation needed when there is only one rack or picking one node.
        int numOfRacks = clusterMap.getNumOfRacks();
        if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
          return new int[] {numOfReplicas, totalNumOfReplicas};
        }

        int maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2;
        // At this point, there are more than one racks and more than one replicas
        // to store. Avoid all replicas being in the same rack.
        //
        // maxNodesPerRack has the following properties at this stage.
        //   1) maxNodesPerRack >= 2
        //   2) (maxNodesPerRack-1) * numOfRacks > totalNumOfReplicas
        //          when numOfRacks > 1
        //
        // Thus, the following adjustment will still result in a value that forces
        // multi-rack allocation and gives enough number of total nodes.
        if (maxNodesPerRack == totalNumOfReplicas) {
          maxNodesPerRack--;
        }
        return new int[] {numOfReplicas, maxNodesPerRack};
      }

    Note: the key part is the formula

        int maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2;
        if (maxNodesPerRack == totalNumOfReplicas) {
          maxNodesPerRack--;
        }

    It spreads totalNumOfReplicas across numOfRacks racks with some headroom, and the final decrement guarantees that the replicas cannot all end up on a single rack.
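    To see concrete values, here is a self-contained transcription of getMaxNodesPerRack, with clusterSize and numOfRacks passed in as parameters (in the real code they come from clusterMap):

    ```java
    // Transcription of BlockPlacementPolicyDefault.getMaxNodesPerRack with the
    // cluster topology supplied as parameters, so the formula can be tried on
    // sample inputs.
    public class MaxNodesPerRack {
        public static int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas,
                                               int clusterSize, int numOfRacks) {
            int totalNumOfReplicas = numOfChosen + numOfReplicas;
            // Cap the replica count at the number of nodes in the cluster.
            if (totalNumOfReplicas > clusterSize) {
                numOfReplicas -= (totalNumOfReplicas - clusterSize);
                totalNumOfReplicas = clusterSize;
            }
            // Single rack or single replica: no per-rack limit needed.
            if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
                return new int[] {numOfReplicas, totalNumOfReplicas};
            }
            int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 2;
            // Decrement so that not all replicas can land on one rack.
            if (maxNodesPerRack == totalNumOfReplicas) {
                maxNodesPerRack--;
            }
            return new int[] {numOfReplicas, maxNodesPerRack};
        }
    }
    ```

    For example, with 3 target replicas: on a single rack the method returns {3, 3}; on 2 racks the formula gives (3-1)/2 + 2 = 3, which equals totalNumOfReplicas and is decremented to 2; on 3 racks it gives (3-1)/3 + 2 = 2 and stays at 2.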

  • Original source: https://www.cnblogs.com/barneywill/p/10114504.html