zoukankan      html  css  js  c++  java
  • HDFS dfsclient读文件过程 源码分析

    HDFS读取文件的重要概念

    HDFS一个文件由多个block构成。HDFS在进行block读写的时候是以packet(默认每个packet为64K)为单位进行的。每一个packet由若干个chunk(默认512Byte)组成。Chunk是进行数据校验的基本单位,对每一个chunk生成一个校验和(默认4Byte)并将校验和进行存储。在读取一个block的时候,数据传输的基本单位是packet,每个packet由若干个chunk组成。

    HDFS客户端读文件示例代码

    FileSystem hdfs = FileSystem.get(new Configuration());
    Path path = new Path("/testfile");// reading
    FSDataInputStream dis = hdfs.open(path);
    byte[] writeBuf = new byte[1024];
    int len = dis.read(writeBuf);
    System.out.println(new String(writeBuf, 0, len, "UTF-8"));
    dis.close();
    
    hdfs.close();

    文件的打开

    HDFS打开一个文件,需要在客户端调用DistributedFileSystem.open(Path f, int bufferSize),其实现为:

    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
      return new DFSClient.DFSDataInputStream(
            dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
    }

    其中dfs为DistributedFileSystem的成员变量DFSClient,其open函数被调用,其中创建一个DFSInputStream(src, buffersize, verifyChecksum)并返回。

    DFSClient.DFSDataInputStream实现了HDFS的FSDataInputStream,里面简单包装了DFSInputStream,实际实现是DFSInputStream完成的。

    在DFSInputStream的构造函数中,openInfo函数被调用,其主要从namenode中得到要打开的文件所对应的blocks的信息,实现如下:

    synchronized void openInfo() throws IOException {
    LocatedBlocks newInfo
    = callGetBlockLocations(namenode, src, 0, prefetchSize); this.locatedBlocks = newInfo; this.currentNode = null; } private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,String src, long start, long length) throws IOException { return namenode.getBlockLocations(src, start, length); }

    LocatedBlocks主要包含一个链表的List<LocatedBlock> blocks,其中每个LocatedBlock包含如下信息:

    • Block b:此block的信息
    • long offset:此block在文件中的偏移量
    • DatanodeInfo[] locs:此block位于哪些DataNode上

    上面namenode.getBlockLocations是一个RPC调用,最终调用NameNode类的getBlockLocations函数。

    NameNode返回的是根据客户端请求的文件名字,文件偏移量,数据长度,返回文件对应的数据块列表,数据块所在的DataNode节点。

    文件的顺序读取

     hdfs文件的顺序读取是最经常使用的.

    文件顺序读取的时候,客户端利用文件打开的时候得到的FSDataInputStream.read(byte[] buffer, int offset, int length)函数进行文件读操作。

    FSDataInputStream会调用其封装的DFSInputStream的read(byte[] buffer, int offset, int length)函数,实现如下:

    public synchronized int read(byte buf[], int off, int len) throws IOException {
        ...
        if (pos < getFileLength()) {
        int retries = 2;
        while (retries > 0) {
          try {
            if (pos > blockEnd) {//首次pos=0,blockEnd=-1,必定调用方法blockSeekTo,初始化blockEnd,以后是读完了当前块,需要读下一个块,才会调用blockSeekTo
              currentNode = blockSeekTo(pos);//根据pos选择块和数据节点,选择算法是遍历块所在的所有数据节点,选择第一个非死亡节点
            }
            int realLen = Math.min(len, (int) (blockEnd - pos + 1));
            int result = readBuffer(buf, off, realLen);
            
            if (result >= 0) {
              pos += result;
            } else {
              throw new IOException("Unexpected EOS from the reader");
            }
            ...
            return result;
          } catch (ChecksumException ce) {
            throw ce;            
          } catch (IOException e) {
            ...
            if (currentNode != null) { addToDeadNodes(currentNode); }//遇到无法读的DataNode,添加到死亡节点
            if (--retries == 0) {//尝试读三次都失败,就抛出异常
              throw e;
            }
          }
        }
        }
        return -1;
    }

     blockSeekTo函数会更新blockEnd,并创建对应的BlockReader,这里的BlockReader的初始化和上面的fetchBlockByteRange差不多,如果客户端和块所属的DataNode是同个节点,则初始化一个通过本地读取的BlockReader,否则创建一个通过Socket连接DataNode的BlockReader。

    BlockReader的创建也是通过BlockReader.newBlockReader创建的,具体分析请看后面。

    readBuffer方法比较简单,直接调用BlockReader的read方法直接读取数据。

    BlockReader的read方法就根据请求的块起始偏移量,长度,通过socket连接DataNode,获取块内容,BlockReader的read方法不会做缓存优化。

    文件的随机读取

    对于MapReduce,在提交作业时,已经确定了每个map和reduce要读取的文件,文件的偏移量,读取的长度,所以MapReduce使用的大部分是文件的随机读取。

    文件随机读取的时候,客户端利用文件打开的时候得到的FSDataInputStream.read(long position, byte[] buffer, int offset, int length)函数进行文件读操作。

    FSDataInputStream会调用其封装的DFSInputStream的read(long position, byte[] buffer, int offset, int length)函数,实现如下:

    public int read(long position, byte[] buffer, int offset, int length)throws IOException {  
      long filelen = getFileLength();
      int realLen = length;
      if ((position + length) > filelen) {
        realLen = (int)(filelen - position);
      }
    
      //首先得到包含从offset到offset + length内容的block列表
      //比如对于64M一个block的文件系统来说,欲读取从100M开始,长度为128M的数据,则block列表包括第2,3,4块block
      List<LocatedBlock> blockRange = getBlockRange(position, realLen);
      int remaining = realLen;
      //对每一个block,从中读取内容
      //对于上面的例子,对于第2块block,读取从36M开始,读取长度28M,对于第3块,读取整一块64M,对于第4块,读取从0开始,长度为36M,共128M数据
      for (LocatedBlock blk : blockRange) {
        long targetStart = position - blk.getStartOffset();
        long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
        fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, buffer, offset);
        remaining -= bytesToRead;
        position += bytesToRead;
        offset += bytesToRead;
      }
      ...return realLen;
    }

    getBlockRange方法根据文件的偏移量和长度,获取对应的数据块信息。主要是根据NameNode类的getBlockLocations方法实现,并做了缓存和二分查找等优化。

     fetchBlockByteRange方法真正从数据块读取内容,实现如下:

    private void fetchBlockByteRange(LocatedBlock block, long start,long end, byte[] buf, int offset) throws IOException {
      Socket dn = null;
      int numAttempts = block.getLocations().length;
      //此while循环为读取失败后的重试次数
      while (dn == null && numAttempts-- > 0 ) {
        //选择一个DataNode来读取数据
        DNAddrPair retval = chooseDataNode(block);
        DatanodeInfo chosenNode = retval.info;
        InetSocketAddress targetAddr = retval.addr;
        BlockReader reader = null;
        int len = (int) (end - start + 1);
        try {
    
          if (shouldTryShortCircuitRead(targetAddr)) {
            //如果要读取的块所属的DataNode与客户端是同一个节点,直接通过本地磁盘访问,减少网络流量
            reader = getLocalBlockReader(conf, src, block.getBlock(),accessToken, chosenNode, DFSClient.this.socketTimeout, start);
          } else {
            //创建Socket连接到DataNode
            dn = socketFactory.createSocket();
            dn.connect(targetAddr, socketTimeout);
            dn.setSoTimeout(socketTimeout);
            //利用建立的Socket链接,生成一个reader负责从DataNode读取数据
            reader = BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(), accessToken,block.getBlock().getGenerationStamp(), start, len, buffersize, verifyChecksum, clientName);
          }        
          //读取数据
          int nread = reader.readAll(buf, offset, len);
          return;
        } finally {
          IOUtils.closeStream(reader);
          IOUtils.closeSocket(dn);
          dn = null;
        }
        //如果读取失败,则将此DataNode标记为失败节点
        addToDeadNodes(chosenNode);
      }
    }

    读取块内容,会尝试该数据块所在的所有DataNode,如果失败,就把对应的DataNode加入到失败节点,下次选择节点就会忽略失败节点(只在独立的客户端缓存失败节点,不上报到namenode)。

    BlockReader的创建也是通过BlockReader.newBlockReader创建的,具体分析请看后面。

    最后,通过BlockReader的readAll方法读取块的完整内容。

    dfsclient和datanode的通信协议

    dfsclient的连接

    dfsclient首次连接datanode时,通信协议实现主要是BlockReader.newBlockReader方法的实现,如下:

    public static BlockReader newBlockReader( Socket sock, String file,long blockId,long genStamp,long startOffset, long len,int bufferSize, boolean verifyChecksum,String clientName) throws IOException {
    
      //使用Socket建立写入流,向DataNode发送读指令
      DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
      out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
      out.write( DataTransferProtocol.OP_READ_BLOCK );
      out.writeLong( blockId );
      out.writeLong( genStamp );
      out.writeLong( startOffset );
      out.writeLong( len );
      Text.writeString(out, clientName);
      out.flush();
      //使用Socket建立读入流,用于从DataNode读取数据
      DataInputStream in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(sock),bufferSize));
      short status = in.readShort();//块读取的状态标记,一般是成功
      DataChecksum checksum = DataChecksum.newDataChecksum( in );
      long firstChunkOffset = in.readLong();
      //生成一个reader,主要包含读入流,用于读取数据
      return new BlockReader( file, blockId, in, checksum, verifyChecksum,
                              startOffset, firstChunkOffset, sock );
    }

    这里的startOffset是相对于块的起始偏移量,len是要读取的长度。

    DataChecksum.newDataChecksum(in),会从DataNode获取该块的checksum加密方式,加密长度。

    BlockReader的readAll函数就是用上面生成的DataInputStream读取数据。

     下面是是读数据块时,客户端发送的信息:

    version operator blockid generationStamp startOffset length clientName  accessToken

    operator:byte Client所需要的操作,读取一个block、写入一个block等等
    version:short Client所需要的数据与Datanode所提供数据的版本是否一致
    blockId:long 所要读取block的blockId
    generationStamp:long 所需要读取block的generationStamp
    startOffset:long 读取block的的起始位置
    length:long 读取block的长度
    clientName:String Client的名字
    accessToken:Token Client提供的验证信息,用户名密码等

    DataNode对dfsclient的响应

    DataNode负责与客户端代码的通信协议交互的逻辑,主要是DataXceiver的readBlock方法实现的:

    private void readBlock(DataInputStream in) throws IOException {
      //读取指令
      long blockId = in.readLong();         
      Block block = new Block( blockId, 0 , in.readLong());
      long startOffset = in.readLong();
      long length = in.readLong();
      String clientName = Text.readString(in);
    
      //创建一个写入流,用于向客户端写数据
      OutputStream baseStream = NetUtils.getOutputStream(s,datanode.socketWriteTimeout);
      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
      //生成BlockSender用于读取本地的block的数据,并发送给客户端
      //BlockSender有一个成员变量InputStream blockIn用于读取本地block的数据
      BlockSender blockSender = new BlockSender(block, startOffset, length,true, true, false, datanode, clientTraceFmt);
      out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // 发送操作成功的状态
      //向客户端写入数据
      long read = blockSender.sendBlock(out, baseStream, null);
      ……
      } finally {
        IOUtils.closeStream(out);
        IOUtils.closeStream(blockSender);
      }
    }

    DataXceiver的sendBlock用于发送数据,数据发送包括应答头和后续的数据包。应答头如下(包含DataXceiver中发送的成功标识):

    DataXceiver的sendBlock的实现如下:

    long sendBlock(DataOutputStream out, OutputStream baseStream, 
                     BlockTransferThrottler throttler) throws IOException {
        ...
        try {
          try {
            checksum.writeHeader(out);//写入checksum的加密类型和加密长度
            if ( chunkOffsetOK ) {
              out.writeLong( offset );
            }
            out.flush();
          } catch (IOException e) { //socket error
            throw ioeToSocketException(e);
          }
          
          ...
          ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
          while (endOffset > offset) {//循环写入数据包
            long len = sendChunks(pktBuf, maxChunksPerPacket, 
                                  streamForSendChunks);
            offset += len;
            totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*checksumSize);
            seqno++;
          }
          try {
            out.writeInt(0); //标记结束    
            out.flush();
          } catch (IOException e) { //socket error
            throw ioeToSocketException(e);
          }
        }
        ...
        return totalRead;
    }

    DataXceiver的sendChunks尽可能在一个packet发送多个chunk,chunk的个数由maxChunks和剩余的块内容决定,实现如下:

    //默认是crc校验,bytesPerChecksum默认是512,checksumSize默认是4,表示数据块每512个字节,做一次checksum校验,checksum的结果是4个字节
    private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
                             throws IOException {
        int len = Math.min((int) (endOffset - offset),bytesPerChecksum * maxChunks);//len是要发送的数据长度
        if (len == 0) {
          return 0;
        }
    
        int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;//这次要发送的chunk数量
        int packetLen = len + numChunks*checksumSize + 4;//packetLen是整个包的长度,包括包头,校验码,数据
        pkt.clear();
        
        // write packet header
        pkt.putInt(packetLen);//整个packet的长度
        pkt.putLong(offset);//块的偏移量
        pkt.putLong(seqno);//序列号
        pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));//是否最后一个packet
        pkt.putInt(len);//发送的数据长度
        
        int checksumOff = pkt.position();
        int checksumLen = numChunks * checksumSize;
        byte[] buf = pkt.array();
        
        if (checksumSize > 0 && checksumIn != null) {
          try {
            checksumIn.readFully(buf, checksumOff, checksumLen);//填充chucksum的内容
          } catch (IOException e) {
            ...
          }
        }
        
        int dataOff = checksumOff + checksumLen;
        if (blockInPosition < 0) {
          IOUtils.readFully(blockIn, buf, dataOff, len);//填充块数据的内容
          if (verifyChecksum) {//默认是false,不验证
            //校验处理
          }
        }
        
        try {
          //通过socket发送数据到客户端
          
        } catch (IOException e) {
          throw ioeToSocketException(e);
        }
        ...
        return len;
    }

    数据组织成数据包来发送,数据包结构如下:

    packetLen offset sequenceNum isLastPacket startOffset dataLen checksum   data

    packetLen:int packet的长度,包括数据、数据的校验等等
    offset:long packet在block中的偏移量
    sequenceNum:long 该packet在这次block读取时的序号
    isLastPacket:byte packet是否是最后一个
    dataLen:int 该packet所包含block数据的长度,纯数据不包括校验和其他
    checksum:该packet每一个chunk的校验和,有多少个chunk就有多少个校验和
    data:该packet所包含的block数据

    数据传输结束的标志,是一个packetLen长度为0的包。客户端可以返回一个两字节的应答OP_STATUS_CHECKSUM_OK(5)

    dfsclient读取块内容

     hdfs文件的随机和顺序分析逻辑,都分析到BlockReader的readAll方法和read方法,这两个方法完成对数据块的内容读取。

    而readAll方法最后也是调用read方法,所以这里重点分析BlockReader的read方法,实现如下:

    public synchronized int read(byte[] buf, int off, int len) throws IOException {
          //第一次read, 忽略前面的额外数据
          if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
            int toSkip = (int)(startOffset - firstChunkOffset);
            if ( skipBuf == null ) {
              skipBuf = new byte[bytesPerChecksum];
            }
            if ( super.read(skipBuf, 0, toSkip) != toSkip ) {//忽略
              // should never happen
              throw new IOException("Could not skip required number of bytes");
            }
          }
          
          boolean eosBefore = gotEOS;
          int nRead = super.read(buf, off, len);
          // if gotEOS was set in the previous read and checksum is enabled :
          if (dnSock != null && gotEOS && !eosBefore && nRead >= 0
              && needChecksum()) {
            //checksum is verified and there are no errors.
            checksumOk(dnSock);
          }
          return nRead;
    }

    super.read即是FSInputChecker的read方法,实现如下

    public synchronized int read(byte[] b, int off, int len) throws IOException {
       //参数检查
        int n = 0;
        for (;;) {
          int nread = read1(b, off + n, len - n);
          if (nread <= 0) 
            return (n == 0) ? nread : n;
          n += nread;
          if (n >= len)
            return n;
        }
    }
    //read1的len被忽略,只返回一个chunk的数据长度(最后一个chunk可能不足一个完整chunk的长度)
    private int read1(byte b[], int off, int len)
      throws IOException {
        int avail = count-pos;
        if( avail <= 0 ) {
          if(len>=buf.length) {
            //直接读取一个数据chunk到用户buffer,避免多余一次复制
    //很巧妙,buf初始化的大小是chunk的大小,默认是512,这里的代码会在块的剩余内容大于一个chunk的大小时调用
    int nread = readChecksumChunk(b, off, len); return nread; } else { //读取一个数据chunk到本地buffer,也是调用readChecksumChunk方法
    //很巧妙,buf初始化大小是chunk的大小,默认是512,这里的代码会在块的剩余内容不足一个chunk的大小时进入调用
    fill(); if( count <= 0 ) { return -1; } else { avail = count; } } } //从本地buffer拷贝数据到用户buffer,避免最后一个chunk导致数组越界 int cnt = (avail < len) ? avail : len; System.arraycopy(buf, pos, b, off, cnt); pos += cnt; return cnt; }

    FSInputChecker的readChecksumChunk会读取一个数据块的chunk,并做校验,实现如下:

    //只返回一个chunk的数据长度(默认512,最后一个chunk可能不足一个完整chunk的长度)
    private int readChecksumChunk(byte b[], int off, int len)
      throws IOException {
        // invalidate buffer
        count = pos = 0;
              
        int read = 0;
        boolean retry = true;
        int retriesLeft = numOfRetries; //本案例中,numOfRetries是1,也就是说不会多次尝试
        do {
          retriesLeft--;
    
          try {
            read = readChunk(chunkPos, b, off, len, checksum);
            if( read > 0 ) {
              if( needChecksum() ) {//这里会做checksum校验
                sum.update(b, off, read);
                verifySum(chunkPos);
              }
              chunkPos += read;
            } 
            retry = false;
          } catch (ChecksumException ce) {
              ...
              if (retriesLeft == 0) {//本案例中,numOfRetries是1,也就是说不会多次尝试,失败了,直接抛出异常
                throw ce;
              }
              
              //如果读取的chunk校验失败,以当前的chunkpos为起始偏移量,尝试新的副本
              if (seekToNewSource(chunkPos)) {
                seek(chunkPos);
              } else {
                //找不到新的副本,抛出异常
                throw ce;
              }
            }
        } while (retry);
        return read;
    }

    readChunk方法由BlockReader实现,分析如下:

    //只返回一个chunk的数据长度(默认512,最后一个chunk可能不足一个完整chunk的长度)
    protected synchronized int readChunk(long pos, byte[] buf, int offset,int len, byte[] checksumBuf) throws IOException {
          //读取一个 DATA_CHUNK.
          long chunkOffset = lastChunkOffset;
          if ( lastChunkLen > 0 ) {
            chunkOffset += lastChunkLen;
          }
          
          //如果先前的packet已经读取完毕,就读下一个packet。
          if (dataLeft <= 0) {
            //读包的头部
            int packetLen = in.readInt();
            long offsetInBlock = in.readLong();
            long seqno = in.readLong();
            boolean lastPacketInBlock = in.readBoolean();
            int dataLen = in.readInt();
            //校验长度
            
            lastSeqNo = seqno;
            isLastPacket = lastPacketInBlock;
            dataLeft = dataLen;
            adjustChecksumBytes(dataLen);
            if (dataLen > 0) {
              IOUtils.readFully(in, checksumBytes.array(), 0,checksumBytes.limit());//读取当前包的所有数据块内容对应的checksum,后面的流程会讲checksum和读取的chunk内容做校验
            }
          }
    
          int chunkLen = Math.min(dataLeft, bytesPerChecksum); //确定此次读取的chunk长度,正常情况下是一个bytesPerChecksum(512字节),当文件最后不足一个bytesPerChecksum,读取剩余的内容。
          
          if ( chunkLen > 0 ) {
            IOUtils.readFully(in, buf, offset, chunkLen);//读取一个数据块的chunk
            checksumBytes.get(checksumBuf, 0, checksumSize);
          }
          
          dataLeft -= chunkLen;
          lastChunkOffset = chunkOffset;
          lastChunkLen = chunkLen;
          ...
          if ( chunkLen == 0 ) {
            return -1;
          }
          return chunkLen;
    }

    总结

     本文前面概要介绍了dfsclient读取文件的示例代码,顺序读取文件和随机读取文件的概要流程,最后还基于dfsclient和datanode读取块的过程,做了一个详细的分析。

     参考 http://caibinbupt.iteye.com/blog/284979

            http://www.cnblogs.com/forfuture1978/archive/2010/11/10/1874222.html

  • 相关阅读:
    项目常见异常
    mysql 存储过程中使用事物+事件定时执行存储过程
    Spring Mvc 配置 之 ContextLoaderListener
    Spring Boot 之 https
    Spring Boot 之 annotation注解
    用python打印99乘法口诀表
    gerrit代码审核工具之“error unpack failed error Missing unknown”错误解决思路
    在eclipse搭建python开发环境
    python2与python3语法区别之_重定向
    2_jenkins_git创建创建及项目构建
  • 原文地址:https://www.cnblogs.com/ggjucheng/p/2915307.html
Copyright © 2011-2022 走看看