为何使用combiner
- 减少洗牌的键值对数量
- 缓解数据倾斜问题
combiner的设计
combiner在数据转换上必须与reducer等价
- 若Reducer仅处理分配型函数(最大值/最小值/求和/计数),可以直接使用reducer作为combiner
- 其他:自己设计combiner和reducer
求均值Combiner的例子
在Mapper输出中增加一列count(初始值为1),将求均值任务转换为对value和count分别求和的任务;求和是分配型函数,因此Combiner可以复用与Reducer相同的累加逻辑,只是Combiner输出的仍是【sum count】而不是均值,最终的除法只在Reducer中进行。
- Mapper输出:(key:【value count】)
- Combiner输出:(key:【value count】)
- Reducer输出:(key:【sum(value) / sum(count)】)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
// 求均值Combiner的例子
public class AverageByAttributeWithCombiner extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",", -20);
String country = fields[4];
String numClaims = fields[8];
if (numClaims.length() > 0 && !numClaims.startsWith(""")) {
context.write(new Text(country), new Text(numClaims + ",1"));
}
}
}
public static class ReduceClass extends Reducer<Text, Text, Text, DoubleWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sum = 0;
int count = 0;
for (Text val: values) {
String fields[] = val.toString().split(",");
sum += Double.parseDouble(fields[0]);
count += Integer.parseInt(fields[1]);
}
context.write(key, new DoubleWritable(sum / count));
}
}
public static class Combiner extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sum = 0;
int count = 0;
for (Text val: values) {
String fields[] = val.toString().split(",");
sum += Double.parseDouble(fields[0]);
count += Integer.parseInt(fields[1]);
}
context.write(key, new Text(sum + "," + count));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "AverageByAttributeWithCombiner");
job.setJarByClass(AverageByAttributeWithCombiner.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setCombinerClass(Combiner.class);
job.setReducerClass(ReduceClass.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
System.exit(job.waitForCompletion(true)?0:1);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new AverageByAttributeWithCombiner(), args);
System.exit(exitCode);
}
}
查看combine的效果
- Map output records:Map输出的记录数量
- Reduce input records:Reduce输入记录的数量(combiner生效时应明显小于Map output records)