zoukankan      html  css  js  c++  java
  • 【hadoop】——HDFS解压缩实现

    转载请注明出处:http://www.cnblogs.com/zhengrunjian/p/4527220.html 

    所有源码在github上,https://github.com/lastsweetop/styhadoop

    1简介

    codec其实就是coder和decoder两个单词的词头组成的缩略词。CompressionCodec定义了压缩和解压接口,我们这里讲的codec就是实现了CompressionCodec接口的一些压缩格式的类,下面是这些类的列表:

    2使用CompressionCodes解压缩

    CompressionCodec有两个方法可以方便的压缩和解压。
    压缩:通过createOutputStream(OutputStream out)方法获得CompressionOutputStream对象
    解压:通过createInputStream(InputStream in)方法获得CompressionInputStream对象
    压缩的示例代码
    [java] view plaincopy
     
    1. package com.sweetop.styhadoop;  
    2.   
    3. import org.apache.hadoop.conf.Configuration;  
    4. import org.apache.hadoop.io.IOUtils;  
    5. import org.apache.hadoop.io.compress.CompressionCodec;  
    6. import org.apache.hadoop.io.compress.CompressionOutputStream;  
    7. import org.apache.hadoop.util.ReflectionUtils;  
    8.   
    9. /** 
    10.  * Created with IntelliJ IDEA. 
    11.  * User: lastsweetop 
    12.  * Date: 13-6-25 
    13.  * Time: 下午10:09 
    14.  * To change this template use File | Settings | File Templates. 
    15.  */  
    16. public class StreamCompressor {  
    17.     public static void main(String[] args) throws Exception {  
    18.         String codecClassName = args[0];  
    19.         Class<?> codecClass = Class.forName(codecClassName);  
    20.         Configuration conf = new Configuration();  
    21.         CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  
    22.   
    23.         CompressionOutputStream out = codec.createOutputStream(System.out);  
    24.         IOUtils.copyBytes(System.in, out, 4096, false);  
    25.         out.finish();  
    26.     }  
    27. }  
    从命令行接受一个CompressionCodec实现类的参数,然后通过ReflectionUtils把实例化这个类,调用CompressionCodec的接口方法对标准输出流进行封装,封装成一个压缩流,通过IOUtils类的copyBytes方法把标准输入流拷贝到压缩流中,最后调用CompressionCodec的finish方法,完成压缩。
    再来看下命令行:
    [plain] view plaincopy
     
    1. echo "Hello lastsweetop" | ~/hadoop/bin/hadoop com.sweetop.styhadoop.StreamCompressor  org.apache.hadoop.io.compress.GzipCodec | gunzip -  
    使用GzipCodec类来压缩“Hello lastsweetop”,然后再通过gunzip工具解压。
    我们来看一下输出:
    [plain] view plaincopy
     
    1. [exec] 13/06/26 20:01:53 INFO util.NativeCodeLoader: Loaded the native-hadoop library  
    2.     [exec] 13/06/26 20:01:53 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library  
    3.     [exec] Hello lastsweetop  

    3使用CompressionCodecFactory解压缩

    如果你想读取一个被压缩的文件的话,首先你得先通过扩展名判断该用哪种codec,可以看下【hadoop】——压缩工具比较中得对应关系。
    当然有更简便得办法,CompressionCodecFactory已经帮你把这件事做了,通过传入一个Path调用它得getCodec方法,即可获得相应得codec。我们来看下代码
    [java] view plaincopy
     
    1. package com.sweetop.styhadoop;  
    2.   
    3. import org.apache.hadoop.conf.Configuration;  
    4. import org.apache.hadoop.fs.FileSystem;  
    5. import org.apache.hadoop.fs.Path;  
    6. import org.apache.hadoop.io.IOUtils;  
    7. import org.apache.hadoop.io.compress.CompressionCodec;  
    8. import org.apache.hadoop.io.compress.CompressionCodecFactory;  
    9.   
    10. import java.io.IOException;  
    11. import java.io.InputStream;  
    12. import java.io.OutputStream;  
    13. import java.net.URI;  
    14.   
    15. /** 
    16.  * Created with IntelliJ IDEA. 
    17.  * User: lastsweetop 
    18.  * Date: 13-6-26 
    19.  * Time: 下午10:03 
    20.  * To change this template use File | Settings | File Templates. 
    21.  */  
    22. public class FileDecompressor {  
    23.     public static void main(String[] args) throws Exception {  
    24.         String uri = args[0];  
    25.         Configuration conf = new Configuration();  
    26.         FileSystem fs = FileSystem.get(URI.create(uri), conf);  
    27.   
    28.         Path inputPath = new Path(uri);  
    29.         CompressionCodecFactory factory = new CompressionCodecFactory(conf);  
    30.         CompressionCodec codec = factory.getCodec(inputPath);  
    31.         if (codec == null) {  
    32.             System.out.println("No codec found for " + uri);  
    33.             System.exit(1);  
    34.         }  
    35.         String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());  
    36.   
    37.         InputStream in = null;  
    38.         OutputStream out = null;  
    39.   
    40.         try {  
    41.             in = codec.createInputStream(fs.open(inputPath));  
    42.             out = fs.create(new Path(outputUri));  
    43.             IOUtils.copyBytes(in,out,conf);  
    44.         } finally {  
    45.             IOUtils.closeStream(in);  
    46.             IOUtils.closeStream(out);  
    47.         }  
    48.     }  
    49. }  

    注意看下removeSuffix方法,这是一个静态方法,它可以将文件的后缀去掉,然后我们将这个路径做为解压的输出路径。CompressionCodecFactory能找到的codec也是有限的,默认只有三种org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DefaultCodec,如果想添加其他的codec你需要更改io.compression.codecs属性,并注册codec。
     

    4原生库

    现在越来越多原生库的概念,hdfs的codec也不例外,原生库可以极大的提升性能比如gzip的原生库解压提高50%,压缩提高10%,但不是所有codec都有原生库的,而一些codec只有原生库。我们来看下列表:
    linux下,hadoop以前提前编译好了32位的原生库和64位的原生库,我们看下:
    [plain] view plaincopy
     
    1. [hadoop@namenode native]$pwd  
    2. /home/hadoop/hadoop/lib/native  
    3. [hadoop@namenode native]$ls -ls  
    4. total 8  
    5. 4 drwxrwxrwx 2 root root 4096 Nov 14  2012 Linux-amd64-64  
    6. 4 drwxrwxrwx 2 root root 4096 Nov 14  2012 Linux-i386-32  
    如果是其他平台的话,你就需要自己编译了,详细步骤请看这里http://wiki.apache.org/hadoop/NativeHadoop
    java原生库的路径可以通过java.library.path指定,在bin目录下,hadoop的启动脚本已经指定,如果你不用这个脚本,那么你就需要在你的程序中指定了,hadoop脚本中指定原生库路径的片段:
    [plain] view plaincopy
     
    1. if [ -d "${HADOOP_HOME}/build/native" -o -d "${HADOOP_HOME}/lib/native" -o -e "${HADOOP_PREFIX}/lib/libhadoop.a" ]; then  
    2.   
    3.   if [ -d "$HADOOP_HOME/build/native" ]; then  
    4.     JAVA_LIBRARY_PATH=${HADOOP_HOME}/build/native/${JAVA_PLATFORM}/lib  
    5.   fi  
    6.   
    7.   if [ -d "${HADOOP_HOME}/lib/native" ]; then  
    8.     if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then  
    9.       JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_HOME}/lib/native/${JAVA_PLATFORM}  
    10.     else  
    11.       JAVA_LIBRARY_PATH=${HADOOP_HOME}/lib/native/${JAVA_PLATFORM}  
    12.     fi  
    13.   fi  
    14.   
    15.   if [ -e "${HADOOP_PREFIX}/lib/libhadoop.a" ]; then  
    16.     JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/lib  
    17.   fi  
    18. fi  
    hadoop会去查找对应的原生库,并且自动加载,你不需要关心这些设置。但某些时候你不想使用原生库,比如调试一些bug的时候,那么可以通过hadoop.native.lib设置为false来实现。
    如果你用原生库做大量的压缩和解压的话可以考虑用CodecPool,有点像连接池,这样你就无需频繁的去创建codec对象。
    [java] view plaincopy
     
    1. package com.sweetop.styhadoop;  
    2.   
    3. import org.apache.hadoop.conf.Configuration;  
    4. import org.apache.hadoop.io.IOUtils;  
    5. import org.apache.hadoop.io.compress.CodecPool;  
    6. import org.apache.hadoop.io.compress.CompressionCodec;  
    7. import org.apache.hadoop.io.compress.CompressionOutputStream;  
    8. import org.apache.hadoop.io.compress.Compressor;  
    9. import org.apache.hadoop.util.ReflectionUtils;  
    10.   
    11. /** 
    12.  * Created with IntelliJ IDEA. 
    13.  * User: lastsweetop 
    14.  * Date: 13-6-27 
    15.  * Time: 上午11:53 
    16.  * To change this template use File | Settings | File Templates. 
    17.  */  
    18. public class PooledStreamCompressor {  
    19.     public static void main(String[] args) throws Exception {  
    20.         String codecClassName = args[0];  
    21.         Class<?> codecClass = Class.forName(codecClassName);  
    22.         Configuration conf = new Configuration();  
    23.         CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  
    24.         Compressor compressor = null;  
    25.         try {  
    26.             compressor = CodecPool.getCompressor(codec);  
    27.             CompressionOutputStream out = codec.createOutputStream(System.out, compressor);  
    28.             IOUtils.copyBytes(System.in, out, 4096, false);  
    29.             out.finish();  
    30.         } finally {  
    31.             CodecPool.returnCompressor(compressor);  
    32.         }  
    33.     }  
    34. }  
    代码比较容易理解,通过CodecPool的getCompressor方法获得Compressor对象,该方法需要传入一个codec,然后Compressor对象在createOutputStream中使用,使用完毕后再通过returnCompressor放回去。
    输出结果如下:
    [plain] view plaincopy
     
    1.     [exec] 13/06/27 12:00:06 INFO util.NativeCodeLoader: Loaded the native-hadoop library  
    2.     [exec] 13/06/27 12:00:06 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library  
    3.     [exec] 13/06/27 12:00:06 INFO compress.CodecPool: Got brand-new compressor  
    4.     [exec] Hello lastsweetop    

      转载请注明出处:http://blog.csdn.net/lastsweetop/article/details/9173061

  • 相关阅读:
    appium---模拟点击事件
    python发送邮件(smtplib)
    postman---postman提示 Could not get any response
    postman---postman导出python脚本
    postman---postman生成测试报告
    python读写Excel方法(xlwt和xlrd)
    java求两个集合的交集和并集,比较器
    集合类型操作
    Random类和Math.random()方法
    final修饰和StringBuffer的几个案例(拼接,反转,对称操作)
  • 原文地址:https://www.cnblogs.com/zhengrunjian/p/4527220.html
Copyright © 2011-2022 走看看