  • hadoop2.x IO: MapReduce Compression

    In an earlier post we covered Hadoop's compression codecs. The data Hadoop works on is usually very large: both the input and the output of a job can be huge, so it is well worth compressing the map and reduce output.

    If we want to compress the reduce output, there are two ways: one is to set the relevant properties on a Configuration object; the other is to configure the output through the FileOutputFormat class.

    1. Compressing the reduce output (using Configuration)

    With Configuration, we set mapred.output.compress to true and set mapred.output.compression.codec to the class name of the codec we want. For example:
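
    Note that these mapred.* properties are the old names; Hadoop 2.x still honors them but marks them deprecated. A minimal equivalent sketch using the Hadoop 2.x property names:

    // Hadoop 2.x names for the same two settings
    conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
    conf.set("mapreduce.output.fileoutputformat.compress.codec", GzipCodec.class.getName());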

    The job class: MaxTemperatureWithCompression.java

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class MaxTemperatureWithCompression {
    	
    	public static void main(String[] args) throws Exception {
    		if (args.length != 2) {
    			System.err.println("Usage: MaxTemperature <input path> <output path>");
    			System.exit(-1);
    		}
    		Configuration conf = new Configuration();
    		// The key lines: enable output compression and choose the codec
    		conf.setBoolean("mapred.output.compress", true);
    		conf.set("mapred.output.compression.codec", GzipCodec.class.getName());
    		conf.set("mapred.jar", "MaxTemperature.jar");
    		Job job = Job.getInstance(conf);
    		
    		job.setJarByClass(MaxTemperatureWithCompression.class);
    		job.setJobName("Max temperature");
    		
    		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		
    		job.setMapperClass(MaxTemperatureMapper.class);
    		job.setReducerClass(MaxTemperatureReducer.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    		
    	}
    
    }
    

    The mapper: MaxTemperatureMapper.java

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class MaxTemperatureMapper extends
    		Mapper<LongWritable, Text, Text, IntWritable> {
    	private static final int MISSING = 9999;
    
    	@Override
    	public void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    		String line = value.toString();
    		String year = line.substring(15, 23);
    		int airTemperature;
    		if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
    			airTemperature = Integer.parseInt(line.substring(88, 92));
    		} else {
    			airTemperature = Integer.parseInt(line.substring(87, 92));
    		}
    		String quality = line.substring(92, 93);
    		if (airTemperature != MISSING && quality.matches("[01459]")) {
    			context.write(new Text(year), new IntWritable(airTemperature));
    		}
    	}
    }
    
    

    The reducer: MaxTemperatureReducer.java

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MaxTemperatureReducer extends
    		Reducer<Text, IntWritable, Text, IntWritable> {
    	@Override
    	public void reduce(Text key, Iterable<IntWritable> values, Context context)
    			throws IOException, InterruptedException {
    		int maxValue = Integer.MIN_VALUE;
    		for (IntWritable value : values) {
    			maxValue = Math.max(maxValue, value.get());
    		}
    		context.write(key, new IntWritable(maxValue));
    	}
    }
    
    

    Compile, package, and run:
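
    The exact build commands were elided in the original; a typical sequence (a sketch, assuming the Hadoop jars are picked up via the hadoop classpath command) would be:

    [grid@tiny01 myclass]$ javac -classpath `hadoop classpath` MaxTemperature*.java
    [grid@tiny01 myclass]$ jar cf MaxTemperature.jar MaxTemperature*.class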

    [grid@tiny01 myclass]$ hadoop fs -ls /
    Found 5 items
    -rw-r--r--   1 grid supergroup      49252 2017-07-29 00:07 /data.txt
    -rw-r--r--   1 grid supergroup 4848295796 2017-07-01 00:40 /input
    drwx------   - grid supergroup          0 2017-07-01 00:42 /tmp
    drwxr-xr-x   - grid supergroup          0 2017-07-01 00:42 /user
    [grid@tiny01 myclass]$ hadoop jar MaxTemperature.jar MaxTemperatureWithCompression /data.txt /out
    [grid@tiny01 myclass]$ hadoop fs -cat /out/part-r-00000.gz |gunzip
    20160622        380
    20160623        310
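
    (Tip: hadoop fs -text understands the configured compression codecs, so it can print the compressed output directly, without piping through gunzip.)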
    

    Properties that control reduce output compression:

    Property name                        Type     Default                                      Description
    mapred.output.compress               boolean  false                                        Whether to compress the job output
    mapred.output.compression.codec      String   org.apache.hadoop.io.compress.DefaultCodec  The codec used for the reduce output
    mapred.output.compression.type       String   RECORD                                       For SequenceFile output, the compression type: NONE, RECORD, or BLOCK
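
    The last property only matters when the job writes SequenceFiles. A minimal sketch of setting it through the SequenceFileOutputFormat helper (BLOCK compression chosen purely for illustration):

    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    // Only takes effect when the output format is SequenceFileOutputFormat
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);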

    2. Compressing the reduce output (using FileOutputFormat)
    We only need to modify the job class:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MaxTemperatureWithCompression2 {
    	
    	public static void main(String[] args) throws Exception {
    		if (args.length != 2) {
    			System.err.println("Usage: MaxTemperature <input path> <output path>");
    			System.exit(-1);
    		}
    		Configuration conf = new Configuration();
    		conf.set("mapred.jar", "MaxTemperature2.jar");
    		Job job = Job.getInstance(conf);
    		
    		job.setJarByClass(MaxTemperatureWithCompression2.class);
    		job.setJobName("Max temperature");
    		
    		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		// Add these two lines: enable output compression and set the codec
    		FileOutputFormat.setCompressOutput(job, true);
    		FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    		
    		job.setMapperClass(MaxTemperatureMapper.class);
    		job.setReducerClass(MaxTemperatureReducer.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    		
    	}
    
    }
    

    Run:

    [grid@tiny01 myclass]$ hadoop jar MaxTemperature2.jar MaxTemperatureWithCompression2 /data.txt /out2
    [grid@tiny01 myclass]$ hadoop fs -cat /out2/part-r-00000.gz |gunzip
    20160622        380
    20160623        310
    

    The output is identical.

    3. Compressing the map output

    Map and reduce tasks often run on different nodes, so the map output has to travel across the network. Compressing it with a fast codec such as LZO or LZ4 can therefore improve overall job performance. The map output compression properties are listed below; a Configuration-based sketch follows the table.

    Property name                        Type     Default                                      Description
    mapred.compress.map.output           boolean  false                                        Whether to compress the map output
    mapred.map.output.compression.codec  String   org.apache.hadoop.io.compress.DefaultCodec  The codec used for the map output
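
    As with the reduce output, these can be set directly on the Configuration. A minimal sketch (Lz4Codec is chosen only for illustration; it ships with Hadoop 2.x):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.Lz4Codec;

    Configuration conf = new Configuration();
    conf.setBoolean("mapred.compress.map.output", true);
    conf.set("mapred.map.output.compression.codec", Lz4Codec.class.getName());
    // Hadoop 2.x deprecates these names in favor of
    // mapreduce.map.output.compress and mapreduce.map.output.compress.codec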

    We can also use a JobConf object (a subclass of Configuration from the old org.apache.hadoop.mapred API), whose convenience methods set the same properties:

    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    
    JobConf conf = new JobConf();
    conf.setCompressMapOutput(true);                   // mapred.compress.map.output = true
    conf.setMapOutputCompressorClass(GzipCodec.class); // mapred.map.output.compression.codec
    conf.set("mapred.jar", "classname.jar");
    Job job = Job.getInstance(conf);
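
    Mixing the old and new APIs like this works because JobConf extends Configuration and Job.getInstance accepts any Configuration; the JobConf setters are just shorthand for the mapred.* properties in the table above.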
    

    4. References
    [1] Tom White. Hadoop: The Definitive Guide, Third Edition. Copyright 2013 Tom White. ISBN 978-1-449-31152-0.
