Sorting
Sorting is a core technique of MapReduce.
1. Preparation
Example: sort a weather dataset by the temperature field. Because temperature is a signed integer, the field cannot be treated as a Text object and sorted lexicographically. Instead, the data is stored in a SequenceFile whose IntWritable key is the temperature (so keys sort correctly) and whose Text value holds the rest of the record (here, the city).
The MapReduce job contains only map tasks: it filters the input data, dropping records that lack a valid temperature. Each map task creates and outputs a block-compressed SequenceFile.
The code is as follows:
package com.zhen.mapreduce.sort.preprocessor;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * @date September 9, 2018
 * Filter out invalid records and store the data in a SequenceFile.
 */
public class SortDataPreprocessor extends Configured implements Tool {

    static class CleanerMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

        private RecordParser recordParser = new RecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            recordParser.parse(value.toString());
            if (recordParser.isValidTemperature()) {
                context.write(new IntWritable(recordParser.getTemperature()),
                        new Text(recordParser.getCity()));
            }
        }
    }

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("SortDataPreprocessor");
        job.setJarByClass(SortDataPreprocessor.class);
        job.setMapperClass(CleanerMapper.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        // Map-only job: no reduce tasks.
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Emit block-compressed output using gzip.
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] params = new String[] {
            "hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/input",
            "hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/output"};
        int exitCode = ToolRunner.run(new SortDataPreprocessor(), params);
        System.exit(exitCode);
    }
}
package com.zhen.mapreduce.sort.preprocessor;

import java.io.Serializable;

/**
 * @author FengZhen
 * @date September 9, 2018
 * Parses the input records handed to the map tasks.
 */
public class RecordParser implements Serializable {

    private static final long serialVersionUID = 1L;

    /** City */
    private String city;

    /** Temperature */
    private Integer temperature;

    /**
     * Parse a line of the form "city,temperature".
     * @param value the raw input line
     */
    public void parse(String value) {
        String[] values = value.split(",");
        if (values.length >= 2) {
            city = values[0];
            temperature = Integer.valueOf(values[1]);
        }
    }

    /**
     * Check whether the record carries a valid temperature.
     */
    public boolean isValidTemperature() {
        return null != temperature;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }
}
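Each input line is expected in the form city,temperature; a hypothetical line such as beijing,22 would parse to city "beijing" and temperature 22, while lines without a parseable temperature are dropped by the mapper's isValidTemperature() check.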
Build the jar, copy it to the server, and run:
scp /Users/FengZhen/Desktop/Hadoop/file/Sort.jar root@192.168.1.124:/usr/local/test/mr
hadoop jar Sort.jar com.zhen.mapreduce.sort.preprocessor.SortDataPreprocessor
2. Partial Sort
When there are multiple reduce tasks, the job produces multiple sorted output files. Merging these smaller files into a single sorted file, however, is not straightforward.
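To make this concrete, here is a minimal partial-sort sketch (a hypothetical SortByTemperaturePartial class, not part of the original code) that re-sorts the preprocessor's SequenceFile output with four reducers. Each part-r-* file is internally sorted by key, but the default HashPartitioner scatters keys across reducers, so the four files cannot simply be concatenated into one globally sorted file.

package com.zhen.mapreduce.sort.partial; // hypothetical package

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Partial sort sketch: the identity mapper and reducer are used, so the
 * shuffle does all the work. Each reducer writes one sorted file.
 */
public class SortByTemperaturePartial extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("SortByTemperaturePartial");
        job.setJarByClass(SortByTemperaturePartial.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        // Four reducers -> four sorted output files, with no total order
        // across them because keys are assigned to reducers by hash.
        job.setNumReduceTasks(4);
        SequenceFileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SortByTemperaturePartial(), args));
    }
}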
3. Total Sort
How can Hadoop produce a globally sorted file? The simplest approach is to use a single partition. But this is hopelessly inefficient for large files: one machine has to process all the output, throwing away the parallelism that MapReduce provides.
There is an alternative: first create a set of sorted files, then concatenate them, producing one globally sorted file. The key idea is to use a partitioner that respects the total order of the output.
Example: sorting by temperature.
Suppose we use a partitioner with four partitions: the first covers temperatures in the range 0-10, the second 11-20, the third 21-30, and the fourth 31-40.
This guarantees that every key in partition i is smaller than every key in partition i+1, so the concatenated output is fully ordered; the drawback is that the data may be spread very unevenly across the partitions.
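A sketch of such a fixed-range partitioner (a hypothetical TemperatureRangePartitioner, assuming the IntWritable/Text key-value pairs produced by the preprocessor) might look like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Hypothetical fixed-range partitioner for the four ranges above:
 * 0-10 -> partition 0, 11-20 -> 1, 21-30 -> 2, 31-40 -> 3.
 * All keys in partition i are smaller than those in partition i+1,
 * but nothing guarantees the partitions receive equal amounts of data.
 */
public class TemperatureRangePartitioner extends Partitioner<IntWritable, Text> {
    @Override
    public int getPartition(IntWritable key, Text value, int numPartitions) {
        int temperature = key.get();
        if (temperature <= 10) return 0;
        if (temperature <= 20) return 1;
        if (temperature <= 30) return 2;
        return 3;
    }
}

Used with job.setPartitionerClass(TemperatureRangePartitioner.class) and four reducers, the concatenation of part-r-00000 through part-r-00003 would be fully sorted.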
Knowing the temperature distribution would let us build a set of very even partitions, but since that requires a full pass over the dataset, it is impractical. By sampling the key space instead, we can partition the dataset fairly evenly. The idea behind sampling is to look at only a small subset of the keys, obtain an approximate distribution of the keys, and build the partitions from it. Hadoop ships with several built-in samplers.
The InputSampler class defines a nested Sampler interface whose only method, getSample, takes two parameters (an InputFormat and a Job) and returns a sample of keys.
The code is as follows:
package com.zhen.mapreduce.sort.totalPartitioner;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * @date September 9, 2018
 * Total sort using a sampled partition file.
 */
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("SortByTemperatureUsingTotalOrderPartitioner");
        job.setJarByClass(SortByTemperatureUsingTotalOrderPartitioner.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);

        SequenceFileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Emit block-compressed output using gzip.
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

        job.setPartitionerClass(TotalOrderPartitioner.class);

        /**
         * Sampling probability 0.1, at most 10000 samples, and at most 10
         * input splits sampled; these are also the defaults when InputSampler
         * is run as a standalone application. Sampling stops as soon as any
         * one of the limits is reached.
         */
        InputSampler.Sampler<IntWritable, Text> sampler =
                new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);

        // To share the partition file with the tasks running on the cluster,
        // InputSampler adds it to the distributed cache.
        Configuration conf = job.getConfiguration();
        String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
        URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
        job.addCacheFile(partitionUri);
        job.createSymlink();

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] params = new String[] {
            "hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/output",
            "hdfs://fz/user/hdfs/MapReduce/data/sort/SortByTemperatureUsingTotalOrderPartitioner/output"};
        int exitCode = ToolRunner.run(new SortByTemperatureUsingTotalOrderPartitioner(), params);
        System.exit(exitCode);
    }
}
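Mirroring the preprocessor step, the job can then be launched from the same jar (assuming the Sort.jar build from above):

hadoop jar Sort.jar com.zhen.mapreduce.sort.totalPartitioner.SortByTemperatureUsingTotalOrderPartitioner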
4. Secondary Sort
The MapReduce framework sorts records by key before they reach the reducers, but the values associated with a given key are not sorted. The standard trick, which the code below applies, is to pull the value into a composite key (year, temperature), sort on the full composite key, and partition and group on the year alone.
Example: sort the keys and, for records sharing a key, sort the values as well.
The code is as follows:
package com.zhen.mapreduce.sort.secondarySort;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * @date September 9, 2018
 * Secondary sort: sort the values associated with each key.
 */
public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {

    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {

        private RecordParser recordParser = new RecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            recordParser.parse(value.toString());
            if (recordParser.isValidTemperature()) {
                context.write(new IntPair(recordParser.getYear(), recordParser.getTemperature()),
                        NullWritable.get());
            }
        }
    }

    static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {

        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /**
     * Custom partitioner that partitions by the first field of the
     * composite key (the year).
     */
    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            return Math.abs(key.getFirstKey() * 127) % numPartitions;
        }
    }

    /**
     * Sort comparator for the composite key: orders by year first, then by
     * temperature, with the negation flipping the order of the second field
     * (see the note on IntPair.compare below).
     */
    public static class KeyComparator extends WritableComparator {
        public KeyComparator() {
            super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            int cmp = IntPair.compare(ip1.getFirstKey(), ip2.getFirstKey());
            if (cmp != 0) {
                return cmp;
            }
            return -IntPair.compare(ip1.getSecondKey(), ip2.getSecondKey());
        }
    }

    /**
     * Grouping comparator: groups reducer input by year only.
     */
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            return IntPair.compare(ip1.getFirstKey(), ip2.getFirstKey());
        }
    }

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("MaxTemperatureUsingSecondarySort");
        job.setJarByClass(MaxTemperatureUsingSecondarySort.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setSortComparatorClass(KeyComparator.class);
        job.setGroupingComparatorClass(GroupComparator.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] params = new String[] {
            "hdfs://fz/user/hdfs/MapReduce/data/sort/MaxTemperatureUsingSecondarySort/input",
            "hdfs://fz/user/hdfs/MapReduce/data/sort/MaxTemperatureUsingSecondarySort/output"};
        int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), params);
        System.exit(exitCode);
    }
}
IntPair: the custom composite key
package com.zhen.mapreduce.sort.secondarySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom composite key (year, temperature) used to sort the map output.
 */
public class IntPair implements WritableComparable<IntPair> {

    // Plain Java ints for the two fields.
    private int firstKey;
    private int secondKey;

    // A default (no-argument) constructor is required so the framework
    // can instantiate the key during deserialization.
    public IntPair() {
    }

    public IntPair(int firstKey, int secondKey) {
        this.firstKey = firstKey;
        this.secondKey = secondKey;
    }

    public int getFirstKey() {
        return firstKey;
    }

    public void setFirstKey(int firstKey) {
        this.firstKey = firstKey;
    }

    public int getSecondKey() {
        return secondKey;
    }

    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    public void readFields(DataInput in) throws IOException {
        firstKey = in.readInt();
        secondKey = in.readInt();
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(firstKey);
        out.writeInt(secondKey);
    }

    /**
     * Default ordering of map keys when no sort comparator is set (this job
     * installs KeyComparator instead). Putting this object first gives
     * ascending order; putting it second gives descending. As written it
     * orders by firstKey descending and never returns 0.
     */
    public int compareTo(IntPair o) {
        return this.getFirstKey() >= o.getFirstKey() ? -1 : 1;
    }

    /**
     * Compare two int values in descending order.
     * Note: returns -1 for equal values, never 0, so no two keys
     * ever compare as equal.
     */
    public static int compare(int a, int b) {
        return a >= b ? -1 : 1;
    }

    @Override
    public String toString() {
        return "IntPair [firstKey=" + firstKey + ", secondKey=" + secondKey + "]";
    }
}
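A quirk worth noting: both compareTo and the static compare above return only -1 or 1, never 0, so no two keys ever compare as equal. As a result, KeyComparator never reaches its second-field comparison (the first comparison is never 0), and GroupComparator never actually merges two records into a single per-year group, which is why every input record appears as its own line in the output below.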
RecordParser: parses each input record
package com.zhen.mapreduce.sort.secondarySort;

import java.io.Serializable;

/**
 * @author FengZhen
 * @date September 9, 2018
 * Parses the input records handed to the map tasks.
 */
public class RecordParser implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Year */
    private Integer year;

    /** Temperature */
    private Integer temperature;

    /**
     * Parse a line of the form "year,temperature".
     * @param value the raw input line
     */
    public void parse(String value) {
        String[] values = value.split(",");
        if (values.length >= 2) {
            year = Integer.valueOf(values[0]);
            temperature = Integer.valueOf(values[1]);
        }
    }

    /**
     * Check whether the record carries a valid temperature.
     */
    public boolean isValidTemperature() {
        return null != temperature;
    }

    public Integer getYear() {
        return year;
    }

    public void setYear(Integer year) {
        this.year = year;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }
}
The raw input data:
1990,14
1980,12
1990,19
1960,11
1960,18
1980,17
1970,24
1970,23
1940,22
1940,35
1930,44
1920,43
The output data is as follows (to change the output format, override IntPair's toString method):
IntPair [firstKey=1990, secondKey=19]
IntPair [firstKey=1990, secondKey=14]
IntPair [firstKey=1980, secondKey=17]
IntPair [firstKey=1980, secondKey=12]
IntPair [firstKey=1970, secondKey=23]
IntPair [firstKey=1970, secondKey=24]
IntPair [firstKey=1960, secondKey=18]
IntPair [firstKey=1960, secondKey=11]
IntPair [firstKey=1940, secondKey=35]
IntPair [firstKey=1940, secondKey=22]
IntPair [firstKey=1930, secondKey=44]
IntPair [firstKey=1920, secondKey=43]