需求:统计并输出某目录下所有文件中出现过的单词,重复的单词只保留一个(去重)。
mapper 阶段:正常完成 map(映射)工作,把每一行切割成单词,并以 <word, NullWritable> 的形式输出,即 <key,value> --> <word,NullWritable>。
reducer 阶段:对于同一个 key 的一组 value,只输出一次该 key。
mapper 的写法与 WordCount 的 mapper 基本一样,区别只是输出的 value 是 NullWritable,而不是计数 1。
package com.mapreduce.mapper;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper of the "distinct words" job.
 *
 * <p>Splits every input line into words and emits each word as the output key
 * with a {@link NullWritable} value, i.e. {@code <word, NullWritable>}.
 * De-duplication itself is not done here; it relies on the words being
 * grouped by key before they reach the reducer.
 */
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /** Output key object, reused for every emitted word to avoid per-word allocation. */
    final Text text = new Text();

    /**
     * Tokenizes one input line and writes every word it contains.
     *
     * <p>Words are separated by any run of whitespace (space, tab, newline,
     * carriage return, form feed). Blank lines and leading, trailing or
     * repeated separators therefore never produce an empty "word".
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   the text of the current input line
     * @param context task context used to emit {@code <word, NullWritable>} pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // StringTokenizer's default delimiters are " \t\n\r\f" and it never
        // returns empty tokens, unlike String.split(" ").
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            text.set(tokens.nextToken());
            context.write(text, NullWritable.get());
        }
    }
}
reducer:对于同一个 key 的一组 value,只需要把这个 key 输出一次就可以了。
package com.mapreduce.mapper;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer of the "distinct words" job.
 *
 * <p>Consumes {@code <word, NullWritable>} groups and produces
 * {@code <word, NullWritable>} records, writing each incoming key exactly once.
 */
public class DistincReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * Writes the key of the current group once; the grouped values are never read.
     *
     * @param key     the word shared by every value in this group
     * @param value   the values grouped under {@code key} (ignored)
     * @param context task context that receives the output record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> value, Context context)
            throws IOException, InterruptedException {
        // The output carries no payload: only the key matters.
        final NullWritable noValue = NullWritable.get();
        context.write(key, noValue);
    }
}
提交 job 的驱动类:
package com.mapreduce.mapper;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Driver that configures and submits the "distinct words" MapReduce job,
 * wiring {@link DistinctMapper} and {@link DistincReducer} together.
 *
 * <p>Usage: {@code DriverDemo [inputPath [outputPath]]}. Arguments that are
 * omitted fall back to the built-in default paths.
 */
public class DriverDemo {

    /** Input directory used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH = "d:/input";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "d:/output5";

    /**
     * Builds the job, submits it, waits for it to finish and exits the JVM
     * with status 0 on success or 1 on failure.
     *
     * @param args optional {@code [inputPath, outputPath]}
     * @throws Exception if the job cannot be configured or submitted, or if
     *                   waiting for its completion is interrupted
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        // 1. Configuration
        Configuration configuration = new Configuration();

        // 2. Job instance
        Job job = Job.getInstance(configuration);

        // 3. Jar that contains the job classes
        job.setJarByClass(DriverDemo.class);

        // 4. Mapper and reducer classes
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistincReducer.class);
        // The reducer's input and output types are identical and it only
        // re-emits the key, so it can also run as a combiner to drop
        // duplicates on the map side before the shuffle.
        job.setCombinerClass(DistincReducer.class);

        // 5. Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 6. Final (reducer) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 7. Input/output format classes (the input format may determine how input is split)
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 8. Input and output paths
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 9. Submit the job and report its outcome through the exit status
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}