  • [Original] Big Data Fundamentals: HDFS (1) How Datanodes Are Assigned to a Newly Created HDFS File

    A File in HDFS is made up of Blocks: a File contains one or more Blocks. When a File is created, a Block is created for it, and then, based on the configured replication factor (3 by default), 3 Datanodes are requested to store that Block.

    The hdfs fsck command shows a file's Block, Datanode, and Rack details, for example:

    hdfs fsck /tmp/test.sql -files -blocks -locations -racks
    Connecting to namenode via http://name_node:50070
    FSCK started by hadoop (auth:SIMPLE) from /client for path /tmp/test.sql at Thu Dec 13 15:44:12 CST 2018
    /tmp/test.sql 16 bytes, 1 block(s): OK
    0. BP-436366437-name_node-1493982655699:blk_1449692331_378721485 len=16 repl=3 [/DEFAULT/server111:50010, /DEFAULT/server121:50010, /DEFAULT/server43:50010]

    Status: HEALTHY
    Total size: 16 B
    Total dirs: 0
    Total files: 1
    Total symlinks: 0
    Total blocks (validated): 1 (avg. block size 16 B)
    Minimally replicated blocks: 1 (100.0 %)
    Over-replicated blocks: 0 (0.0 %)
    Under-replicated blocks: 0 (0.0 %)
    Mis-replicated blocks: 0 (0.0 %)
    Default replication factor: 3
    Average block replication: 3.0
    Corrupt blocks: 0
    Missing replicas: 0 (0.0 %)
    Number of data-nodes: 193
    Number of racks: 1
    FSCK ended at Thu Dec 13 15:44:12 CST 2018 in 1 milliseconds


    The filesystem under path '/tmp/test.sql' is HEALTHY

    How are those 3 Datanodes chosen? There is a priority order:

    1 The local rack (relative to the HDFS client)

    2 A remote rack (relative to the HDFS client)

    3 Another rack

    4 Fully random
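To make the priority order above concrete, here is a simplified, illustrative sketch of that fallback sequence. This is NOT the real BlockPlacementPolicyDefault code (the actual logic, shown later in this post, also handles storage types, stale nodes, and per-rack limits); all node and rack names are invented for the example.

```java
import java.util.*;
import java.util.function.Predicate;

// Illustrative sketch of the replica placement priority order, not the
// real HDFS implementation. Node/rack names below are made up.
public class PlacementSketch {

    // Remove and return the first node in the pool matching the predicate,
    // or null if none matches.
    static String pick(List<String> pool, Predicate<String> ok) {
        for (String n : pool) {
            if (ok.test(n)) { pool.remove(n); return n; }
        }
        return null;
    }

    static List<String> chooseTargets(String clientRack,
                                      Map<String, String> rackOf,
                                      int replicas) {
        List<String> pool = new ArrayList<>(rackOf.keySet());
        List<String> chosen = new ArrayList<>();
        // 1) a node on the client's local rack
        String first = pick(pool, n -> rackOf.get(n).equals(clientRack));
        if (first != null) chosen.add(first);
        // 2) a node on a remote rack (different from the client's)
        String second = pick(pool, n -> !rackOf.get(n).equals(clientRack));
        if (second != null) chosen.add(second);
        // 3) another node on that same remote rack
        if (second != null) {
            String third = pick(pool, n -> rackOf.get(n).equals(rackOf.get(second)));
            if (third != null) chosen.add(third);
        }
        // 4) fully random: take whatever is left for any remaining replicas
        while (chosen.size() < replicas && !pool.isEmpty()) {
            chosen.add(pool.remove(0));
        }
        return chosen.subList(0, Math.min(replicas, chosen.size()));
    }

    public static void main(String[] args) {
        Map<String, String> rackOf = new LinkedHashMap<>();
        rackOf.put("dn1", "/rackA"); rackOf.put("dn2", "/rackA");
        rackOf.put("dn3", "/rackB"); rackOf.put("dn4", "/rackB");
        System.out.println(chooseTargets("/rackA", rackOf, 3));
        // prints [dn1, dn3, dn4]
    }
}
```

With two racks and three replicas, the sketch lands one replica on the client's rack and two on the remote rack, which matches the spread the real policy aims for.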

    Each rack can then supply only a limited number of Datanodes (maxNodesPerRack), which is computed by a formula; see the code walkthrough below.

    org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer

    private int findNewDatanode(final DatanodeInfo[] original
        ) throws IOException {
      if (nodes.length != original.length + 1) {
        throw new IOException(
            new StringBuilder()
            .append("Failed to replace a bad datanode on the existing pipeline ")
            .append("due to no more good datanodes being available to try. ")
            .append("(Nodes: current=").append(Arrays.asList(nodes))
            .append(", original=").append(Arrays.asList(original)).append("). ")
            .append("The current failed datanode replacement policy is ")
            .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
            .append("a client may configure this via '")
            .append(DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY)
            .append("' in its configuration.")
            .toString());
      }

    Note: when no new datanode can be found, an exception is thrown, with an error like the following:

    Caused by: java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[server82:50010], original=[server.82:50010]).
    The current failed datanode replacement policy is ALWAYS, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
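The policy named in that error is a client-side setting. As a reference, here is an hdfs-site.xml fragment for the related `dfs.client.block.write.replace-datanode-on-failure.*` keys; the values shown are the usual defaults, but verify them against your Hadoop version's documentation.

```xml
<!-- Client-side settings controlling datanode replacement when a write
     pipeline fails; defaults shown, verify against your Hadoop version. -->
<property>
  <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
  <value>true</value>
</property>
<property>
  <!-- NEVER / DEFAULT / ALWAYS; the log above shows a client running ALWAYS -->
  <name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
  <value>DEFAULT</value>
</property>
<property>
  <!-- if true, continue writing with a shorter pipeline instead of failing -->
  <name>dfs.client.block.write.replace-datanode-on-failure.best-effort</name>
  <value>false</value>
</property>
```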

    private void addDatanode2ExistingPipeline() throws IOException {
    ...
      final DatanodeInfo[] original = nodes;
      final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
          src, fileId, block, nodes, storageIDs,
          failed.toArray(new DatanodeInfo[failed.size()]),
          1, dfsClient.clientName);
      setPipeline(lb);

      //find the new datanode
      final int d = findNewDatanode(original);

    Note: getAdditionalDatanode is called to obtain 1 new datanode (a long call stack is omitted here).

    org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault

    private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
                                      Node writer,
                                      List<DatanodeStorageInfo> chosenStorage,
                                      boolean returnChosenNodes,
                                      Set<Node> excludedNodes,
                                      long blocksize,
                                      final BlockStoragePolicy storagePolicy) {
    ...
      int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
      numOfReplicas = result[0];
      int maxNodesPerRack = result[1];
    ...
      final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
          blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
          EnumSet.noneOf(StorageType.class), results.isEmpty());

    Note: maxNodesPerRack is the maximum number of datanodes that may be allocated on any single rack.

    private Node chooseTarget(int numOfReplicas,
                              Node writer,
                              final Set<Node> excludedNodes,
                              final long blocksize,
                              final int maxNodesPerRack,
                              final List<DatanodeStorageInfo> results,
                              final boolean avoidStaleNodes,
                              final BlockStoragePolicy storagePolicy,
                              final EnumSet<StorageType> unavailableStorages,
                              final boolean newBlock) {
    ...
      if (numOfResults <= 1) {
        chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
        if (--numOfReplicas == 0) {
          return writer;
        }
      }

    Note: here it tries to obtain a new datanode from a remote rack (i.e. a rack different from the datanodes already chosen).

    protected void chooseRemoteRack(int numOfReplicas,
                                  DatanodeDescriptor localMachine,
                                  Set<Node> excludedNodes,
                                  long blocksize,
                                  int maxReplicasPerRack,
                                  List<DatanodeStorageInfo> results,
                                  boolean avoidStaleNodes,
                                  EnumMap<StorageType, Integer> storageTypes)
                                      throws NotEnoughReplicasException {
    ...
      chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
          excludedNodes, blocksize, maxReplicasPerRack, results,
          avoidStaleNodes, storageTypes);

    Note: here one datanode is chosen at random from the selectable candidates; the "~" prefix on the scope excludes localMachine's rack, so the random choice is made outside that rack.

    protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
                              String scope,
                              Set<Node> excludedNodes,
                              long blocksize,
                              int maxNodesPerRack,
                              List<DatanodeStorageInfo> results,
                              boolean avoidStaleNodes,
                              EnumMap<StorageType, Integer> storageTypes)
                              throws NotEnoughReplicasException {
    ...
      int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
          scope, excludedNodes);
    ...
      if (numOfReplicas>0) {
        String detail = enableDebugLogging;
        if (LOG.isDebugEnabled()) {
          if (badTarget && builder != null) {
            detail = builder.toString();
            builder.setLength(0);
          } else {
            detail = "";
          }
        }
        throw new NotEnoughReplicasException(detail);
      }

    Note: if for some reason (e.g. datanodes with full disks, or nodes being decommissioned) numOfAvailableNodes comes out as 0, a NotEnoughReplicasException is thrown.
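A minimal sketch of that failure mode (not the real chooseRandom, which works against the cluster's network topology): once excluded nodes are removed from the candidate pool, an exhausted pool fails the request with an exception rather than returning a short pipeline.

```java
import java.util.*;

// Illustrative sketch of chooseRandom's failure mode, not the real HDFS code.
public class ChooseRandomSketch {

    // Simplified stand-in for HDFS's checked NotEnoughReplicasException.
    static class NotEnoughReplicasException extends RuntimeException {
        NotEnoughReplicasException(String msg) { super(msg); }
    }

    static List<String> chooseRandom(int numOfReplicas,
                                     Set<String> candidates,
                                     Set<String> excludedNodes,
                                     Random rnd) {
        // Drop nodes that cannot be used (full disks, decommissioning, ...).
        List<String> available = new ArrayList<>(candidates);
        available.removeAll(excludedNodes);
        if (available.size() < numOfReplicas) {
            throw new NotEnoughReplicasException(
                "need " + numOfReplicas + ", only " + available.size()
                + " datanodes available");
        }
        Collections.shuffle(available, rnd);
        return available.subList(0, numOfReplicas);
    }

    public static void main(String[] args) {
        Set<String> all = new HashSet<>(Arrays.asList("dn1", "dn2", "dn3"));
        System.out.println(chooseRandom(2, all, Collections.emptySet(),
                                        new Random()).size());
        // prints 2
    }
}
```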

    The maxNodesPerRack calculation logic is as follows:

    org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault

    /**
     * Calculate the maximum number of replicas to allocate per rack. It also
     * limits the total number of replicas to the total number of nodes in the
     * cluster. Caller should adjust the replica count to the return value.
     *
     * @param numOfChosen The number of already chosen nodes.
     * @param numOfReplicas The number of additional nodes to allocate.
     * @return integer array. Index 0: The number of nodes allowed to allocate
     *         in addition to already chosen nodes.
     *         Index 1: The maximum allowed number of nodes per rack. This
     *         is independent of the number of chosen nodes, as it is calculated
     *         using the target number of replicas.
     */
    private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
      int clusterSize = clusterMap.getNumOfLeaves();
      int totalNumOfReplicas = numOfChosen + numOfReplicas;
      if (totalNumOfReplicas > clusterSize) {
        numOfReplicas -= (totalNumOfReplicas-clusterSize);
        totalNumOfReplicas = clusterSize;
      }
      // No calculation needed when there is only one rack or picking one node.
      int numOfRacks = clusterMap.getNumOfRacks();
      if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
        return new int[] {numOfReplicas, totalNumOfReplicas};
      }

      int maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2;
      // At this point, there are more than one racks and more than one replicas
      // to store. Avoid all replicas being in the same rack.
      //
      // maxNodesPerRack has the following properties at this stage.
      //   1) maxNodesPerRack >= 2
      //   2) (maxNodesPerRack-1) * numOfRacks > totalNumOfReplicas
      //          when numOfRacks > 1
      //
      // Thus, the following adjustment will still result in a value that forces
      // multi-rack allocation and gives enough number of total nodes.
      if (maxNodesPerRack == totalNumOfReplicas) {
        maxNodesPerRack--;
      }
      return new int[] {numOfReplicas, maxNodesPerRack};
    }

    Note: the key formula is

        int maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2;

    followed by the adjustment

        if (maxNodesPerRack == totalNumOfReplicas) {
          maxNodesPerRack--;
        }

    which, as the code comment explains, guarantees that with more than one rack the replicas of a block can never all land on the same rack.
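To see the formula in action, here is the same calculation transcribed into a standalone method, with clusterSize and numOfRacks passed in as parameters instead of read from clusterMap (the 193-node cluster size comes from the fsck output earlier; the rack counts are illustrative):

```java
import java.util.Arrays;

// Standalone transcription of getMaxNodesPerRack for experimentation;
// cluster topology values are passed in rather than read from clusterMap.
public class MaxNodesPerRack {

    static int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas,
                                    int clusterSize, int numOfRacks) {
        int totalNumOfReplicas = numOfChosen + numOfReplicas;
        // Cap the total at the cluster size.
        if (totalNumOfReplicas > clusterSize) {
            numOfReplicas -= (totalNumOfReplicas - clusterSize);
            totalNumOfReplicas = clusterSize;
        }
        // One rack, or one node to pick: no per-rack limit needed.
        if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
            return new int[] {numOfReplicas, totalNumOfReplicas};
        }
        int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 2;
        // Never allow one rack to hold every replica.
        if (maxNodesPerRack == totalNumOfReplicas) {
            maxNodesPerRack--;
        }
        return new int[] {numOfReplicas, maxNodesPerRack};
    }

    public static void main(String[] args) {
        // 3 replicas, 2 racks: (3-1)/2 + 2 = 3 == total, so decrement to 2.
        System.out.println(Arrays.toString(getMaxNodesPerRack(0, 3, 193, 2)));
        // prints [3, 2]
        // 3 replicas, 1 rack (as in the fsck output above): all 3 on that rack.
        System.out.println(Arrays.toString(getMaxNodesPerRack(0, 3, 193, 1)));
        // prints [3, 3]
    }
}
```

The two-rack case shows the adjustment firing: without the decrement, a single rack could legally hold all three replicas.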

  • Original post: https://www.cnblogs.com/barneywill/p/10114504.html