zoukankan      html  css  js  c++  java
  • SequenceFile文件

        SequenceFile文件是Hadoop为存储二进制形式的key-value对而设计的一种平面文件(Flat File)。目前,也有不少人在该文件的基础上提出了一些HDFS小文件存储的解决方案,其基本思路是将多个小文件合并成一个大文件,同时为这些小文件的位置信息构建索引。不过,这类解决方案还涉及Hadoop的另一种文件格式——MapFile文件。SequenceFile文件并不保证其存储的key-value数据按照key的某种顺序排列,同时不支持append操作。

          在SequenceFile文件中,每一个key-value对被看作一条记录(Record),因此基于Record的压缩策略,SequenceFile文件可支持三种压缩类型(SequenceFile.CompressionType):

    NONE: 对records不进行压缩;

    RECORD: 仅压缩每一个record中的value值;

    BLOCK: 将一个block中的所有records压缩在一起;

    那么,基于这三种压缩类型,Hadoop提供了对应的三种类型的Writer:

    SequenceFile.Writer  写入时不压缩任何的key-value对(Record);

    [java] view plain copy
     
    1. public static class Writer implements java.io.Closeable {  
    2.   // Each record is written as [total length][key length][key bytes][value bytes]; the value goes through the codec only when `compress` is true.
    3. ...  
    4.    // Initialize the Writer: keep conf/stream/classes/codec/metadata and create the key and value serializers.  
    5.    void init(Path name, Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, CompressionCodec codec, Metadata metadata) throws IOException {  
    6.       this.conf = conf;  
    7.       this.out = out;  
    8.       this.keyClass = keyClass;  
    9.       this.valClass = valClass;  
    10.       this.compress = compress;  
    11.       this.codec = codec;  
    12.       this.metadata = metadata;  
    13.         
    14.       // Create serializers that write keys and uncompressed values into `buffer`.  
    15.       SerializationFactory serializationFactory = new SerializationFactory(conf);  
    16.       this.keySerializer = serializationFactory.getSerializer(keyClass);  
    17.       this.keySerializer.open(buffer);  
    18.       this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);  
    19.       this.uncompressedValSerializer.open(buffer);  
    20.         
    21.       // With a codec, also create a value serializer whose output passes through the codec's compression stream into `buffer`.  
    22.       if (this.codec != null) {  
    23.         ReflectionUtils.setConf(this.codec, this.conf);  
    24.         this.compressor = CodecPool.getCompressor(this.codec);  
    25.         this.deflateFilter = this.codec.createOutputStream(buffer, compressor);  
    26.         this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));  
    27.         this.compressedValSerializer = serializationFactory.getSerializer(valClass);  
    28.         this.compressedValSerializer.open(deflateOut);  
    29.       }  
    30.     }  
    31.       
    32.   
    33.   // Append one record given as key/value objects (both are serialized first).  
    34.   public synchronized void append(Object key, Object val) throws IOException {  
    35.       if (key.getClass() != keyClass)  
    36.         throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);  
    37.         
    38.       if (val.getClass() != valClass)  
    39.         throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);  
    40.   
    41.       buffer.reset();  
    42.   
    43.       // Serialize the key (convert it to bytes) into `buffer`.  
    44.       keySerializer.serialize(key);  
    45.       int keyLength = buffer.getLength();  
    46.       if (keyLength < 0)  
    47.         throw new IOException("negative length keys not allowed: " + key);  
    48.   
    49.       // `compress` is the flag passed to init(); for the plain uncompressed Writer it is presumably false (TODO confirm at the construction site).   
    50.       if (compress) {  
    51.         deflateFilter.resetState();  
    52.         compressedValSerializer.serialize(val);  
    53.         deflateOut.flush();  
    54.         deflateFilter.finish();  
    55.       } else {  
    56.         // Serialize the value (uncompressed) into `buffer`, right after the key bytes.  
    57.         uncompressedValSerializer.serialize(val);  
    58.       }  
    59.   
    60.       // Write the record to the output stream: total length, key length, then key+value bytes.  
    61.       checkAndWriteSync();                                // sync  
    62.       out.writeInt(buffer.getLength());                   // total record length  
    63.       out.writeInt(keyLength);                            // key portion length  
    64.       out.write(buffer.getData(), 0, buffer.getLength()); // data  
    65.     }  
    66.   
    67.     // Append one record from raw bytes: key bytes plus a ValueBytes value written in uncompressed form.  
    68.     public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {  
    69.       if (keyLength < 0)  
    70.         throw new IOException("negative length keys not allowed: " + keyLength);  
    71.   
    72.       int valLength = val.getSize();  
    73.   
    74.       checkAndWriteSync();  
    75.         
    76.       // Write the key and value bytes straight to the output stream (no intermediate buffer).  
    77.       out.writeInt(keyLength+valLength);          // total record length  
    78.       out.writeInt(keyLength);                    // key portion length  
    79.       out.write(keyData, keyOffset, keyLength);   // key  
    80.       val.writeUncompressedBytes(out);            // value  
    81.     }  
    82.   
    83. ...  
    84.   
    85. }  

    SequenceFile.RecordCompressWriter写入时只压缩key-value对(Record)中的value;

    [java] view plain copy
     
    1. static class RecordCompressWriter extends Writer {  
    2. ...  
    3.   // Record-compressed writer: the key is written uncompressed and the value is compressed with the codec.
    4.    public synchronized void append(Object key, Object val) throws IOException {  
    5.       if (key.getClass() != keyClass)  
    6.         throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);  
    7.         
    8.       if (val.getClass() != valClass)  
    9.         throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);  
    10.   
    11.       buffer.reset();  
    12.   
    13.       // Serialize the key (convert it to bytes) into `buffer`.  
    14.       keySerializer.serialize(key);  
    15.       int keyLength = buffer.getLength();  
    16.       if (keyLength < 0)  
    17.         throw new IOException("negative length keys not allowed: " + key);  
    18.   
    19.       // Serialize and COMPRESS the value through the codec stream (deflateOut/deflateFilter); the compressed bytes follow the key in `buffer`.  
    20.       deflateFilter.resetState();  
    21.       compressedValSerializer.serialize(val);  
    22.       deflateOut.flush();  
    23.       deflateFilter.finish();  
    24.   
    25.       // Write the record to the output stream: total length, key length, then key + compressed value bytes.  
    26.       checkAndWriteSync();                                // sync  
    27.       out.writeInt(buffer.getLength());                   // total record length  
    28.       out.writeInt(keyLength);                            // key portion length  
    29.       out.write(buffer.getData(), 0, buffer.getLength()); // data  
    30.     }  
    31.   
    32.     /** Append one record from raw bytes; the value is written with writeCompressedBytes, i.e. in its compressed form. */  
    33.     public synchronized void appendRaw(byte[] keyData, int keyOffset,  
    34.         int keyLength, ValueBytes val) throws IOException {  
    35.   
    36.       if (keyLength < 0)  
    37.         throw new IOException("negative length keys not allowed: " + keyLength);  
    38.   
    39.       int valLength = val.getSize();  
    40.         
    41.       checkAndWriteSync();                        // sync  
    42.       out.writeInt(keyLength+valLength);          // total record length  
    43.       out.writeInt(keyLength);                    // key portion length  
    44.       out.write(keyData, keyOffset, keyLength);   // 'key' data  
    45.       val.writeCompressedBytes(out);              // 'value' data  
    46.     }  
    47.       
    48.   } // RecordCompressWriter  
    49.   
    50.   
    51. ...  
    52. }  

    SequenceFile.BlockCompressWriter 写入时将一批key-value对(Record)压缩成一个Block;

    [java] view plain copy
     
    1. static class BlockCompressWriter extends Writer {  
    2. ...  
    3.   // Block-compressed writer: buffers records and writes them out as compressed blocks (key lengths, keys, value lengths, values).
    4.    void init(int compressionBlockSize) throws IOException {  
    5.       this.compressionBlockSize = compressionBlockSize;  
    6.       keySerializer.close();  
    7.       keySerializer.open(keyBuffer);  
    8.       uncompressedValSerializer.close();  
    9.       uncompressedValSerializer.open(valBuffer);  
    10.     }  
    11.       
    12.     /** Compress the given buffer via the codec stream and write it out as [VInt compressed length][compressed bytes]. */  
    13.     private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException {  
    14.       deflateFilter.resetState();  
    15.       buffer.reset();  
    16.       deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength());  
    17.       deflateOut.flush();  
    18.       deflateFilter.finish();  
    19.         
    20.       WritableUtils.writeVInt(out, buffer.getLength());  
    21.       out.write(buffer.getData(), 0, buffer.getLength());  
    22.     }  
    23.       
    24.     /** If any records are buffered: call super.sync(), write the record count and the four compressed buffers, flush the stream, then reset all buffers. */  
    25.     public synchronized void sync() throws IOException {  
    26.       if (noBufferedRecords > 0) {  
    27.         super.sync();  
    28.           
    29.         // No. of records  
    30.         WritableUtils.writeVInt(out, noBufferedRecords);  
    31.           
    32.         // Write 'keys' and lengths  
    33.         writeBuffer(keyLenBuffer);  
    34.         writeBuffer(keyBuffer);  
    35.           
    36.         // Write 'values' and lengths  
    37.         writeBuffer(valLenBuffer);  
    38.         writeBuffer(valBuffer);  
    39.           
    40.         // Flush the file-stream  
    41.         out.flush();  
    42.           
    43.         // Reset internal states  
    44.         keyLenBuffer.reset();  
    45.         keyBuffer.reset();  
    46.         valLenBuffer.reset();  
    47.         valBuffer.reset();  
    48.         noBufferedRecords = 0;  
    49.       }  
    50.         
    51.     }  
    52.   
    53.   
    54.    // Append one record given as key/value objects; both are serialized uncompressed into the block buffers.  
    55.    public synchronized void append(Object key, Object val) throws IOException {  
    56.       if (key.getClass() != keyClass)  
    57.         throw new IOException("wrong key class: "+key+" is not "+keyClass);  
    58.         
    59.       if (val.getClass() != valClass)  
    60.         throw new IOException("wrong value class: "+val+" is not "+valClass);  
    61.   
    62.       // Serialize the key (uncompressed) into keyBuffer; its byte length is recorded as a VInt in keyLenBuffer.  
    63.       int oldKeyLength = keyBuffer.getLength();  
    64.       keySerializer.serialize(key);  
    65.       int keyLength = keyBuffer.getLength() - oldKeyLength;  
    66.       if (keyLength < 0)  
    67.         throw new IOException("negative length keys not allowed: " + key);  
    68.       WritableUtils.writeVInt(keyLenBuffer, keyLength);  
    69.   
    70.       // Serialize the value (uncompressed) into valBuffer; its byte length is recorded as a VInt in valLenBuffer.  
    71.       int oldValLength = valBuffer.getLength();  
    72.       uncompressedValSerializer.serialize(val);  
    73.       int valLength = valBuffer.getLength() - oldValLength;  
    74.       WritableUtils.writeVInt(valLenBuffer, valLength);  
    75.         
    76.       // Added another key/value pair  
    77.       ++noBufferedRecords;  
    78.         
    79.       // Compress and flush?  
    80.       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();  
    81.       // Buffered key+value bytes reached compressionBlockSize: compress the whole block and write it out via sync().  
    82.       if (currentBlockSize >= compressionBlockSize) {  
    83.         sync();  
    84.       }  
    85.     }  
    86.       
    87.     /** Append one record from raw bytes; the value is buffered via writeUncompressedBytes and compressed later as part of the block. */  
    88.     public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {  
    89.         
    90.       if (keyLength < 0)  
    91.         throw new IOException("negative length keys not allowed");  
    92.   
    93.       int valLength = val.getSize();  
    94.         
    95.       // Save key/value data in relevant buffers  
    96.       WritableUtils.writeVInt(keyLenBuffer, keyLength);  
    97.       keyBuffer.write(keyData, keyOffset, keyLength);  
    98.       WritableUtils.writeVInt(valLenBuffer, valLength);  
    99.       val.writeUncompressedBytes(valBuffer);  
    100.   
    101.       // Added another key/value pair  
    102.       ++noBufferedRecords;  
    103.   
    104.       // Compress and flush?  
    105.       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();   
    106.       if (currentBlockSize >= compressionBlockSize) {  
    107.         sync();  
    108.       }  
    109.     }  
    110.       
    111.   } // BlockCompressWriter  
    112.   
    113.   
    114. ...  
    115. }  

         源码中,block的大小compressionBlockSize默认值为1000000(当缓存的key与value字节数之和达到该值时,即对整个block进行压缩并写出),也可通过配置参数io.seqfile.compress.blocksize来指定。

       根据三种压缩类型,共有三种类型的SequenceFile文件格式:

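    1). Uncompressed SequenceFile

        每条记录依次写入:记录总长度(int)、key长度(int)、key字节、value字节,key和value均不压缩。

    2). Record-Compressed SequenceFile

        记录布局与Uncompressed相同,但value部分是经codec压缩后的字节,key仍不压缩。

    3). Block-Compressed SequenceFile

        多条记录缓存后作为一个block写出:先调用super.sync(),再写入记录数(VInt),随后依次写入压缩后的key长度、key、value长度、value四段数据,每段前都带有其压缩后的字节数(VInt)。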

  • 相关阅读:
    poj 2485 Highways 最小生成树
    hdu 3415 Max Sum of MaxKsubsequence
    poj 3026 Borg Maze
    poj 2823 Sliding Window 单调队列
    poj 1258 AgriNet
    hdu 1045 Fire Net (二分图匹配)
    poj 1789 Truck History MST(最小生成树)
    fafu 1181 割点
    减肥瘦身健康秘方
    人生的问题
  • 原文地址:https://www.cnblogs.com/mfryf/p/7072446.html
Copyright © 2011-2022 走看看