zoukankan      html  css  js  c++  java
  • Hadoop: the definitive guide 第三版 拾遗 第四章

    第四章中提到了通过CompressionCodec对streams进行压缩和解压缩,并提供了示例程序:

    输入:标准输入流

    输出:压缩后的标准输出流

    // cc StreamCompressor A program to compress data read from standard input and write it to standard output
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;
    
    // vv StreamCompressor
    public class StreamCompressor {
    
      public static void main(String[] args) throws Exception {
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec)
          ReflectionUtils.newInstance(codecClass, conf);
        
        CompressionOutputStream out = codec.createOutputStream(System.out);
        IOUtils.copyBytes(System.in, out, 4096, false);
        out.finish();
      }
    }
    // ^^ StreamCompressor
    

    该实例程序通过CompressionCodec的createOutputStream(OutputStream out)方法获得CompressionOutputStream对象。

    第12行因参数固定,可以直接写成String codecClassname="org.apache.hadoop.io.compress.GzipCodec";

    即从命令行接受一个CompressionCodec实现类的参数,然后通过ReflectionUtils把实例化这个类,调用CompressionCodec的接口方法对标准输出流进行封装,封装成一个压缩流,通过IOUtils类的copyBytes方法把标准输入流拷贝到压缩流中,最后调用CompressionCodec的finish方法,完成压缩。

    在hadoop集群的hadoop根目录下使用如下命令验证该程序(通过linux的gunzip完成解压缩):

    echo "Hello world" |  hadoop jar xxxx.jar com.tht.hadoopIO.StreamCompressor  org.apache.hadoop.io.compress.GzipCodec | gunzip -

    下面对改程序做进一步修改:

    一、更改输出路径:即标准输入流压缩后的存放位置。

    输入:标准输入流

    输出:压缩后的文件存放到HDFS上

    示例代码如下:

    package com.tht.hadoopIO;
    
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;
    
    //vv StreamCompressor
    public class StreamCompressor {
    
    	public static void main(String[] args) throws Exception {
    		String codecClassname = "org.apache.hadoop.io.compress.GzipCodec";
    		String outputUri = "hdfs://master:9000/in/test.gz";
    		
    		Class<?> codecClass = Class.forName(codecClassname);
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(URI.create(outputUri), conf);
    		
    		CompressionCodec codec = (CompressionCodec) ReflectionUtils
    				.newInstance(codecClass, conf);	
    		CompressionOutputStream out = codec.createOutputStream(fs.create(new Path(outputUri)));
    		IOUtils.copyBytes(System.in, out, 4096, false);
    		out.finish();
    	}
    }
    // ^^ StreamCompressor


    当然,在此路径outputUri:,,,/test.gz是指压缩后的文件存放位置和文件名及扩展名,如果改为...../test.txt.gz则指以.txt格式的压缩文件,后缀名是gz。

    二、更改输入文件,即将输入文件路径定为HDFS上的文件。

    输入:HDFS上存放文件

    输出:压缩后的文件存放至HDFS上

    示例代码如下:

    import java.io.InputStream;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;
    
    //vv StreamCompressor
    public class StreamCompressor {
    
    	public static void main(String[] args) throws Exception {
    		String codecClassname = "org.apache.hadoop.io.compress.GzipCodec";
    		String uri = "hdfs://master:9000/in/test.txt";
    		String outputUri = "hdfs://master:9000/in/test.txt.gz";
    		
    		Class<?> codecClass = Class.forName(codecClassname);
    		Configuration conf = new Configuration();
    		FileSystem fs1 = FileSystem.get(URI.create(uri), conf);
    		FileSystem fs2 = FileSystem.get(URI.create(outputUri), conf);
    		
    		CompressionCodec codec = (CompressionCodec) ReflectionUtils
    				.newInstance(codecClass, conf);
    
    		InputStream in =fs1.open(new Path(uri));	
    		CompressionOutputStream out = codec.createOutputStream(fs2.create(new Path(outputUri)));
    		
    		IOUtils.copyBytes(in, out, 4096, false);
    		in.close();
    		out.close();
    	}
    }
    // ^^ StreamCompressor


    当然了,输入输出都可以以参数形式存在。即String uri = arg[0];String outputUri =arg[1];则在执行时须加入两个路径参数。




  • 相关阅读:
    PDA智能程序访问WebService,报告“未能建立与网络的连接”
    VS2008中开发智能设备程序的一些总结收藏
    Error: The INF file contains Unicode characters that could not be converted correctly
    在vs2008工程中制作cab包
    linux专题三之如何悄悄破解root密码(以redhat7.2x64为例)
    linux专题一之文件描述符、重定向、管道符、tee命令
    linux的计划
    如何安装RHEL7.2x64 即红帽7.2虚拟机?
    快速排序及查找第K个大的数。
    来来来,做道题,一起防老年痴呆
  • 原文地址:https://www.cnblogs.com/javawebsoa/p/3249300.html
Copyright © 2011-2022 走看看