Job job = new Job(conf, "MaxTemperature");
job.setJarByClass(MaxTemperature.class);
job.setMapperClass(MaxTemperatureMapper.class);
Combiner类:
通常,每个Map会产生大量的输出,combiner类的作用就是在Map端对输出先做一次合并,以减少传输到reducer的数据量。
另外,需要注意的是,根据框架要求,combiner的输入输出类型必须和mapper的输出和reducer的输入类型一致。
Combiner函数的示例如下:
我们以计算特定的key对应的平均值为例,展示一下combiner的用法:
Mapper:
Class Mapper
method Map(Long, String)
Emit(String, Integer)
Combiner:
class Combiner method Combine(String t, Integers [r1, r2, . . .]) sum ← 0 cnt ← 0 for all integer r ∈ integers [r1, r2, . . .] do sum ← sum + r cnt ← cnt + 1 Emit(String t, pair (sum, cnt)) // 分离sum和cnt
Reducer:
class Reducer method Reduce(String t, pairs [(s1, c1), (s2, c2) . . .]) sum ← 0 cnt ← 0 for all pair (s, c) ∈ pairs [(s1, c1), (s2, c2) . . .] do sum ← sum + s cnt ← cnt + c ravg ← sum/cnt Emit(String t, Integer ravg)
乍一看应该没有问题,但是不幸的是,这个combiner是不正确的。因为框架要求,combiner的输入输出类型必须和mapper的输出以及reducer的输入类型一致,更应是和Reducer的输入输出类型一致。即,当Combiner类正确的情况下,可以完全删除而对程序的运行结果没有任何影响,只是对于数据量大的job执行时间会变长。而上面将Combiner删除后,程序就会出错。这时候Mapper的输出类型和Reducer的输入类型不一致。所以会出现这个问题
改变的方式非常简单,把Mapper的输出包装一下即可:
class Mapper method Map(String t, Integer r) Emit(String t, pair (r, 1)) class Combiner method Combine(String t, pairs [(s1, c1), (s2, c2) . . .]) sum ← 0 cnt ← 0 for all pair (s, c) ∈ pairs [(s1, c1), (s2, c2) . . .] do sum ← sum + s cnt ← cnt + c Emit(String t, pair (sum, cnt)) class Reducer method Reduce(String t, pairs [(s1, c1), (s2, c2) . . .]) sum ← 0 cnt ← 0 for all pair (s, c) ∈ pairs [(s1, c1), (s2, c2) . . .] do sum ← sum + s cnt ← cnt + c ravg ← sum/cnt Emit(String t, integer ravg)
//job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
Partitioner类:
Partitioner负责对map中间输出键值对按键进行分区,用键或者键的子集来获得分区,典型的是用Hash函数来完成。
分区的总数目和当然工作的reduce任务的总个数相同(reduce任务个数可以通过job.setNumReduceTasks(tasks)进行设置),
因此,这种分组使得发送到reduce任务的中间键值对(记录条数)减少。
PartitionerClass需要继承自Partitioner类,并重载它的getPartition方法。
/** * @param key 将要被分组的键 * @param value 实体的值 * @param numPartitions 总的分组数目 * @return 返回给定key的分组个数 */ int getPartition(K2 key, V2 value, int numPartitions);
PartionerClass的示例代码如下:
public static class CatPartiotner extends Partitioner<Text,Text>{ @Override public int getPartition(Text key, Text value, int numPartitions){ String[] parts=key.toString().split("-");
if(parts.length==2){
return Math.abs(parts[0],hashCode())%numPartitions;
}
return Math.abs(key.toString().hashCode())%numPartitions; } }
然后在job中设置Partitioner Class:
//job.setPartitionerClass(cls);
// job.setInputFormatClass(Class<? extends InputFormat> cls); //设置job的输入格式
// job.setOutputFormatClass(Class<? extends InputFormat> cls); //设置job的输出格式
Grouping类:
定义一个比较器,对于Reducer的一个单一的调用reduce(Obect,Iterable,Context),该比较器控制哪些些键被聚合在一起。按照键值关系进行分组。
job.setGroupingComparatorClass(Class<? extends RawComparator> cls)
Sorting类:
定义一个比较器,确定在这些键被传递给Reducer之前是以何种方式排序的。主要是对键进行排序。
job.setSortComparatorClass(Class<? extends RawComparator> cls )
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//job.setNumReduceTasks(tasks);
for(int i=0;i<otherArgs.length-1;i++)
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}