zoukankan      html  css  js  c++  java
  • HDFS读文件过程分析:读取文件的Block数据

    转自http://shiyanjun.cn/archives/962.html

    我们可以从java.io.InputStream类中看到,抽象出一个read方法,用来读取已经打开的InputStream实例中的字节,每次调用read方法,会读取一个字节数据,该方法抽象定义,如下所示:
    public abstract int read() throws IOException;
    Hadoop的DFSClient.DFSInputStream类实现了该抽象逻辑,如果我们清楚了如何从HDFS中读取一个文件的一个block的一个字节的原理,更加抽象的顶层只需要迭代即可获取到该文件的全部数据。
    HDFS读文件过程分析:获取文件对应的Block列表(http://shiyanjun.cn/archives/925.html)中,我们已经获取到一个文件对应的Block列表信息,打开一个文件,接下来就要读取实际的物理块数据,我们从下面的几个方面来详细说明读取数据的过程。

    Client从Datanode读取文件的一个字节

    下面,我们通过分析DFSClient.DFSInputStream中实现的代码,读取HDFS上文件的内容。首先从下面的方法开始:

    1 @Override
    2 public synchronized int read() throws IOException {
    3   int ret = read( oneByteBuf, 01 );
    4   return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
    5 }

    上面调用read(oneByteBuf, 0, 1)读取一个字节到单字节缓冲区oneByteBuf中,具体实现见如下方法:

    01 @Override
    02 public synchronized int read(byte buf[], int off, int len) throws IOException {
    03   checkOpen(); // 检查Client是否正在运行
    04   if (closed) {
    05     throw new IOException("Stream closed");
    06   }
    07   failures = 0;
    08  
    09   if (pos < getFileLength()) { // getFileLength()获取文件所包含的总字节数,pos表示读取当前文件的第(pos+1)个字节
    10     int retries = 2;
    11     while (retries > 0) {
    12       try {
    13         if (pos > blockEnd) { // blockEnd表示文件的长度(字节数)
    14           currentNode = blockSeekTo(pos); // 找到第pos个字节数据所在的Datanode(实际根据该字节数据所在的block元数据来定位)
    15         }
    16         int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
    17         int result = readBuffer(buf, off, realLen); // 读取一个字节到缓冲区中
    18         
    19         if (result >= 0) {
    20           pos += result; // 每成功读取result个字节,pos增加result
    21         else {
    22           // got a EOS from reader though we expect more data on it.
    23           throw new IOException("Unexpected EOS from the reader");
    24         }
    25         if (stats != null && result != -1) {
    26           stats.incrementBytesRead(result);
    27         }
    28         return result;
    29       catch (ChecksumException ce) {
    30         throw ce;          
    31       catch (IOException e) {
    32         if (retries == 1) {
    33           LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
    34         }
    35         blockEnd = -1;
    36         if (currentNode != null) { addToDeadNodes(currentNode); }
    37         if (--retries == 0) {
    38           throw e;
    39         }
    40       }
    41     }
    42   }
    43   return -1;
    44 }

    读取文件数据的一个字节,具体过程如下:

    1. 检查流对象是否处于打开状态(前面已经获取到文件对应的block列表的元数据,并打开一个InputStream对象)
    2. 从文件的第一个block开始读取,首先需要找到第一个block对应的数据块所在的Datanode,可以从缓存的block列表中查询到(如果查找不到,则会与Namenode进行一次RPC通信请求获取到)
    3. 打开一个到该读取的block所在Datanode节点的流,准备读取block数据
    4. 建立了到Datanode的连接后,读取一个字节数据到字节缓冲区中,返回读取的字节数(1个字节)

    在读取的过程中,以字节为单位,通过判断某个偏移位置的字节属于哪个block(根据block元数据所限定的字节偏移范围),在根据这个block去定位某一个Datanode节点,这样就可连续地读取一个文件的全部数据(组成文件的、连续的多个block数据块)。

    查找待读取的一个字节所在的Datanode节点

    上面public synchronized int read(byte buf[], int off, int len) throws IOException方法,调用了blockSeekTo方法来获取,文件某个字节索引位置的数据所在的Datanode节点。其实,很容易就能想到,想要获取到数据所在的Datanode节点,一定是从block元数据中计算得到,然后根据Client缓存的block映射列表,找到block对应的Datanode列表,我们看一下blockSeekTo方法的代码实现:

    01 private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
    02   ... ...
    03  
    04   DatanodeInfo chosenNode = null;
    05   int refetchToken = 1// only need to get a new access token once
    06   while (true) {
    07     LocatedBlock targetBlock = getBlockAt(target, true); // 获取字节偏移位置为target的字节数据所在的block元数据对象
    08     assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
    09     long offsetIntoBlock = target - targetBlock.getStartOffset();
    10  
    11     DNAddrPair retval = chooseDataNode(targetBlock); // 选择一个Datanode去读取数据
    12     chosenNode = retval.info;
    13     InetSocketAddress targetAddr = retval.addr;
    14  
    15     // 先尝试从本地读取数据,如果数据不在本地,则正常去读取远程的Datanode节点
    16     Block blk = targetBlock.getBlock();
    17     Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
    18     if (shouldTryShortCircuitRead(targetAddr)) {
    19       try {
    20         blockReader = getLocalBlockReader(conf, src, blk, accessToken,
    21             chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock); // 创建一个用来读取本地数据的BlockReader对象
    22         return chosenNode;
    23       catch (AccessControlException ex) {
    24         LOG.warn("Short circuit access failed ", ex);
    25         //Disable short circuit reads
    26         shortCircuitLocalReads = false;
    27       catch (IOException ex) {
    28         if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
    29           /* Get a new access token and retry. */
    30           refetchToken--;
    31           fetchBlockAt(target);
    32           continue;
    33         else {
    34           LOG.info("Failed to read " + targetBlock.getBlock()
    35               " on local machine" + StringUtils.stringifyException(ex));
    36           LOG.info("Try reading via the datanode on " + targetAddr);
    37         }
    38       }
    39     }
    40  
    41     // 本地读取失败,按照更一般的方式去读取远程的Datanode节点来获取数据
    42     try {
    43       s = socketFactory.createSocket();
    44       LOG.debug("Connecting to " + targetAddr);
    45       NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
    46       s.setSoTimeout(socketTimeout);
    47       blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
    48           accessToken,
    49           blk.getGenerationStamp(),
    50           offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
    51           buffersize, verifyChecksum, clientName); // 创建一个远程的BlockReader对象
    52       return chosenNode;
    53     catch (IOException ex) {
    54       if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
    55         refetchToken--;
    56         fetchBlockAt(target);
    57       else {
    58         LOG.warn("Failed to connect to " + targetAddr
    59             ", add to deadNodes and continue" + ex);
    60         if (LOG.isDebugEnabled()) {
    61           LOG.debug("Connection failure", ex);
    62         }
    63         // Put chosen node into dead list, continue
    64         addToDeadNodes(chosenNode); // 读取失败,会将选择的Datanode加入到Client的dead node列表,为下次读取选择合适的Datanode读取文件数据提供参考元数据信息
    65       }
    66       if (s != null) {
    67         try {
    68           s.close();
    69         catch (IOException iex) { }                      
    70       }
    71       s = null;
    72     }
    73   }
    74 }

    上面代码中,主要包括如下几个要点:

    • 选择合适的Datanode节点,提高读取效率

    在读取文件的时候,首先会从Namenode获取文件对应的block列表元数据,返回的block列表是按照Datanode的网络拓扑结构进行排序过的(本地节点优先,其次是同一机架节点),而且,Client还维护了一个dead node列表,只要此时bock对应的Datanode列表中节点不出现在dead node列表中就会被返回,用来作为读取数据的Datanode节点。

    • 如果Client为集群Datanode节点,尝试从本地读取block

    通过调用chooseDataNode方法返回一个Datanode结点,通过判断,如果该节点地址是本地地址,并且该节点上对应的block元数据信息的状态不是正在创建的状态,则满足从本地读取数据块的条件,然后会创建一个LocalBlockReader对象,直接从本地读取。在创建LocalBlockReader对象的过程中,会先从缓存中查找一个本地Datanode相关的LocalDatanodeInfo对象,该对象定义了与从本地Datanode读取数据的重要信息,以及缓存了待读取block对应的本地路径信息,可以从LocalDatanodeInfo类定义的属性来说明:

    1 private ClientDatanodeProtocol proxy = null;
    2 private final Map<Block, BlockLocalPathInfo> cache;

    如果缓存中存在待读取的block的相关信息,可以直接进行读取;否则,会创建一个proxy对象,以及计算待读取block的路径信息BlockLocalPathInfo,最后再加入到缓存,为后续可能的读取加速。我们看一下如果没有从缓存中找到LocalDatanodeInfo信息(尤其是BlockLocalPathInfo),则会执行如下逻辑:

    1 // make RPC to local datanode to find local pathnames of blocks
    2 pathinfo = proxy.getBlockLocalPathInfo(blk, token);

    上面proxy为ClientDatanodeProtocol类型,Client与Datanode进行RPC通信的协议,RPC调用getBlockLocalPathInfo获取block对应的本地路径信息,可以在Datanode类中查看具体实现,如下所示:

    1 BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);

    Datanode调用FSDataset(实现接口FSDatasetInterface)的getBlockLocalPathInfo,如下所示:

    1 @Override //FSDatasetInterface
    2 public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
    3     throws IOException {
    4   File datafile = getBlockFile(block); // 获取本地block在本地Datanode文件系统中的文件路径
    5   File metafile = getMetaFile(datafile, block);  // 获取本地block在本地Datanode文件系统中的元数据的文件路径
    6   BlockLocalPathInfo info = new BlockLocalPathInfo(block, datafile.getAbsolutePath(), metafile.getAbsolutePath());
    7   return info;
    8 }

    接着可以直接去读取该block文件(如果需要检查校验和文件,会读取block的元数据文件metafile):

    01 ... // BlockReaderLocal类的newBlockReader静态方法
    02 // get a local file system
    03 File blkfile = new File(pathinfo.getBlockPath());
    04 dataIn = new FileInputStream(blkfile);
    05  
    06 if (!skipChecksum) { // 如果检查block的校验和
    07   // get the metadata file
    08   File metafile = new File(pathinfo.getMetaPath());
    09   checksumIn = new FileInputStream(metafile);
    10  
    11   // read and handle the common header here. For now just a version
    12   BlockMetadataHeader header = BlockMetadataHeader.readHeader(new DataInputStream(checksumIn));
    13   short version = header.getVersion();
    14   if (version != FSDataset.METADATA_VERSION) {
    15     LOG.warn("Wrong version (" + version + ") for metadata file for " + blk + " ignoring ...");
    16   }
    17   DataChecksum checksum = header.getChecksum();
    18   localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, checksum, true, dataIn, checksumIn);
    19 else {
    20   localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, dataIn);
    21 }

    在上面代码中,返回了BlockLocalPathInfo,但是很可能在这个过程中block被删除了,在删除block的时候,Namenode会调度指派该Datanode删除该block,恰好在这个时间间隔内block对应的BlockLocalPathInfo信息已经失效(文件已经被删除),所以上面这段代码再try中会抛出异常,并在catch中捕获到IO异常,会从缓存中再清除掉失效的block到BlockLocalPathInfo的映射信息。

    • 如果Client非集群Datanode节点,远程读取block

    如果Client不是Datanode本地节点,则只能跨网络节点远程读取,首先创建Socket连接:

    1 s = socketFactory.createSocket();
    2 LOG.debug("Connecting to " + targetAddr);
    3 NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
    4 s.setSoTimeout(socketTimeout);

    建立Client到目标Datanode(targetAddr)的连接,然后同样也是创建一个远程BlockReader对象RemoteBlockReader来辅助读取block数据。创建RemoteBlockReader过程中,首先向目标Datanode发送RPC请求:

    01 // in and out will be closed when sock is closed (by the caller)
    02 DataOutputStream out = new DataOutputStream(newBufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
    03  
    04 //write the header.
    05 out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // Client与Datanode之间传输数据的版本号
    06 out.write( DataTransferProtocol.OP_READ_BLOCK ); // 传输操作类型:读取block
    07 out.writeLong( blockId ); // block ID
    08 out.writeLong( genStamp ); // 时间戳信息
    09 out.writeLong( startOffset ); // block起始偏移量
    10 out.writeLong( len ); // block长度
    11 Text.writeString(out, clientName); // 客户端标识
    12 accessToken.write(out);
    13 out.flush();

    然后获取到DataInputStream对象来读取Datanode的响应信息:

    1 DataInputStream in = new DataInputStream(
    2     new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize));

    最后,返回一个对象RemoteBlockReader:

    1 return new RemoteBlockReader(file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock);

    借助BlockReader来读取block字节

    我们再回到blockSeekTo方法中,待读取block所在的Datanode信息、BlockReader信息都已经具备,接着就可以从包含输入流(InputStream)对象的BlockReader中读取数据块中一个字节数据:

    1 int result = readBuffer(buf, off, realLen);

    将block数据中一个字节读取到buf中,如下所示:

    01 private synchronized int readBuffer(byte buf[], int off, int len) throws IOException {
    02   IOException ioe;
    03   boolean retryCurrentNode = true;
    04  
    05   while (true) {
    06     // retry as many times as seekToNewSource allows.
    07     try {
    08       return blockReader.read(buf, off, len); // 调用blockReader的read方法读取字节数据到buf中
    09     catch ( ChecksumException ce ) {
    10       LOG.warn("Found Checksum error for " + currentBlock + " from " + currentNode.getName() + " at " + ce.getPos());        
    11       reportChecksumFailure(src, currentBlock, currentNode);
    12       ioe = ce;
    13       retryCurrentNode = false// 只尝试读取当前选择的Datanode一次,失败的话就会被加入到Client的dead node列表中
    14     catch ( IOException e ) {
    15       if (!retryCurrentNode) {
    16         LOG.warn("Exception while reading from " + currentBlock + " of " + src + " from " + currentNode + ": " + StringUtils.stringifyException(e));
    17       }
    18       ioe = e;
    19     }
    20     boolean sourceFound = false;
    21     if (retryCurrentNode) {
    22       /* possibly retry the same node so that transient errors don't
    23        * result in application level failures (e.g. Datanode could have
    24        * closed the connection because the client is idle for too long).
    25        */
    26       sourceFound = seekToBlockSource(pos);
    27     else {
    28       addToDeadNodes(currentNode); // 加入到Client的dead node列表中
    29       sourceFound = seekToNewSource(pos); // 从当前选择的Datanode上读取数据失败,会再次选择一个Datanode,这里seekToNewSource方法内部调用了blockSeekTo方法去选择一个Datanode
    30     }
    31     if (!sourceFound) {
    32       throw ioe;
    33     }
    34     retryCurrentNode = false;
    35   }
    36 }

    通过BlockReaderLocal或者RemoteBlockReader来读取block数据,逻辑非常类似,主要是控制读取字节的偏移量,记录偏移量的状态信息,详细可以查看它们的源码。

    DataNode节点处理读文件Block请求

    我们可以在DataNode端看一下,如何处理一个读取Block的请求。如果Client与DataNode不是同一个节点,则为远程读取文件Block,首先Client需要发送一个请求头信息,代码如下所示:

    01 //write the header.
    02 out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // Client与Datanode之间传输数据的版本号
    03 out.write( DataTransferProtocol.OP_READ_BLOCK ); // 传输操作类型:读取block
    04 out.writeLong( blockId ); // block ID
    05 out.writeLong( genStamp ); // 时间戳信息
    06 out.writeLong( startOffset ); // block起始偏移量
    07 out.writeLong( len ); // block长度
    08 Text.writeString(out, clientName); // 客户端标识
    09 accessToken.write(out);
    10 out.flush();

    DataNode节点端通过验证数据传输版本号(DataTransferProtocol.DATA_TRANSFER_VERSION)一致以后,会判断传输操作类型,如果是读操作DataTransferProtocol.OP_READ_BLOCK,则会通过Client建立的Socket来创建一个OutputStream对象,然后通过BlockSender向Client发送Block数据,代码如下所示:

    1 try {
    2   blockSender = new BlockSender(block, startOffset, length, truetruefalse, datanode, clientTraceFmt); // 创建BlockSender对象
    3 catch(IOException e) {
    4   out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
    5   throw e;
    6 }
    7  
    8 out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // 回复一个响应Header信息:成功状态
    9 long read = blockSender.sendBlock(out, baseStream, null); // 发送请求的Block数据
  • 相关阅读:
    如何使用SAP Intelligent Robotic Process Automation自动操作Excel
    OpenSAML 使用引导 IV: 安全特性
    Spring Cloud Zuul 网关使用与 OAuth2.0 认证授权服务
    微服务架构集大成者—Spring Cloud (转载)
    Spring Cloud Eureka 服务注册列表显示 IP 配置问题
    使用 Notification API 开启浏览器桌面提醒
    SignalR 中使用 MessagePack 序列化提高 WebSocket 通信性能
    配置 Nginx 的目录浏览功能
    关于 Nginx 配置 WebSocket 400 问题
    Migrate from ASP.NET Core 2.0 to 2.1
  • 原文地址:https://www.cnblogs.com/YDDMAX/p/6753591.html
Copyright © 2011-2022 走看看