习题来源:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html
file1
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
file2
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyDedup {
public static class LineNullMapper extends Mapper<Object, Text, Text, NullWritable>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
context.write(value, NullWritable.get());
}
}
public static class SortReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException{
context.write(key, NullWritable.get());
}
}
如果把Iterable<NullWritable> values 替换为 NullWritable values 如果是不用Iterable迭代器的话,则不进行分组么?
结果是:只排序了并没有完成去重
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-3 c
2012-3-3 c
2012-3-4 d
2012-3-4 d
2012-3-5 a
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d
public static void main(String[] args) throws Exception {
String dir_in = "hdfs://localhost:9000/in_dedup";
String dir_out = "hdfs://localhost:9000/out_dedup";
Path in = new Path(dir_in);
Path out = new Path(dir_out);
Configuration conf = new Configuration();
Job sortJob = new Job(conf, "my_dedup");
sortJob.setJarByClass(MyDedup.class);
sortJob.setInputFormatClass(TextInputFormat.class);
sortJob.setMapperClass(LineNullMapper.class);
sortJob.setCombinerClass(SortReducer.class);
//countJob.setPartitionerClass(HashPartitioner.class);
sortJob.setMapOutputKeyClass(Text.class);
sortJob.setMapOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(sortJob, in);
sortJob.setReducerClass(SortReducer.class);
// countJob.setNumReduceTasks(1);
sortJob.setOutputKeyClass(Text.class);
sortJob.setOutputValueClass(NullWritable.class);
//countJob.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(sortJob, out);
sortJob.waitForCompletion(true);
}
}
运行结果:
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d