zoukankan      html  css  js  c++  java
  • Hadoop IO 特性详解(2)

    (本文引用了microheartggjucheng的一些资料,在此感谢。charles觉得知识无价,开源共享无价
    这一次我们接着分析文件IO校验的相关代码,看看最底层是如何实现这种大数据集的文件校验的,不得不说设计这个系统的程序员是世界上最具有智慧的一群人,面对复杂难解的问题总是可以找到很好的解决方法。
    其实对于文件校验这件事情,hadoop为什么重要上一篇文章讲过几个方面,提到的bit rot衰减其实很多人没有直观感受。我就举一个直观的例子以便于普通人感受一下bit rot的影响。一个磁盘,十年前我放500GB的日本爱情动作片在上面,第二天,第三天我再去打开它也还是没问题,高清无码,一年之后我再打开,可能也还是没问题。5年的时候,你可能发现有时候怎么会卡顿,十年之后基本上已经不能够完整读取了。可惜了你的500GB大片。
    为什么?因为时间越长,硬件总会经受各种损坏,温度,湿度,外力,甚至自身运行,即使一直不用他,也会发生变化,导致数据损坏,这就是bit rot.

    言归正传,看校验

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /****************************************************************
     * Abstract Checksumed FileSystem.
     * It provide a basic implementation of a Checksumed FileSystem,
     * which creates a checksum file for each raw file.
     * It generates & verifies checksums at the client side.
     *
     *****************************************************************/
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public abstract class ChecksumFileSystem extends FilterFileSystem {
      private static final byte[] CHECKSUM_VERSION = new byte[] {'c''r''c', 0};
      private int bytesPerChecksum = 512;
      private boolean verifyChecksum = true;
      private boolean writeChecksum = true;
    }

    这个ChecksumFileSystem 在package org.apache.hadoop.fs中,继承FilterFileSystem,FilterFileSystem继承FileSystem类。那么首先看最上面的父类FileSystem:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    public abstract class FileSystem extends Configured implements Closeable {
     
     
      /**
       *作用是将本地文件拷贝到目标文件,如果目标还是在本地就不执行任何操作,如果是远程就执行
       * @param fsOutputFile path of output file
       * @param tmpLocalFile path to local tmp file
       */
      public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
        throws IOException {
        moveFromLocalFile(tmpLocalFile, fsOutputFile);
      }
     
      /**
       * 将本地文件src拷贝到远程中去,也就是增加到FS中,操作之后本地文件还是原封不动,保持完整。
       * @param src path
       * @param dst path
       */
      public void copyFromLocalFile(Path src, Path dst)
        throws IOException {
        copyFromLocalFile(false, src, dst);
      }
     
     
      /**
       * 在指定的地点利用给定的校验和选项创建一个FSDataOutputStream
       * @param f the file name to open
       * @param permission访问权限
       * @param flags {@link CreateFlag}s to use for this stream.
       * @param bufferSize the size of the buffer to be used.
       * @param replication required block replication for the file.副本
       * @param blockSize
       * @param progress
       * @param checksumOpt checksum parameter. If null, the values
       *        found in conf will be used.
       * @throws IOException
       * @see #setPermission(Path, FsPermission)
       */
      public FSDataOutputStream create(Path f,
          FsPermission permission,
          EnumSet<CreateFlag> flags,
          int bufferSize,
          short replication,
          long blockSize,
          Progressable progress,
          ChecksumOpt checksumOpt) throws IOException {
        // Checksum options are ignored by default. The file systems that
        // implement checksum need to override this method. The full
        // support is currently only available in DFS.
        return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
            bufferSize, replication, blockSize, progress);
      }
     
      /** Return true iff file is a checksum file name.是不是校验和文件呢。.crc结尾嘛,上一篇文章已经讲过这个点了*/
      public static boolean isChecksumFile(Path file) {
        String name = file.getName();
        return name.startsWith(".") && name.endsWith(".crc");
      }
     
      /** Return the name of the checksum file associated with a file.*/
      public Path getChecksumFile(Path file) {
        return new Path(file.getParent(), "." + file.getName() + ".crc");
      }
     
      /** Return the length of the checksum file given the size of the 
       * actual file.
       **/
      public long getChecksumFileLength(Path file, long fileSize) {
        return getChecksumLength(fileSize, getBytesPerSum());
      }
     
     
      /**
       * Set whether to verify checksum.
       */
      @Override
      public void setVerifyChecksum(boolean verifyChecksum) {
        this.verifyChecksum = verifyChecksum;
      }
     
      @Override
      public void setWriteChecksum(boolean writeChecksum) {
        this.writeChecksum = writeChecksum;
      }
     
      /** get the raw file system */
      @Override
      public FileSystem getRawFileSystem() {
        return fs;
      }
      public boolean reportChecksumFailure(Path f, FSDataInputStream in,            
                                long inPos, FSDataInputStream sums, long sumsPos) {     return false;   }
     
     
    }
    代码很多,其实功能也很清晰简单:我们闭上眼睛想想如果要多文件进行操作,就需要创建文件,创建输出流,接收输入流,对输入流的数据进行校验,有时候还要追加数据到某一个文件,还包括其他一些文件的常规操作。
    明白了这些想法,再来看这个类里面的代码,其实就简单了。
    根据microheart的说法:
    Hadoop抽象文件系统的方法分为两部分:
    处理文件和目录的相关事务 
    以下部分来自microheart的git写的太好我又没本事分析这么透彻,看半天眼睛都花了。所以引用之。
    读写文件数据

    FileSystem 接口
    fs.FileSystem
    FileSystem抽象类主要包含一下几类接口:
    打开或创建文件:FileSystem.open(), FileSystem.create(), FileSystem.append()
    读取文件流数据:FSDataInputStream.read()
    写文件流数据:FSDataOutputStream.write()
    关闭文件:FSDataInputStream.close(), FSDataOutputStream.close()
    删除文件:FileSystem.delete()
    文件重命名:FileSystem.rename()
    创建目录:FileSystem.mkdirs()
    定位文件流位置:FSDataInputStream.seek()
    获取目录/文件属性:FileSystem.getFileStatus(), FileSystem.get*()
    设置目录/文件属性:FileSystem.set*()
    设置/获取当前目录:FileSystem.getWorkingDirectory(), FileSystem.setWorkingDirectory()
    获取具体的文件系统:FileSystem.get(), FileSystem.getLocal()
    FileSystem.get()为工厂模式实现,用于创建多种文件系统产品。

    FileStatus
    Hadoop 通过FileSystem.getFileStatus()可获得文件/目录的属性,这些属性封装在FileStatus中。 FileStatus返回给客户端关于文件的元数据信息,包含路径,长度、修改时间、访问时间等基本信息和分布式文件系统特有的副本数。
    // Interface that represents the client side information for a file.
    public class FileStatus implements Writable, Comparable {
      private Path path;
      private long length;
      private boolean isdir;
      private short block_replication;
      private long blocksize;
      private long modification_time;
      private long access_time;
      private FsPermission permission;
      private String owner;
      private String group;
      ...
    }
    FileStatus实现了Writable接口,因此FileStatus对象可序列化后在网络上传输。 FileStatus几乎包含了文件/目录的所有属性,这样设计的好处可以减少在分布式系统中进行网络传输的次数。


    FSDataInputStream/FSDataOutputStream

    Hadoop基于流机制进行文件读写。通过FileSystem.open()可创建FSDataInputStream;通过FileSystem.create()/append()可创建FSDataOutputStream。

    FSDataInputStream实现了Seekable接口和PositionedReadable接口 FSDataInputStream是装饰器模式的典型运用,实现Seekable接口和PositionedReadable接口借助其装饰的InputStream对象。

      public class FSDataInputStream extends DataInputStream 
       implements Seekable, PositionedReadable, Closeable, HasFileDescriptor {
        public FSDataInputStream(InputStream in) throws IOException {
          super(in);
          if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
            throw new IllegalArgumentException(  "In is not an instance of Seekable or PositionedReadable");
          }
        }
    
        public synchronized void seek(long desired) throws IOException {
          ((Seekable)in).seek(desired);
        }
    
        public void readFully(long position, byte[] buffer)
          throws IOException {
          ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
        }
        ...
    }   
    

    Seekable接口提供了在流中进行随机存取的方法,可在流中随机定位位置,然后读取输入流。 seekToNewSource()重新选择一个副本。

    public interface Seekable {
      // Seek to the given offset from the start of the file.
      void seek(long pos) throws IOException;
    
      // Return the current offset from the start of the file
      long getPos() throws IOException;
    
      // Seeks a different copy of the data.  Returns true if found a new source, false otherwise.
      boolean seekToNewSource(long targetPos) throws IOException;
      }
    

    PositionedReadable接口提供了从输入流中某个位置读取数据的方法,这些方法读取数据后并不改变流的当前位置。 read()和readFully()方法都是线程安全的,区别在于:前者试图读取指定长度的数据,后者读取制定长度的数据,直到读满缓冲区或者流结束。

    public interface PositionedReadable {
      public int read(long position, byte[] buffer, int offset, int length) throws IOException;
    
      public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
    
      public void readFully(long position, byte[] buffer) throws IOException;
    }
    

    FSInputStream抽象类继承InputStream,并实现PositionedReadable接口。FSInputStream拥有多个子类,具体的文件系统实现相应的输入流。

    FSDataOutputStream继承DataOutputStream,Hadoop文件系统不支持随机写,因而没有实现Seekable接口。 FSDataOutputStream实现了Syncable接口,Syncable.sync()将流中的数据同步至设备中。

    public class FSDataOutputStream extends DataOutputStream implements Syncable {...}

    Hadoop 具体文件系统

    Hadoop提供大量具体的文件系统实现,以满足用户访问各种数据需求。 这些文件系统直接或者间接的继承org.apache.hadoop.fs.FileSystem。

    其中FilterFileSystem类似于java.io.FilterInputStream,用于在已有的文件系统之上提供新的功能,同样是包装器设计模式的运用。 ChecksumFileSystem用于在原始文件系统之上提供校验功能。

    继承关系为:

    FileSystem <-- FilterFileSystem <-- ChecksumFileSystem <-- LocalFileSystem
                                                           <-- ChecksumDistributeFileSystem

    其他的不多说,着重分析一下checksum的运行原理吧。

    ChecksumFileSystem

    ChecksumFileSystem继承FilterFileSystem,基于CRC-32提供对文件系统的数据校验。 与其他文件系统一样,ChecksumFileSystem需要提供处理文件/目录相关事务和文件读写服务。

    文件/目录相关事务

    这部分逻辑主要保持数据文件和CRC-32校验信息文件的一致性,如数据文件重命名,则校验文件也需要重命名。 如果数据文件为:foo.txt,则校验文件为:.foo.txt.crc

    以ChecksumFileSystem.delete()方法删除文件文件为例。若文件为目录则递归删除(recursive=true);若为普通文件,则删除对应的校验文件(若存在)。

    public boolean delete(Path f, boolean recursive) throws IOException{
      FileStatus fstatus = null;
      try {
        fstatus = fs.getFileStatus(f);
      } catch(FileNotFoundException e) {
        return false;
      }
      if(fstatus.isDir()) {
        return fs.delete(f, recursive);
      } else {
        Path checkFile = getChecksumFile(f);
        if (fs.exists(checkFile)) {
          fs.delete(checkFile, true);
        }
        return fs.delete(f, true);
      }
    }
    

    读文件

    Hadoop读文件时,需要从数据文件和校验文件中分别读出内容,并根据校验信息对读入的数据文件内容进行校验,以判断文件的完整性。 注:若校验事变,ChecksumFileSystem无法确定是数据文件出错还是校验文件出错。

    读数据流程与ChecksumFSInputChecker和其父类FSInputChecker相关。 FSInputChecker的成员变量包含数据缓冲区、校验和缓冲区和读取位置等变量。

    abstract public class FSInputChecker extends FSInputStream {
      protected Path file;        // The file name from which data is read from 
      private Checksum sum;    
      private boolean verifyChecksum = true;
      private byte[] buf;                        // 数据缓冲区
      private byte[] checksum;                   // 校验和缓冲区
      private int pos;
      private int count;
      private int numOfRetries;                  // 出错重试次数
      private long chunkPos = 0;  // cached file position
      ...
    }
    

    ChecksumFSInputChecker构造方法对基类FSInputChecker的成员进行初始化,基于CRC-32校验,校验和大小为4字节。 对校验文件首先要进行版本校验,即文件头部是否匹配魔数"crc"

    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
      throws IOException {
      super( file, fs.getFileStatus(file).getReplication() );
      ...
      try {
        ...
        if (!Arrays.equals(version, CHECKSUM_VERSION))
          throw new IOException("Not a checksum file: "+sumFile);
        this.bytesPerSum = sums.readInt();
        set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
      } catch (...) {         // ignore
        set(fs.verifyChecksum, null, 1, 0);
      }
    }
    

    FSInputChecker.read()循环调用read1()方法直到读取len个字节或者没有数据可读,返回读取的字节数。

    public synchronized int read(byte[] b, int off, int len) throws IOException {
      ... // 参数校验
      int n = 0;
      for (;;) {
        int nread = read1(b, off + n, len - n);
        if (nread <= 0) 
          return (n == 0) ? nread : n;
        n += nread;
        if (n >= len)
          return n;
      }
    }
    

    FSInputChecker.read1()方法为了提高效率,减少内存复制的次数,若当前FSInputChecker.buf没有数据可读且要读取的len字节数大于或等于数据块大小(buf.length,默认512字节),则通过readchecksumChunk()方法将数据直接读取目标数组中,而不需经过FSInputChecker.buf的中转。 若buf没有数据可读且读取的len字节数小于数据块大小,则通过fill()方法从数据流中一次读取一个数据块。

    private int read1(byte b[], int off, int len) throws IOException {
      int avail = count-pos;
      if( avail <= 0 ) {
        if(len>=buf.length) {
          int nread = readChecksumChunk(b, off, len);   // read a chunk to user buffer directly; avoid one copy
          return nread;
        } else {
          fill();  // read a chunk into the local buffer
          if( count <= 0 ) {
            return -1;
          } else {
            avail = count;
          }
        }
      }
      // copy content of the local buffer to the user buffer
      int cnt = (avail < len) ? avail : len;
      System.arraycopy(buf, pos, b, off, cnt);
      pos += cnt;
      return cnt;    
    }    
    

    FSInputChecker.readChecksumChunk()方法通常需要对读取的字节序列进行校验(默认为true),若校验不通过,可选择新的副本进行重读,如果进行了retriesLeft次重读仍然不能校验通过,则抛出异常。 readChunk()方法是一个抽象方法,FSInputChecker的子类实现它,以定义实际读取数据的逻辑。

    private int readChecksumChunk(byte b[], int off, int len)  throws IOException {
      // invalidate buffer
      count = pos = 0;
      int read = 0;
      boolean retry = true;
      int retriesLeft = numOfRetries; 
      do {
        retriesLeft--;
        try {
          read = readChunk(chunkPos, b, off, len, checksum);
          if( read > 0 ) {
            if( needChecksum() ) {
              sum.update(b, off, read);
              verifySum(chunkPos);
            }
            chunkPos += read;
          } 
          retry = false;
        } catch (ChecksumException ce) {
            if (retriesLeft == 0) {
              throw ce;
            }
            if (seekToNewSource(chunkPos)) { // 重试一个新的数据副本
              seek(chunkPos);
            } else {
              throw ce;
            }
          }
      } while (retry);
      return read;
    }
    

    ChecksumFileSystem.ChecksumFSInputChecker实现了readChunk()的逻辑。 readChunk()它读取数据块和校验数据和,不进行两者的校验。 getChecksumFilePos()方法定位到校验和文件中pos位置对应块的边界,以便读取一个数据块对应的完整校验和。

    // ChecksumFSInputChecker.readChunk()
    protected int readChunk(long pos, byte[] buf, int offset, int len,
        byte[] checksum) throws IOException {
      boolean eof = false;
      if(needChecksum()) {
        try {
          long checksumPos = getChecksumFilePos(pos); 
          if(checksumPos != sums.getPos()) {
            sums.seek(checksumPos);
          }
          sums.readFully(checksum);
        } catch (EOFException e) {
          eof = true;
        }
        len = bytesPerSum;
      }
      if(pos != datas.getPos()) {
        datas.seek(pos);
      }
      int nread = readFully(datas, buf, offset, len);
      if( eof && nread > 0) {
        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
      }
      return nread;
    }
    

    写文件

    与文件/目录元数据信息的维护和读文件相比,写文件相对起来比较复杂,ChecksumFileSystem需要维护字节流上的数据读写和基于块的校验和关系。 一般而言,每{io.bytes.per.checksum}(默认512)个数据字节对应一个单独的校验和,CRC-32校验和的输出为4个字节。因此校验数据所带来的存储开销小于1%。

    ChecksumFSOutputSummer继承FSOutputSummer,在基本的具体文件系统的输出流上,添加数据文件和校验文件流的输出。 继承关系:OutputStream <-- FSOutputSummer <-- ChecksumFSOutputSummer

    FSOutputSummer是一个生成校验和的通用输出流,包含4个成员变量。

    abstract public class FSOutputSummer extends OutputStream {
      private Checksum sum;      // data checksum 计算校验和
      private byte buf[]; // internal buffer for storing data before it is checksumed 输出数据缓冲区
      private byte checksum[]; // internal buffer for storing checksum 校验和缓冲区
      private int count; // The number of valid bytes in the buffer. 已使用空间计数
      ...
    }
    

    FSOutputSummer逻辑非常清晰,根据提供的字节数组,每{io.bytes.per.checksum}求出一个校验和,并根据子类所实现的writeChunk()方法写出到响应的输出流中,在ChecksumFSOutputSummer中,则分别写入文件数据流和校验文件数据流。

    // ChecksumFileSystem.CheckSumFSOutputSummer 
    private static class ChecksumFSOutputSummer extends FSOutputSummer {
      private FSDataOutputStream datas;    
      private FSDataOutputStream sums;
    
      ...
      @Override
      protected void writeChunk(byte[] b, int offset, int len, byte[] checksum) throws IOException {
        datas.write(b, offset, len);
        sums.write(checksum);
      }
    }
    

    FSOutputSummer.write()方法循环调用write1()方法进行校验和计算和数据流输出。当buf的count数等于buf.length,则将数据和校验和输出到对应的流中。

    public synchronized void write(byte b[], int off, int len) throws IOException {
      ... //参数校验
      for (int n=0;n<len;n+=write1(b, off+n, len-n)) {  }
    }
    
    private int write1(byte b[], int off, int len) throws IOException {
      if(count==0 && len>=buf.length) {
        final int length = buf.length;
        sum.update(b, off, length);
        writeChecksumChunk(b, off, length, false);
        return length;
      }
    
      // copy user data to local buffer
      int bytesToCopy = buf.length-count;
      bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
      sum.update(b, off, bytesToCopy);
      System.arraycopy(b, off, buf, count, bytesToCopy);
      count += bytesToCopy;
      if (count == buf.length) { // local buffer is full
        flushBuffer();  
      } 
      return bytesToCopy;
    }
    
    private void writeChecksumChunk(byte b[], int off, int len, boolean keep) throws IOException {
      int tempChecksum = (int)sum.getValue();
      if (!keep) {
        sum.reset();
      }
      int2byte(tempChecksum, checksum); // 整数转字节数组
      writeChunk(b, off, len, checksum);
    }
    

    write1()方法是用了一个实用的技巧,若当前缓冲区的写入字节数为0(count=0)且需要写入的字节数据长度大于或等于块(buf.length)的长度,则直接进行校验和计算,避免将数据拷贝到缓冲区,然后再计算校验和,减少内存拷贝的次数。 write1()方法尽可能的写入多的数据,但一次最多写入一个块。

    ChecksumFileSystem.CheckSumFSOutputSummer提供了构造FSOutputSummer所需要的参数。 校验和采用PureJavaCrc32,校验和长度4字节,缓冲大小为512字节(默认)。

    public ChecksumFSOutputSummer(ChecksumFileSystem fs,   Path file, boolean overwrite, int bufferSize, 
                                  short replication, long blockSize, Progressable progress) throws IOException {
      super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
      int bytesPerSum = fs.getBytesPerSum();
      this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, replication, blockSize, progress);
      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, sumBufferSize, replication, blockSize);
      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
      sums.writeInt(bytesPerSum);
    }
    

    构造ChecksumFSOutputSummer时,就往校验和文件流中写入魔数CHECKSUM_VERSION("crc")和校验块长度。 FSOutputSummer抽象了大部分和数据分块、计算校验和的相关功能,ChecksumFSOutputSummer在此基础上提供了具体的文件流输出。



    由此可见,hadoop对于文件校验有一套精心的设计,文件系统和文件读写都会避免错误的产生。虽然额外的校验可能会导致性能的占用,但是一些公司经过摸索也找出了解决方案:

    ggjuchenghdfs都是存储大文件的,默认每512字节就做一个crc校验,客户端在读写文件都要做这个校验,这个对hdfs的性能消耗是比较大的,crc最开始是采用jni调用,但是jni调用都要做上下文切换,加上每512字节就做一次crc校验,所以导致jvm切换很频繁,后来修改为pure java的crc校验,性能还提高了下,如果是几百兆就做一个crc校验,那么jni调用导致的上下文切换少些,那么jni就还有优势,但是在hadoop这个应用场景明显不合适。 后来淘宝的针对hadoop的crc场景,定制了jvm,将crc指令优化为调用硬件指令,性能测试报告证明提高了hdfs性能的20%-30%。 

    到此,文件校验分析就结束了。接下来我会接着介绍压缩编码解码方面的原理。


    Charles 于2015-12-22 Phnom Penh



    版权说明:
    本文由Charles Dong原创,本人支持开源以及免费有益的传播,反对商业化谋利。
    CSDN博客:http://blog.csdn.net/mrcharles
    个人站:http://blog.xingbod.cn
    EMAIL:charles@xingbod.cn
  • 相关阅读:
    Java基础(六)判断两个对象相等:equals、hashcode、toString方法
    同时找最大最小值
    0-1背包问题
    大数相加
    单例模式(singleton pattern)
    House Robber
    Binary Tree Paths
    双向链表的插入
    工厂模式(factory pattern)
    装饰者模式(decorator pattern)
  • 原文地址:https://www.cnblogs.com/mrcharles/p/5067975.html
Copyright © 2011-2022 走看看