  • HDFS block reads and writes -- the ${dfs.data.dir} selection policy

      For work recently I have been reading the HDFS code that handles block reads and writes. Unlike most posts on this topic, this one focuses on the ${dfs.data.dir} selection policy, i.e. how a block is placed on a DataNode's disks. I read this part of the code mainly from the angle of what we needed at work.

    hdfs-site.xml
    <property>
      <name>dfs.data.dir</name>
      <value>/mnt/datadir1/data,/mnt/datadir2/data,/mnt/datadir3/data</value>
    </property>

       The so-called ${dfs.data.dir} selection policy answers this question: when a DataNode is configured with several ${dfs.data.dir} directories (as above), which one should hold a new block? Typically each directory is a mount point for a different disk, so choosing a directory for a block really means choosing a disk.
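      As a warm-up, here is a minimal sketch (mine, not from the Hadoop source; it assumes a Hadoop 1.x classpath) of reading this comma-separated value with the stock Configuration API, which is essentially what the DataNode does at startup to build its list of storage directories:

    import org.apache.hadoop.conf.Configuration;

    public class DataDirList {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("hdfs-site.xml");  // pulls in dfs.data.dir
        // getStrings() splits the comma-separated value into one entry per directory
        String[] dataDirs = conf.getStrings("dfs.data.dir");
        if (dataDirs != null) {
          for (String dir : dataDirs) {
            System.out.println("storage dir: " + dir);
          }
        }
      }
    }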

      Creating a file involves two steps:

      1. Before any block is written, the client talks to the NameNode to create the file (an INodeFile / INodeFileUnderConstruction). The write request starts in the create() method on the DFSClient side; via RPC, the NameNode ultimately calls FSNamesystem's startFileInternal() method to create the file.
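      For context, this is the client-side view: a hedged example (path and contents are made up for illustration) showing that an ordinary FileSystem.create() call is what ultimately drives DFSClient and, via RPC, startFileInternal() on the NameNode:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CreateFileExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);  // DistributedFileSystem when fs.default.name is hdfs://
        FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"));
        out.writeBytes("hello hdfs");          // data is sent out in packets through the pipeline
        out.close();                           // completes the file on the NameNode
        fs.close();
      }
    }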

  private void startFileInternal(String src,
                                 PermissionStatus permissions,
                                 String holder, 
                                 String clientMachine, 
                                 boolean overwrite,
                                 boolean append,
                                 boolean createParent,
                                 short replication,
                                 long blockSize
                                 ) throws IOException {
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* startFile: src=" + src
          + ", holder=" + holder
          + ", clientMachine=" + clientMachine
          + ", createParent=" + createParent
          + ", replication=" + replication
          + ", overwrite=" + overwrite
          + ", append=" + append);
    }

    FSPermissionChecker pc = getPermissionChecker();
    synchronized (this) {
      if (isInSafeMode())
        throw new SafeModeException("Cannot create " + src, safeMode);
      if (!DFSUtil.isValidName(src)) {
        throw new IOException("Invalid name: " + src);
      }

      // Verify that the destination does not exist as a directory already.
      boolean pathExists = dir.exists(src);
      if (pathExists && dir.isDir(src)) {
        throw new IOException("Cannot create "+ src + "; already exists as a directory");
      }

      if (isPermissionEnabled) {
        if (append || (overwrite && pathExists)) {
          checkPathAccess(pc, src, FsAction.WRITE);
        } else {
          checkAncestorAccess(pc, src, FsAction.WRITE);
        }
      }

      if (!createParent) {
        verifyParentDir(src);
      }

      try {
        INode myFile = dir.getFileINode(src); // look up the file's INode by path
        recoverLeaseInternal(myFile, src, holder, clientMachine, false);

        try {
          verifyReplication(src, replication, clientMachine);
        } catch (IOException e) {
          throw new IOException("failed to create " + e.getMessage());
        }
        if (append) { // append case
          if (myFile == null) {
            throw new FileNotFoundException("failed to append to non-existent "
                + src + " on client " + clientMachine);
          } else if (myFile.isDirectory()) {
            throw new IOException("failed to append to directory " + src
                + " on client " + clientMachine);
          }
        } else if (!dir.isValidToCreate(src)) {
          if (overwrite) { // overwriting the existing file is allowed
            delete(src, true);
          } else {
            throw new IOException("failed to create file " + src
                + " on client " + clientMachine
                + " either because the filename is invalid or the file exists");
          }
        }

        DatanodeDescriptor clientNode = host2DataNodeMap
            .getDatanodeByHost(clientMachine);

        if (append) {
          //
          // Replace current node with a INodeUnderConstruction.
          // Recreate in-memory lease record.
          //
          INodeFile node = (INodeFile) myFile;
          INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
              node.getLocalNameBytes(), node.getReplication(),
              node.getModificationTime(), node.getPreferredBlockSize(),
              node.getBlocks(), node.getPermissionStatus(), holder,
              clientMachine, clientNode);
          dir.replaceNode(src, node, cons);
          leaseManager.addLease(cons.clientName, src);

        } else {
          // Now we can add the name to the filesystem. This file has no
          // blocks associated with it.
          //
          checkFsObjectLimit();

          // increment global generation stamp
          long genstamp = nextGenerationStamp();
          INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
              replication, blockSize, holder, clientMachine, clientNode,
              genstamp);
          if (newNode == null) {
            throw new IOException("DIR* startFile: Unable to add to namespace");
          }
          leaseManager.addLease(newNode.clientName, src);
          if (NameNode.stateChangeLog.isDebugEnabled()) {
            NameNode.stateChangeLog.debug("DIR* startFile: "
                                       +"add "+src+" to namespace for "+holder);
          }
        }
      } catch (IOException ie) {
        NameNode.stateChangeLog.warn("DIR* startFile: "
                                     +ie.getMessage());
        throw ie;
      }
    }
  }

      The main work of this method:

      1) Run some preliminary checks (safe mode, permissions, whether the path already exists as a directory, and so on)

      2) If this is not an append:

        Generate a generation stamp (one per file); construct an INodeFileUnderConstruction (carrying the preferredBlockSize); add the file to the filesystem; add a lease (essentially a time-limited write lock).

            If it is an append:

        Replace the INodeFile under src with an INodeFileUnderConstruction; add a lease.

      2. Once the file exists on the NameNode, the client asks the NameNode for blocks and writes them to DataNodes. After the work above completes, the DataStreamer thread is started to write blocks to the DataNodes. The overall flow is:

      1) Some preliminary checks

      2) Ask the NameNode for a block (one round trip to the NameNode):

        a. according to the replica placement policy, choose N DataNodes to hold the block;

        b. randomly generate a blockID that is not already in use;

        c. attach the block to the corresponding file;

      3) Organize the target DataNodes into a pipeline and send the first packet to the first DataNode

       Let's pick a few of the more important methods and analyze them:

  /**
   * The client would like to obtain an additional block for the indicated
   * filename (which is being written-to).  Return an array that consists
   * of the block, plus a set of machines.  The first on this list should
   * be where the client writes data.  Subsequent items in the list must
   * be provided in the connection to the first datanode.
   *
   * Make sure the previous blocks have been reported by datanodes and
   * are replicated.  Will return an empty 2-elt array if we want the
   * client to "try again later".
   */
  // ask the NameNode for a new block
  public LocatedBlock getAdditionalBlock(String src, 
                                         String clientName,
                                         HashMap<Node, Node> excludedNodes
                                         ) throws IOException {
    long fileLength, blockSize;
    int replication;
    DatanodeDescriptor clientNode = null;
    Block newBlock = null;

    NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
                                  +src+" for "+clientName);

    synchronized (this) {
      if (isInSafeMode()) { //check safemode first for failing-fast
        throw new SafeModeException("Cannot add block to " + src, safeMode);
      }
      // have we exceeded the configured limit of fs objects.
      checkFsObjectLimit();

      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);

      //
      // If we fail this, bad things happen!
      //
      if (!checkFileProgress(pendingFile, false)) {
        throw new NotReplicatedYetException("Not replicated yet:" + src);
      }
      fileLength = pendingFile.computeContentSummary().getLength();
      blockSize = pendingFile.getPreferredBlockSize();
      clientNode = pendingFile.getClientNode();
      replication = (int)pendingFile.getReplication();
    }

    // choose targets for the new block to be allocated.
    // i.e. pick where the replicas will be stored
    DatanodeDescriptor targets[] = replicator.chooseTarget(src, 
                                                           replication,
                                                           clientNode,
                                                           excludedNodes,
                                                           blockSize);
    if (targets.length < this.minReplication) {
      throw new IOException("File " + src + " could only be replicated to " +
                            targets.length + " nodes, instead of " +
                            minReplication);
    }

    // Allocate a new block and record it in the INode. 
    synchronized (this) {
      if (isInSafeMode()) { //make sure it is not in safemode again.
        throw new SafeModeException("Cannot add block to " + src, safeMode);
      }
      INode[] pathINodes = dir.getExistingPathINodes(src);
      int inodesLen = pathINodes.length;
      checkLease(src, clientName, pathINodes[inodesLen-1]);
      INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction) 
                                                pathINodes[inodesLen - 1];
                                                           
      if (!checkFileProgress(pendingFile, false)) {
        throw new NotReplicatedYetException("Not replicated yet:" + src);
      }

      // allocate new block record block locations in INode.
      // i.e. allocate the block, give it a randomly generated blockID that is
      // not already in use, and record it in the INode
      newBlock = allocateBlock(src, pathINodes);
      pendingFile.setTargets(targets);
      
      for (DatanodeDescriptor dn : targets) {
        dn.incBlocksScheduled();
      }
      dir.persistBlocks(src, pendingFile);
    }
    if (persistBlocks) {
      getEditLog().logSync();
    }
        
    // Create next block
    LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
    if (isAccessTokenEnabled) {
      b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), 
          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
    }
    return b;
  }

      The method above also involves the block placement policy, which I'll save for the next post. The diagram below summarizes the call hierarchy of the methods discussed above:

      Finally, and most importantly, let's look at how a block is stored on a DataNode. The call hierarchy looks like this:

      First, the data structures involved:

  class FSVolume {                      // one volume, i.e. one ${dfs.data.dir}
    private File currentDir;            // holds finalized blocks: ${dfs.data.dir}/current
    private FSDir dataDir;              // tracks which block files live under currentDir
    private File tmpDir;                // temporary files: ${dfs.data.dir}/tmp
    private File blocksBeingWritten;    // blocks still being written: ${dfs.data.dir}/blocksBeingWritten
    private File detachDir;             // copy-on-write (detach) area: ${dfs.data.dir}/detach
    private DF usage;
    private DU dfsUsage;
    private long reserved;

  static class FSVolumeSet {            // the set of volumes, i.e. all ${dfs.data.dir} entries
    FSVolume[] volumes = null;          // the FSVolumes, organized as an array
    int curVolume = 0;                  // index of the volume currently in use

      FSVolumeSet represents the collection of ${dfs.data.dir} directories. It organizes them into the volumes array and uses curVolume to point at the directory currently in use. The ${dfs.data.dir} selection policy works like this:

      When multiple ${dfs.data.dir} directories are configured, the DataNode picks an FSVolume from volumes in order and stores the block there (first under the blocksBeingWritten directory, then moved into current once the write completes);

      After each block is written, curVolume advances by one, so the ${dfs.data.dir} directories are written to in round-robin fashion. The policy is implemented in the getNextVolume() method of FSDataset.FSVolumeSet.

    synchronized FSVolume getNextVolume(long blockSize) throws IOException {
      
      if(volumes.length < 1) {
        throw new DiskOutOfSpaceException("No more available volumes");
      }
      
      // since volumes could've been removed because of the failure
      // make sure we are not out of bounds
      if(curVolume >= volumes.length) {
        curVolume = 0;
      }
      
      int startVolume = curVolume;
      
      while (true) {
        FSVolume volume = volumes[curVolume];
        curVolume = (curVolume + 1) % volumes.length;    // advance to the next volume
        if (volume.getAvailable() > blockSize) { return volume; }
        if (curVolume == startVolume) {
          throw new DiskOutOfSpaceException("Insufficient space for an additional block");
        }
      }
    }
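      To see the policy in isolation, here is a self-contained sketch of the same round-robin loop, using java.io.File in place of FSVolume and File.getUsableSpace() in place of getAvailable(); the class name and directories are mine, for illustration only:

    import java.io.File;
    import java.io.IOException;

    class RoundRobinVolumeChooser {
      private final File[] volumes;
      private int curVolume = 0;

      RoundRobinVolumeChooser(File[] volumes) {
        this.volumes = volumes;
      }

      // Mirrors FSVolumeSet.getNextVolume(): advance the pointer once per block
      // and skip volumes that lack enough free space for the block.
      synchronized File getNextVolume(long blockSize) throws IOException {
        if (volumes.length < 1) {
          throw new IOException("No more available volumes");
        }
        int startVolume = curVolume;
        while (true) {
          File volume = volumes[curVolume];
          curVolume = (curVolume + 1) % volumes.length;  // advance to the next volume
          if (volume.getUsableSpace() > blockSize) {
            return volume;
          }
          if (curVolume == startVolume) {  // tried every volume once
            throw new IOException("Insufficient space for an additional block");
          }
        }
      }

      public static void main(String[] args) throws IOException {
        RoundRobinVolumeChooser chooser = new RoundRobinVolumeChooser(new File[] {
            new File("/mnt/datadir1/data"), new File("/mnt/datadir2/data")});
        System.out.println(chooser.getNextVolume(64L * 1024 * 1024));
      }
    }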

     

      Next, the process of reading a block. While a Map Task runs, the nextKeyValue() method reads data out of a block. The main steps:

      1. Based on the file offset and length specified when the Map Task was created, determine which block should be read, and fetch that block's details (one round trip to the NameNode).

      2. Out of the DataNodes holding this block, pick the best one and open a socket connection to it (local reads are disabled by default).
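      As a client-side illustration (path, offset, and length are hypothetical), the positioned-read pattern below is what a record reader boils down to: open the file, seek to the split's start, and read; DFSClient resolves which block and which DataNode serve that range:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadAtOffsetExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        long offset = 134217728L;  // hypothetical split start (128 MB)
        int length = 4096;         // hypothetical number of bytes to read
        byte[] buf = new byte[length];
        FSDataInputStream in = fs.open(new Path("/tmp/demo.txt"));
        in.seek(offset);                  // locates the block holding this offset
        int n = in.read(buf, 0, length);  // reads from the chosen DataNode
        System.out.println("read " + n + " bytes");
        in.close();
        fs.close();
      }
    }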

      The call hierarchy of these methods is as follows:

      A Map Task reads records through a RecordReader, but at the HDFS layer the actual block reading is done by a block reader, which comes in two implementations:

      BlockReaderLocal: reads a block from the local disk directly (without going through the DataNode)

      RemoteBlockReader: reads a block remotely (through the DataNode)

      Even when the data is local, a Map Task reads it through RemoteBlockReader by default, i.e. over a socket; local reads are not enabled out of the box. Local reads can be enabled as described at http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ShortCircuitLocalReads.html, in which case BlockReaderLocal reads the block straight from the local disk without involving the DataNode. The analysis below assumes BlockReaderLocal.
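      For reference, a sketch of turning this on from the client side. The keys used below (dfs.client.read.shortcircuit on the client, dfs.block.local-path-access.user on the DataNode) are my reading of the HDFS-2246 backport in the 1.x line; verify them against the documentation linked above for your exact version:

    import org.apache.hadoop.conf.Configuration;

    public class ShortCircuitConf {
      public static Configuration enableLocalReads() {
        Configuration conf = new Configuration();
        // client side: prefer BlockReaderLocal over RemoteBlockReader
        // (key name per HDFS-2246; an assumption -- check your version's docs)
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        return conf;
      }
      // The DataNode must also permit it: set dfs.block.local-path-access.user
      // to the user running the reader, in the DataNode's hdfs-site.xml.
    }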

          First, the data structures involved:

public class BlockLocalPathInfo implements Writable {    // describes a block's local location

  private Block block;                  // the block in question
  private String localBlockPath = "";   // local path of the block file
  private String localMetaPath = "";    // local path of the block's checksum file

  // Stores the cache and proxy for a local datanode.
  private static class LocalDatanodeInfo {    // one DataNode on this machine (a machine may run several)
    private final Map<Block, BlockLocalPathInfo> cache;    // the map it maintains (block --> block location info)

  // Multiple datanodes could be running on the local machine. Store proxies in
  // a map keyed by the ipc port of the datanode.
  // Map maintained by BlockReaderLocal:
  //   Integer: the DataNode's ipc port
  //   LocalDatanodeInfo: the DataNode behind that port
  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();

/**
 * This class is used by the datanode to maintain the map from a block 
 * to its metadata.
 */
class DatanodeBlockInfo {    // metadata for a block on this DataNode (block --> block metadata)

  private FSVolume volume;       // the FSVolume holding the block
  private File     file;         // block file
  private boolean detached;      // whether copy-on-write (detach) of the block has finished

  // map from block to block metadata
  HashMap<Block, DatanodeBlockInfo> volumeMap = new HashMap<Block, DatanodeBlockInfo>();

      When reading a block, the reader first uses localDatanodeInfoMap to decide which local DataNode to talk to; it then looks up the block's DatanodeBlockInfo in volumeMap (this includes the FSVolume the block lives on, which was fixed when the block was stored, as described earlier in this post); from the DatanodeBlockInfo it builds a BlockLocalPathInfo carrying the block's local paths; finally, BlockReaderLocal uses that BlockLocalPathInfo to read the block. The details are in BlockReaderLocal.newBlockReader().
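      Putting the tables together, here is a simplified sketch of that lookup chain with hypothetical stand-in types (the real maps key on Block objects; this sketch keys on the numeric block id for brevity):

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;

    class LocalReadLookupSketch {
      static class DatanodeBlockInfo { File blockFile; File metaFile; }
      static class BlockLocalPathInfo { String localBlockPath; String localMetaPath; }

      // localDatanodeInfoMap: one entry per DataNode ipc port on this host;
      // each value plays the role of a LocalDatanodeInfo's block cache / volumeMap
      static Map<Integer, Map<Long, DatanodeBlockInfo>> localDatanodeInfoMap =
          new HashMap<Integer, Map<Long, DatanodeBlockInfo>>();

      static BlockLocalPathInfo lookup(int ipcPort, long blockId) {
        Map<Long, DatanodeBlockInfo> volumeMap = localDatanodeInfoMap.get(ipcPort);
        if (volumeMap == null) return null;              // no such local DataNode
        DatanodeBlockInfo info = volumeMap.get(blockId); // block --> metadata
        if (info == null) return null;                   // block not stored here
        BlockLocalPathInfo path = new BlockLocalPathInfo();
        path.localBlockPath = info.blockFile.getAbsolutePath();
        path.localMetaPath  = info.metaFile.getAbsolutePath();
        return path;                                     // BlockReaderLocal opens these files
      }
    }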

     

      This post is based on Hadoop 1.2.1.

      If you spot any mistakes, corrections are welcome.

      Please credit the source when reposting: http://www.cnblogs.com/gwgyk/p/4124038.html
