Key排序
1. 继承WritableComparator
在hadoop之Shuffle和Sort中,可以看到mapper的输出文件spill文件需要在内存中排序,并且在输入reducer之前,不同的mapper的数据也会排序,排序是根据数据的key进行的.
如果key是用户自定义的类型,并没有默认的比较函数时,就需要自己定义key的比较函数,也就是继承WritableComparator.事例代码如下:
public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数 // 这里是先比较年份,再比较温度,按温度降序排序 int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp; } return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse } }
例子中对IntPair定义了新的compare函数,并在main函数中通过下面的方式实现替换:
job.setSortComparatorClass(KeyComparator.class);
2.实现 WritableComparable接口
看下面的例子代码:
static class NewK2 implements WritableComparable<NewK2>{ Long first; Long second; public NewK2(){} public NewK2(long first, long second){ this.first = first; this.second = second; } @Override public void readFields(DataInput in) throws IOException { this.first = in.readLong(); this.second = in.readLong(); }
@Override public void write(DataOutput out) throws IOException { out.writeLong(first); out.writeLong(second); } /** * 当k2进行排序时,会调用该方法. * 当第一列不同时,升序;当第一列相同时,第二列升序 */ @Override public int compareTo(NewK2 o) { final long minus = this.first - o.first; if(minus !=0){ return (int)minus; } return (int)(this.second - o.second); } @Override public int hashCode() { return this.first.hashCode()+this.second.hashCode(); } @Override public boolean equals(Object obj) { if(!(obj instanceof NewK2)){ return false; } NewK2 oK2 = (NewK2)obj; return (this.first==oK2.first)&&(this.second==oK2.second); } }
如果是按照上述的例子实现的,不需要在main函数中设置其他的代码.
Group排序
一般来说,如果用户自定义了key的排序过程,那么在reducer之前的对数据进行分组的过程就要重新编写,而且一般来说,partitioner也需要重新定义,请参考hadoop之定制自己的Partitioner .
shuffle阶段,虽然使用的是hash的方法,我们并不能保证映射到同一个reducer的key的hash值都是一样的,对于不同的hash值要进行分群,然后再执行reduce.下面是自定义groupcomparator的例子:
public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里是按key的第一个参数来聚合,就是年份 return IntPair.compare(ip1.getFirst(), ip2.getFirst()); } }
例子中实现了对于IntPair类型的分群比较函数的重新定义.在main函数中通过下面的方式进行调用:
job.setGroupingComparatorClass(GroupComparator.class);
二次排序
下面是对地区温度进行的统计,要求输出各个年份的最大温度,例子中定制了自己的partitioner:FirstPartitioner来对组合后的类型进行分组,实际上还是按照年份进行的分组;定制了自己的keycomparator:KeyComparator,先比较年份,然后再比较温度;定制了自己的分群比较类:GroupComparator,也是按照年份进行分群,然后扔给reducer进行处理.
值得一提的是,为什么不用传统的mapreduce,按照年份进行进行map,然后在reduce中,遍历每年不同的温度,找到最大值呢?原因之一就是效率的问题,sort操作本身就要在MP框架中执行,而且已经做了很多优化,通过设置比较的不同手段,很容易实现比较,然而在reducer处理中进行遍历,显然比上面的sort过程要慢.下面是例子的完整代码,摘自Hadoop- The Definitive Guide, 4th Edition.
public class MaxTemperatureUsingSecondarySort extends Configured implements Tool { // Map任务 static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntPair, NullWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); public void map(LongWritable key, Text value, OutputCollector<IntPair, NullWritable> output, Reporter reporter) throws IOException { parser.parse(value); // 解析输入的文本 if (parser.isValidTemperature()) { // 这里把年份与温度组合成一个key,value为空 output.collect(new IntPair(parser.getYearInt(),+ parser.getAirTemperature()), NullWritable.get()); } } } // Reduce任务 static class MaxTemperatureReducer extends MapReduceBase implements Reducer<IntPair, NullWritable, IntPair, NullWritable> { public void reduce(IntPair key, Iterator<NullWritable> values, OutputCollector<IntPair, NullWritable> output, Reporter reporter) throws IOException { // 输出聚合的key值,这里的key是先按年份进行聚合,所我们会看到相同所有年份相同的key会聚合在一起,而这些聚合后的key按温度进行降序按列 // 所以聚合中第一个key为温度最高的,所以这里输出的key为这一年中温度最高的值 output.collect(key, NullWritable.get()); } } // 切分器,这里是按年份* 127 % reduceNum来进行切分的 public static class FirstPartitioner implements Partitioner<IntPair, NullWritable> { @Override public void configure(JobConf job) {} @Override public int getPartition(IntPair key, NullWritable value, int numPartitions) { return Math.abs(key.getFirst() * 127) % numPartitions; } } // 聚合key的一个比较器 public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数 // 这里是先比较年份,再比较温度,按温度降序排序 int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp; } return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse } } // 设置聚合比较器 public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里是按key的第一个参数来聚合,就是年份 return IntPair.compare(ip1.getFirst(), ip2.getFirst()); } } @Override public int run(String[] args) throws IOException { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setMapperClass(MaxTemperatureMapper.class); job.setPartitionerClass(FirstPartitioner.class); job.setSortComparatorClass(KeyComparator.class); job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(IntPair.class); // 设置key的一个组合类型,如里这个类型实现了WritableComparable<T>的话,那就不要设置setOutputKeyComparatorClass了. job.setOutputValueClass(NullWritable.class); // 输出的value为NULL,因为这里的实际value已经组合到了key中
return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); } }