zoukankan      html  css  js  c++  java
  • SequenceFile文件

        SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。目前,也有不少人在该文件的基础之上提出了一些HDFS中小文件存储的解决方案,他们的基本思路就是将小文件进行合并成一个大文件,同时对这些小文件的位置信息构建索引。不过,这类解决方案还涉及到Hadoop的另一种文件格式——MapFile文件。SequenceFile文件并不保证其存储的key-value数据是按照key的某个顺序存储的,同时不支持append操作。

          在SequenceFile文件中,每一个key-value被看做是一条记录(Record),因此基于Record的压缩策略,SequenceFile文件可支持三种压缩类型(SequenceFile.CompressionType):

    NONE: 对records不进行压缩;

    RECORD: 仅压缩每一个record中的value值;

    BLOCK: 将一个block中的所有records压缩在一起;

    那么,基于这三种压缩类型,Hadoop提供了对应的三种类型的Writer:

    SequenceFile.Writer  写入时不压缩任何的key-value对(Record);

    [java] view plain copy
     
    1. public static class Writer implements java.io.Closeable {  
    2.   
    3. ...  
    4.    //初始化Writer  
    5.    void init(Path name, Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, CompressionCodec codec, Metadata metadata) throws IOException {  
    6.       this.conf = conf;  
    7.       this.out = out;  
    8.       this.keyClass = keyClass;  
    9.       this.valClass = valClass;  
    10.       this.compress = compress;  
    11.       this.codec = codec;  
    12.       this.metadata = metadata;  
    13.         
    14.       //创建非压缩的对象序列化器  
    15.       SerializationFactory serializationFactory = new SerializationFactory(conf);  
    16.       this.keySerializer = serializationFactory.getSerializer(keyClass);  
    17.       this.keySerializer.open(buffer);  
    18.       this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);  
    19.       this.uncompressedValSerializer.open(buffer);  
    20.         
    21.       //创建可压缩的对象序列化器  
    22.       if (this.codec != null) {  
    23.         ReflectionUtils.setConf(this.codec, this.conf);  
    24.         this.compressor = CodecPool.getCompressor(this.codec);  
    25.         this.deflateFilter = this.codec.createOutputStream(buffer, compressor);  
    26.         this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));  
    27.         this.compressedValSerializer = serializationFactory.getSerializer(valClass);  
    28.         this.compressedValSerializer.open(deflateOut);  
    29.       }  
    30.     }  
    31.       
    32.   
    33.   //添加一条记录(key-value,对象值需要序列化)  
    34.   public synchronized void append(Object key, Object val) throws IOException {  
    35.       if (key.getClass() != keyClass)  
    36.         throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);  
    37.         
    38.       if (val.getClass() != valClass)  
    39.         throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);  
    40.   
    41.       buffer.reset();  
    42.   
    43.       //序列化key(将key转化为二进制数组),并写入缓存buffer中  
    44.       keySerializer.serialize(key);  
    45.       int keyLength = buffer.getLength();  
    46.       if (keyLength < 0)  
    47.         throw new IOException("negative length keys not allowed: " + key);  
    48.   
    49.       //compress在初始化是被置为false   
    50.       if (compress) {  
    51.         deflateFilter.resetState();  
    52.         compressedValSerializer.serialize(val);  
    53.         deflateOut.flush();  
    54.         deflateFilter.finish();  
    55.       } else {  
    56.         //序列化value值(不压缩),并将其写入缓存buffer中  
    57.         uncompressedValSerializer.serialize(val);  
    58.       }  
    59.   
    60.       //将这条记录写入文件流  
    61.       checkAndWriteSync();                                // sync  
    62.       out.writeInt(buffer.getLength());                   // total record length  
    63.       out.writeInt(keyLength);                            // key portion length  
    64.       out.write(buffer.getData(), 0, buffer.getLength()); // data  
    65.     }  
    66.   
    67.     //添加一条记录(key-value,二进制值)  
    68.     public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {  
    69.       if (keyLength < 0)  
    70.         throw new IOException("negative length keys not allowed: " + keyLength);  
    71.   
    72.       int valLength = val.getSize();  
    73.   
    74.       checkAndWriteSync();  
    75.         
    76.       //直接将key-value写入文件流  
    77.       out.writeInt(keyLength+valLength);          // total record length  
    78.       out.writeInt(keyLength);                    // key portion length  
    79.       out.write(keyData, keyOffset, keyLength);   // key  
    80.       val.writeUncompressedBytes(out);            // value  
    81.     }  
    82.   
    83. ...  
    84.   
    85. }  

    SequenceFile.RecordCompressWriter写入时只压缩key-value对(Record)中的value;

    [java] view plain copy
     
    1. static class RecordCompressWriter extends Writer {  
    2. ...  
    3.   
    4.    public synchronized void append(Object key, Object val) throws IOException {  
    5.       if (key.getClass() != keyClass)  
    6.         throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);  
    7.         
    8.       if (val.getClass() != valClass)  
    9.         throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);  
    10.   
    11.       buffer.reset();  
    12.   
    13.       //序列化key(将key转化为二进制数组),并写入缓存buffer中  
    14.       keySerializer.serialize(key);  
    15.       int keyLength = buffer.getLength();  
    16.       if (keyLength < 0)  
    17.         throw new IOException("negative length keys not allowed: " + key);  
    18.   
    19.       //序列化value值(不压缩),并将其写入缓存buffer中  
    20.       deflateFilter.resetState();  
    21.       compressedValSerializer.serialize(val);  
    22.       deflateOut.flush();  
    23.       deflateFilter.finish();  
    24.   
    25.       //将这条记录写入文件流  
    26.       checkAndWriteSync();                                // sync  
    27.       out.writeInt(buffer.getLength());                   // total record length  
    28.       out.writeInt(keyLength);                            // key portion length  
    29.       out.write(buffer.getData(), 0, buffer.getLength()); // data  
    30.     }  
    31.   
    32.     /** 添加一条记录(key-value,二进制值,value已压缩) */  
    33.     public synchronized void appendRaw(byte[] keyData, int keyOffset,  
    34.         int keyLength, ValueBytes val) throws IOException {  
    35.   
    36.       if (keyLength < 0)  
    37.         throw new IOException("negative length keys not allowed: " + keyLength);  
    38.   
    39.       int valLength = val.getSize();  
    40.         
    41.       checkAndWriteSync();                        // sync  
    42.       out.writeInt(keyLength+valLength);          // total record length  
    43.       out.writeInt(keyLength);                    // key portion length  
    44.       out.write(keyData, keyOffset, keyLength);   // 'key' data  
    45.       val.writeCompressedBytes(out);              // 'value' data  
    46.     }  
    47.       
    48.   } // RecordCompressionWriter  
    49.   
    50.   
    51. ...  
    52. }  

    SequenceFile.BlockCompressWriter 写入时将一批key-value对(Record)压缩成一个Block;

    [java] view plain copy
     
    1. static class BlockCompressWriter extends Writer {  
    2. ...  
    3.   
    4.    void init(int compressionBlockSize) throws IOException {  
    5.       this.compressionBlockSize = compressionBlockSize;  
    6.       keySerializer.close();  
    7.       keySerializer.open(keyBuffer);  
    8.       uncompressedValSerializer.close();  
    9.       uncompressedValSerializer.open(valBuffer);  
    10.     }  
    11.       
    12.     /** Workhorse to check and write out compressed data/lengths */  
    13.     private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException {  
    14.       deflateFilter.resetState();  
    15.       buffer.reset();  
    16.       deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength());  
    17.       deflateOut.flush();  
    18.       deflateFilter.finish();  
    19.         
    20.       WritableUtils.writeVInt(out, buffer.getLength());  
    21.       out.write(buffer.getData(), 0, buffer.getLength());  
    22.     }  
    23.       
    24.     /** Compress and flush contents to dfs */  
    25.     public synchronized void sync() throws IOException {  
    26.       if (noBufferedRecords > 0) {  
    27.         super.sync();  
    28.           
    29.         // No. of records  
    30.         WritableUtils.writeVInt(out, noBufferedRecords);  
    31.           
    32.         // Write 'keys' and lengths  
    33.         writeBuffer(keyLenBuffer);  
    34.         writeBuffer(keyBuffer);  
    35.           
    36.         // Write 'values' and lengths  
    37.         writeBuffer(valLenBuffer);  
    38.         writeBuffer(valBuffer);  
    39.           
    40.         // Flush the file-stream  
    41.         out.flush();  
    42.           
    43.         // Reset internal states  
    44.         keyLenBuffer.reset();  
    45.         keyBuffer.reset();  
    46.         valLenBuffer.reset();  
    47.         valBuffer.reset();  
    48.         noBufferedRecords = 0;  
    49.       }  
    50.         
    51.     }  
    52.   
    53.   
    54.    //添加一条记录(key-value,对象值需要序列化)  
    55.    public synchronized void append(Object key, Object val) throws IOException {  
    56.       if (key.getClass() != keyClass)  
    57.         throw new IOException("wrong key class: "+key+" is not "+keyClass);  
    58.         
    59.       if (val.getClass() != valClass)  
    60.         throw new IOException("wrong value class: "+val+" is not "+valClass);  
    61.   
    62.       //序列化key(将key转化为二进制数组)(未压缩),并写入缓存keyBuffer中  
    63.       int oldKeyLength = keyBuffer.getLength();  
    64.       keySerializer.serialize(key);  
    65.       int keyLength = keyBuffer.getLength() - oldKeyLength;  
    66.       if (keyLength < 0)  
    67.         throw new IOException("negative length keys not allowed: " + key);  
    68.       WritableUtils.writeVInt(keyLenBuffer, keyLength);  
    69.   
    70.       //序列化value(将value转化为二进制数组)(未压缩),并写入缓存valBuffer中  
    71.       int oldValLength = valBuffer.getLength();  
    72.       uncompressedValSerializer.serialize(val);  
    73.       int valLength = valBuffer.getLength() - oldValLength;  
    74.       WritableUtils.writeVInt(valLenBuffer, valLength);  
    75.         
    76.       // Added another key/value pair  
    77.       ++noBufferedRecords;  
    78.         
    79.       // Compress and flush?  
    80.       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();  
    81.       //block已满,可将整个block进行压缩并写入文件流  
    82.       if (currentBlockSize >= compressionBlockSize) {  
    83.         sync();  
    84.       }  
    85.     }  
    86.       
    87.     /**添加一条记录(key-value,二进制值,value已压缩). */  
    88.     public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {  
    89.         
    90.       if (keyLength < 0)  
    91.         throw new IOException("negative length keys not allowed");  
    92.   
    93.       int valLength = val.getSize();  
    94.         
    95.       // Save key/value data in relevant buffers  
    96.       WritableUtils.writeVInt(keyLenBuffer, keyLength);  
    97.       keyBuffer.write(keyData, keyOffset, keyLength);  
    98.       WritableUtils.writeVInt(valLenBuffer, valLength);  
    99.       val.writeUncompressedBytes(valBuffer);  
    100.   
    101.       // Added another key/value pair  
    102.       ++noBufferedRecords;  
    103.   
    104.       // Compress and flush?  
    105.       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();   
    106.       if (currentBlockSize >= compressionBlockSize) {  
    107.         sync();  
    108.       }  
    109.     }  
    110.       
    111.   } // RecordCompressionWriter  
    112.   
    113.   
    114. ...  
    115. }  

         源码中,block的大小compressionBlockSize默认值为1000000,也可通过配置参数io.seqfile.compress.blocksize来指定。

       根据三种压缩算法,共有三种类型的SequenceFile文件格式:

    1). Uncompressed SequenceFile

        

    2). Record-Compressed SequenceFile

    3). Block-Compressed SequenceFile

  • 相关阅读:
    动态为datagrid增加控件
    对DataGrid中的ListBox设置值
    Enhancing Inserting Data through the DataGrid Footer, Part 2
    Setting Focus to a TextBox in an EditItemTemplate
    Enhancing Inserting Data through the DataGrid Footer
    CRM选择行变色
    【转】on delete cascade
    VS如何连接2种类型的数据库
    SQL Server 已配置为允许远程连接 解决办法
    【转】sql server 重命名表字段
  • 原文地址:https://www.cnblogs.com/mfryf/p/7072446.html
Copyright © 2011-2022 走看看