  • MapReduce Sorting (Total Sort and Secondary Sort)

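    Sorting

    Sorting is at the heart of MapReduce.

    1. Preparation

    Example: sort the weather dataset by its temperature field. Temperatures are signed integers, so the field cannot be treated as a Text object and sorted lexicographically. Instead, the data is stored in a sequence file whose IntWritable key holds the temperature (and therefore sorts correctly) and whose Text value holds the rest of the record (in the code below, the city name).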
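    To see why signed integers cannot be sorted as text, here is a minimal plain-Java sketch that needs no Hadoop. The class name TemperatureOrderDemo and the sample readings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; not part of the original job.
class TemperatureOrderDemo {

    // Sorts the readings as strings, i.e. in lexicographic (dictionary) order.
    static List<String> sortAsText(List<Integer> temps) {
        List<String> out = new ArrayList<>();
        for (int t : temps) {
            out.add(Integer.toString(t));
        }
        Collections.sort(out);
        return out;
    }

    // Sorts the readings as signed integers.
    static List<Integer> sortAsInt(List<Integer> temps) {
        List<Integer> out = new ArrayList<>(temps);
        Collections.sort(out);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> temps = Arrays.asList(9, -5, 10, -3, -12);
        System.out.println(sortAsText(temps)); // prints [-12, -3, -5, 10, 9]
        System.out.println(sortAsInt(temps));  // prints [-12, -5, -3, 9, 10]
    }
}
```

    In text order, "-3" comes before "-5" and "10" before "9", which is why the job keys its records on IntWritable rather than Text.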
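    The MapReduce job is map-only: it filters the input and drops records that have no valid temperature reading. Each map task creates and writes one block-compressed sequence file.
    The code is as follows: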

    package com.zhen.mapreduce.sort.preprocessor;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * @author FengZhen
     * @date 2018-09-09
     * Filters out unusable records and stores the rest in a sequence file.
     */
    public class SortDataPreprocessor extends Configured implements Tool{
    
    	static class CleanerMapper extends Mapper<LongWritable, Text, IntWritable, Text>{
    		private RecordParser recordParser = new RecordParser();
    		@Override
    		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
    				throws IOException, InterruptedException {
    			recordParser.parse(value.toString());
    			if (recordParser.isValidTemperature()) {
    				context.write(new IntWritable(recordParser.getTemperature()), new Text(recordParser.getCity()));
    			}
    		}
    	}
    	
    	public int run(String[] args) throws Exception {
    		
    		Job job = Job.getInstance(getConf());
    		job.setJobName("SortDataPreprocessor");
    		job.setJarByClass(SortDataPreprocessor.class);
    		
    		job.setMapperClass(CleanerMapper.class);
    		job.setOutputKeyClass(IntWritable.class);
    		job.setOutputValueClass(Text.class);
    		job.setNumReduceTasks(0);
    		job.setOutputFormatClass(SequenceFileOutputFormat.class);
    		
    		FileInputFormat.setInputPaths(job, new Path(args[0]));
    		
    		SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
    		// Compress the output: gzip codec, block-level compression
    		SequenceFileOutputFormat.setCompressOutput(job, true);
    		SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    		SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    	
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    	public static void main(String[] args) throws Exception {
    		String[] params = new String[]{"hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/input","hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/output"};
    		int exitCode = ToolRunner.run(new SortDataPreprocessor(), params);
    		System.exit(exitCode);
    	}
    	
    }
    

      

    package com.zhen.mapreduce.sort.preprocessor;
    
    import java.io.Serializable;
    
    /**
     * @author FengZhen
     * @date 2018-09-09
     * Parses the input records handed to the mapper.
     */
    public class RecordParser implements Serializable{
    
    	private static final long serialVersionUID = 1L;
    
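    	/**
    	 * City name
    	 */
    	private String city;
    	/**
    	 * Temperature; null when the last parsed line had no valid reading
    	 */
    	private Integer temperature;
    	
    	/**
    	 * Parses one comma-separated line of the form "city,temperature".
    	 * @param value the input line
    	 */
    	public void parse(String value) {
    		// Reset first so a malformed line never reuses the previous record's values
    		city = null;
    		temperature = null;
    		String[] values = value.split(",");
    		if (values.length >= 2) {
    			try {
    				temperature = Integer.valueOf(values[1].trim());
    				city = values[0];
    			} catch (NumberFormatException e) {
    				// Leave temperature null so isValidTemperature() rejects this line
    			}
    		}
    	}
    	
    	/**
    	 * Whether the last parsed line contained a valid temperature
    	 * @return true if a temperature was parsed
    	 */
    	public boolean isValidTemperature() {
    		return null != temperature;
    	}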
    	
    	
    	public String getCity() {
    		return city;
    	}
    	public void setCity(String city) {
    		this.city = city;
    	}
    	public int getTemperature() {
    		return temperature;
    	}
    	public void setTemperature(Integer temperature) {
    		this.temperature = temperature;
    	}
    	
    }
    

      

    Package the classes into a jar, upload it to the server, and run it:

    scp /Users/FengZhen/Desktop/Hadoop/file/Sort.jar root@192.168.1.124:/usr/local/test/mr
    hadoop jar Sort.jar com.zhen.mapreduce.sort.preprocessor.SortDataPreprocessor

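    2. Partial sort

    With several reduce tasks, the job produces several output files, each sorted internally. Combining those files into a single globally sorted file, however, is not straightforward.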
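    The difficulty can be seen in a small plain-Java simulation. The sketch below is hypothetical and uses no Hadoop classes; it only mirrors the formula of Hadoop's default HashPartitioner, (hashCode & Integer.MAX_VALUE) % numReduceTasks, on int keys and then sorts each partition separately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation of hash partitioning plus a per-reducer sort.
class PartialSortDemo {

    // Assigns each key to a partition by hash, then sorts every partition.
    static List<List<Integer>> partitionAndSort(List<Integer> keys, int numReducers) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            parts.add(new ArrayList<>());
        }
        for (int k : keys) {
            int p = (Integer.hashCode(k) & Integer.MAX_VALUE) % numReducers;
            parts.get(p).add(k);
        }
        for (List<Integer> part : parts) {
            Collections.sort(part);
        }
        return parts;
    }

    // Joins the partitions in partition order, like concatenating part files.
    static List<Integer> concatenate(List<List<Integer>> parts) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> part : parts) {
            all.addAll(part);
        }
        return all;
    }

    static boolean isSorted(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(14, 12, 19, 11, 18, 17, 24, 23, 22, 35, 44, 43);
        List<List<Integer>> parts = partitionAndSort(keys, 3);
        System.out.println(parts);                        // each inner list is sorted
        System.out.println(isSorted(concatenate(parts))); // prints false
    }
}
```

    Each partition is sorted, but because keys are spread by hash rather than by range, the concatenation is not.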

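    3. Total sort

    How can Hadoop produce a globally sorted file? The simplest way is to use a single partition. That is extremely inefficient for large files, though, because one machine has to process all of the output, which throws away the parallelism MapReduce provides.
    There is an alternative: first produce a set of sorted files, then concatenate them, and the result is a globally sorted file. The key idea is to use a partitioner that respects the total order of the output.
    Example: sorting by temperature.
    Take a partitioner with four partitions: temperatures 0-10 go to the first, 11-20 to the second, 21-30 to the third, and 31-40 to the fourth.
    This guarantees that every key in partition i is smaller than every key in partition i+1, so the concatenated output is totally ordered. The data may be spread unevenly across the partitions, however.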
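    A fixed-range partitioner of this kind can be sketched in plain Java as follows. The class RangePartitionDemo is hypothetical, and sending readings above 30 (including any above 40) to the last partition is an assumption of the sketch; the temperatures are the ones from the sample data later in this article.

```java
import java.util.Arrays;

// Hypothetical fixed-range partitioner; not a Hadoop Partitioner subclass.
class RangePartitionDemo {

    // Inclusive upper bounds of the first three partitions; the fourth takes the rest.
    static final int[] UPPER_BOUNDS = {10, 20, 30};

    // Returns the index of the partition whose range contains the temperature.
    static int getPartition(int temperature) {
        for (int i = 0; i < UPPER_BOUNDS.length; i++) {
            if (temperature <= UPPER_BOUNDS[i]) {
                return i;
            }
        }
        return UPPER_BOUNDS.length;
    }

    // Counts how many readings land in each partition.
    static int[] partitionSizes(int[] temps) {
        int[] sizes = new int[UPPER_BOUNDS.length + 1];
        for (int t : temps) {
            sizes[getPartition(t)]++;
        }
        return sizes;
    }

    public static void main(String[] args) {
        int[] temps = {14, 12, 19, 11, 18, 17, 24, 23, 22, 35, 44, 43};
        System.out.println(Arrays.toString(partitionSizes(temps))); // prints [0, 6, 3, 3]
    }
}
```

    The partition index grows with the key, so concatenating the sorted partitions in order gives a total order, but here the first partition stays empty while the second receives half of the records.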
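    Knowing the temperature distribution would make it possible to build a set of very evenly sized partitions, but computing it exactly requires a pass over the entire dataset, which is impractical. Sampling the key space gives a fairly even split instead. The idea is to look at only a small subset of the keys, estimate the key distribution from it, and build the partitions from that estimate. Hadoop ships with several samplers.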
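    The sampling idea can be illustrated without Hadoop. The sketch below is a simplified stand-in and not how InputSampler is implemented: it draws a random sample with replacement, sorts it, and takes evenly spaced sample values as split points. All names (SampledSplitDemo, splitPoints, getPartition) and the seed parameter are made up for the example.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical sketch of choosing partition boundaries from a key sample.
class SampledSplitDemo {

    // Picks numPartitions - 1 split points from a random sample of the keys.
    static int[] splitPoints(int[] keys, int sampleSize, int numPartitions, long seed) {
        Random rnd = new Random(seed);
        int[] sample = new int[sampleSize];
        for (int i = 0; i < sampleSize; i++) {
            sample[i] = keys[rnd.nextInt(keys.length)]; // sampling with replacement
        }
        Arrays.sort(sample);
        int[] splits = new int[numPartitions - 1];
        for (int i = 1; i < numPartitions; i++) {
            // Evenly spaced positions in the sorted sample approximate the quantiles
            splits[i - 1] = sample[i * sampleSize / numPartitions];
        }
        return splits;
    }

    // The partition index is the number of split points that are <= key.
    static int getPartition(int key, int[] splits) {
        int p = 0;
        while (p < splits.length && key >= splits[p]) {
            p++;
        }
        return p;
    }

    public static void main(String[] args) {
        int[] keys = {14, 12, 19, 11, 18, 17, 24, 23, 22, 35, 44, 43};
        int[] splits = splitPoints(keys, 8, 4, 42L);
        System.out.println(Arrays.toString(splits));
        for (int k : keys) {
            System.out.println(k + " -> partition " + getPartition(k, splits));
        }
    }
}
```

    Because the split points follow the sampled distribution rather than fixed ranges, dense regions of the key space get narrower partitions, which evens out the load.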
    InputSampler defines a nested Sampler interface. Its only method, getSample, takes two arguments (an InputFormat and a Job) and returns an array of sampled keys.
    The code is as follows:

    package com.zhen.mapreduce.sort.totalPartitioner;
    
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * @author FengZhen
     * @date 2018-09-09
     * Total sort using range partitions
     */
    public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool{
    
    	public int run(String[] args) throws Exception {
    		Job job = Job.getInstance(getConf());
    		job.setJobName("SortByTemperatureUsingTotalOrderPartitioner");
    		job.setJarByClass(SortByTemperatureUsingTotalOrderPartitioner.class);
    		
    		job.setInputFormatClass(SequenceFileInputFormat.class);
    		job.setOutputFormatClass(SequenceFileOutputFormat.class);
    		
    		job.setOutputKeyClass(IntWritable.class);
    		job.setOutputValueClass(Text.class);
    		
    		SequenceFileInputFormat.setInputPaths(job, new Path(args[0]));
    		SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
    		// Compress the output: gzip codec, block-level compression
    		SequenceFileOutputFormat.setCompressOutput(job, true);
    		SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    		SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    		
    		job.setPartitionerClass(TotalOrderPartitioner.class);
    		/**
    		 * Sampling frequency: 0.1
    		 * Maximum number of samples: 10000
    		 * Maximum number of input splits to sample: 10
    		 * These are also the defaults when InputSampler is run as an application.
    		 * Sampling stops as soon as any one of the limits is reached.
    		 */
    		InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
    		InputSampler.writePartitionFile(job, sampler);
    		
    		// Add the partition file written by InputSampler to the distributed cache
    		// so that the tasks running on the cluster can share it.
    		Configuration conf = job.getConfiguration();
    		String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
    		URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
    		job.addCacheFile(partitionUri);
    		job.createSymlink();
    		
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    	public static void main(String[] args) throws Exception {
    		String[] params = new String[]{"hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/output","hdfs://fz/user/hdfs/MapReduce/data/sort/SortByTemperatureUsingTotalOrderPartitioner/output"};
    		int exitCode = ToolRunner.run(new SortByTemperatureUsingTotalOrderPartitioner(), params);
    		System.exit(exitCode);
    	}
    	
    }
    

      

    4. Secondary sort

    The MapReduce framework sorts records by key before they reach the reducer, but the values for a given key are not sorted.
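    The usual workaround is to move the value into a composite key, sort on the whole key, and group on its first field only. The plain-Java sketch below imitates that pipeline on {year, temperature} pairs. It is a hypothetical illustration with made-up names (SecondarySortDemo, maxPerYear), not Hadoop code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of a secondary sort on {year, temperature} pairs.
class SecondarySortDemo {

    // Returns one record per year: the one with the highest temperature.
    static List<int[]> maxPerYear(List<int[]> records) {
        List<int[]> sorted = new ArrayList<>(records);
        // Sort step: year ascending, then temperature descending
        sorted.sort((a, b) -> {
            int cmp = Integer.compare(a[0], b[0]);
            return cmp != 0 ? cmp : Integer.compare(b[1], a[1]);
        });
        // Grouping step: consecutive records with the same year form one group,
        // and only the first record of each group is emitted
        List<int[]> out = new ArrayList<>();
        for (int[] r : sorted) {
            if (out.isEmpty() || out.get(out.size() - 1)[0] != r[0]) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> records = Arrays.asList(
                new int[]{1990, 14}, new int[]{1980, 12}, new int[]{1990, 19},
                new int[]{1980, 17}, new int[]{1970, 24}, new int[]{1970, 23});
        for (int[] r : maxPerYear(records)) {
            System.out.println(r[0] + "\t" + r[1]);
        }
        // prints 1970 24, 1980 17 and 1990 19, one pair per line
    }
}
```

    Because each group is already sorted by temperature in descending order, the first record of a group is the maximum for that year, and no values have to be buffered.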
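    Example: order the keys, and order the values that share a key as well (here the key is the year and the value is the temperature).
    The code is as follows: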

    package com.zhen.mapreduce.sort.secondarySort;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    /**
     * @author FengZhen
     * @date 2018-09-09
     * Sorts the values within each key once the keys themselves are sorted
     */
    public class MaxTemperatureUsingSecondarySort extends Configured implements Tool{
    
    	static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable>{
    		
    		private RecordParser recordParser = new RecordParser();
    		
    		@Override
    		protected void map(LongWritable key, Text value,
    				Mapper<LongWritable, Text, IntPair, NullWritable>.Context context)
    				throws IOException, InterruptedException {
    			recordParser.parse(value.toString());
    			if (recordParser.isValidTemperature()) {
    				context.write(new IntPair(recordParser.getYear(), recordParser.getTemperature()), NullWritable.get());
    			}
    		}
    	}
    	
    	static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable>{
    		@Override
    		protected void reduce(IntPair key, Iterable<NullWritable> values,
    				Reducer<IntPair, NullWritable, IntPair, NullWritable>.Context context)
    				throws IOException, InterruptedException {
    				context.write(key, NullWritable.get());
    		}
    	}
    	
    	/**
    	 * Custom partitioner that partitions by the first field of the composite key (the year)
    	 * @author FengZhen
    	 *
    	 */
    	public static class FirstPartitioner extends Partitioner<IntPair, NullWritable>{
    		@Override
    		public int getPartition(IntPair key, NullWritable value, int numPartitions) {
    			// The type parameters must match the map output types (IntPair, NullWritable)
    			return Math.abs(key.getFirstKey() * 127) % numPartitions;
    		}
    	}
    	
    	/**
    	 * Sorts keys by year (ascending) and then by temperature (descending)
    	 * @author FengZhen
    	 *
    	 */
    	public static class KeyComparator extends WritableComparator{
    		public KeyComparator() {
    			super(IntPair.class, true);
    		}
    		@Override
    		public int compare(WritableComparable a, WritableComparable b) {
    			IntPair ip1 = (IntPair) a;
    			IntPair ip2 = (IntPair) b;
    			int cmp = IntPair.compare(ip1.getFirstKey(), ip2.getFirstKey());
    			if (cmp != 0) {
    				return cmp;
    			}
    			return -IntPair.compare(ip1.getSecondKey(), ip2.getSecondKey());
    		}
    	}
    	
    	/**
    	 * Groups keys by year
    	 * @author FengZhen
    	 *
    	 */
    	public static class GroupComparator extends WritableComparator {
    		protected GroupComparator() {
    			super(IntPair.class, true);
    		}
    		@Override
    		public int compare(WritableComparable a, WritableComparable b) {
    			IntPair ip1 = (IntPair) a;
    			IntPair ip2 = (IntPair) b;
    			return IntPair.compare(ip1.getFirstKey(), ip2.getFirstKey());
    		}
    	}
    	
    	public int run(String[] args) throws Exception {
    		Job job = Job.getInstance(getConf());
    		job.setJobName("MaxTemperatureUsingSecondarySort");
    		job.setJarByClass(MaxTemperatureUsingSecondarySort.class);
    		
    		job.setMapperClass(MaxTemperatureMapper.class);
    		job.setReducerClass(MaxTemperatureReducer.class);
    		
    		job.setPartitionerClass(FirstPartitioner.class);
    		job.setSortComparatorClass(KeyComparator.class);
    		job.setGroupingComparatorClass(GroupComparator.class);
    		
    		job.setOutputKeyClass(IntPair.class);
    		job.setOutputValueClass(NullWritable.class);
    		
    		FileInputFormat.setInputPaths(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    	
    	public static void main(String[] args) throws Exception {
    		String[] params = new String[] {"hdfs://fz/user/hdfs/MapReduce/data/sort/MaxTemperatureUsingSecondarySort/input", "hdfs://fz/user/hdfs/MapReduce/data/sort/MaxTemperatureUsingSecondarySort/output"};
    		int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), params);
    		System.exit(exitCode);
    	}
    }
    

      

    IntPair: the custom composite key

    package com.zhen.mapreduce.sort.secondarySort;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * Custom composite key;
     * defines how the map output keys are ordered.
     * */
    public class IntPair implements WritableComparable<IntPair>{
    
    	// Plain Java primitives
    	private int firstKey;
    	private int secondKey;
    	// Hadoop creates keys by reflection, so a no-argument constructor is required
    	public IntPair() {
    	}
    	public IntPair(int firstKey, int secondKey) {
    		this.firstKey = firstKey;
    		this.secondKey = secondKey;
    	}
    	public int getFirstKey() {
    		return firstKey;
    	}
    	public void setFirstKey(int firstKey) {
    		this.firstKey = firstKey;
    	}
    	public int getSecondKey() {
    		return secondKey;
    	}
    	public void setSecondKey(int secondKey) {
    		this.secondKey = secondKey;
    	}
    
    	public void readFields(DataInput in) throws IOException {
    		firstKey = in.readInt();
    		secondKey = in.readInt();
    	}
    
    	public void write(DataOutput out) throws IOException {
    		out.writeInt(firstKey);
    		out.writeInt(secondKey);
    	}
    
    	/**
    	 * Natural ordering of the key: first field, then second field, both ascending.
    	 * The job replaces this ordering with KeyComparator.
    	 * */
    	public int compareTo(IntPair o) {
    		int cmp = compare(firstKey, o.firstKey);
    		if (cmp != 0) {
    			return cmp;
    		}
    		return compare(secondKey, o.secondKey);
    	}
    	
    	/**
    	 * Compares two int values in ascending order.
    	 * It must return 0 for equal values: KeyComparator only looks at the
    	 * temperature, and GroupComparator only merges records, when the years compare as equal.
    	 * @param a
    	 * @param b
    	 * @return negative, zero or positive as a is less than, equal to or greater than b
    	 */
    	public static int compare(int a, int b) {
    		return Integer.compare(a, b);
    	}
    	@Override
    	public String toString() {
    		return "IntPair [firstKey=" + firstKey + ", secondKey=" + secondKey + "]";
    	}
    	
    }
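    A comparison helper used for sorting and grouping has to return 0 for equal inputs. The hypothetical plain-Java sketch below (class and method names are made up) counts how many groups a run of sorted years falls into when a new group starts every time two consecutive keys compare as different, which is roughly how a grouping comparator is applied.

```java
import java.util.function.IntBinaryOperator;

// Hypothetical demo of why a comparison must return 0 for equal values.
class CompareContractDemo {

    // Never returns 0, so equal values look different (illustrates the problem).
    static int neverZero(int a, int b) {
        return a >= b ? -1 : 1;
    }

    // Returns a negative number, zero, or a positive number, as comparators should.
    static int wellBehaved(int a, int b) {
        return Integer.compare(a, b);
    }

    // Counts groups: a new group starts whenever consecutive keys compare non-zero.
    static int countGroups(int[] sortedKeys, IntBinaryOperator cmp) {
        if (sortedKeys.length == 0) {
            return 0;
        }
        int groups = 1;
        for (int i = 1; i < sortedKeys.length; i++) {
            if (cmp.applyAsInt(sortedKeys[i - 1], sortedKeys[i]) != 0) {
                groups++;
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        int[] years = {1970, 1970, 1980};
        System.out.println(countGroups(years, CompareContractDemo::wellBehaved)); // prints 2
        System.out.println(countGroups(years, CompareContractDemo::neverZero));   // prints 3
    }
}
```

    With the helper that never returns 0, every record ends up in a group of its own, so a reducer that emits one key per group would emit every record.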
    

      

    RecordParser: parses each record

    package com.zhen.mapreduce.sort.secondarySort;
    
    import java.io.Serializable;
    
    /**
     * @author FengZhen
     * @date 2018-09-09
     * Parses the input records handed to the mapper.
     */
    public class RecordParser implements Serializable{
    
    	private static final long serialVersionUID = 1L;
    
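    	/**
    	 * Year
    	 */
    	private Integer year;
    	/**
    	 * Temperature; null when the last parsed line had no valid reading
    	 */
    	private Integer temperature;
    	
    	/**
    	 * Parses one comma-separated line of the form "year,temperature".
    	 * @param value the input line
    	 */
    	public void parse(String value) {
    		// Reset first so a malformed line never reuses the previous record's values
    		year = null;
    		temperature = null;
    		String[] values = value.split(",");
    		if (values.length >= 2) {
    			try {
    				Integer parsedYear = Integer.valueOf(values[0].trim());
    				Integer parsedTemperature = Integer.valueOf(values[1].trim());
    				year = parsedYear;
    				temperature = parsedTemperature;
    			} catch (NumberFormatException e) {
    				// Leave both fields null so isValidTemperature() rejects this line
    			}
    		}
    	}
    	
    	/**
    	 * Whether the last parsed line contained a valid temperature
    	 * @return true if a temperature was parsed
    	 */
    	public boolean isValidTemperature() {
    		return null != temperature;
    	}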
    	
    	public Integer getYear() {
    		return year;
    	}
    
    	public void setYear(Integer year) {
    		this.year = year;
    	}
    
    	public int getTemperature() {
    		return temperature;
    	}
    	public void setTemperature(Integer temperature) {
    		this.temperature = temperature;
    	}
    }
    

      

    The input data is as follows:

    1990,14
    1980,12
    1990,19
    1960,11
    1960,18
    1980,17
    1970,24
    1970,23
    1940,22
    1940,35
    1930,44
    1920,43


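    The output recorded by the author is as follows; its format comes from IntPair's toString method, which can be overridden to change it. The listing shows the years in descending order, every input record rather than one per year, and the 1970 pair out of order (23 before 24). That is what results when the int comparison helper orders values in descending order and never returns 0 for equal values. With a helper that returns 0 on equality, the grouping comparator merges each year into a single group and the reducer emits only the first key of that group.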

    IntPair [firstKey=1990, secondKey=19]
    IntPair [firstKey=1990, secondKey=14]
    IntPair [firstKey=1980, secondKey=17]
    IntPair [firstKey=1980, secondKey=12]
    IntPair [firstKey=1970, secondKey=23]
    IntPair [firstKey=1970, secondKey=24]
    IntPair [firstKey=1960, secondKey=18]
    IntPair [firstKey=1960, secondKey=11]
    IntPair [firstKey=1940, secondKey=35]
    IntPair [firstKey=1940, secondKey=22]
    IntPair [firstKey=1930, secondKey=44]
    IntPair [firstKey=1920, secondKey=43]
    

      

  • Original article: https://www.cnblogs.com/EnzoDin/p/9656141.html