环境:
Hadoop1.x,CentOS6.5,三台虚拟机搭建的模拟分布式环境
数据:任意数量、任意格式的文本文件(我使用的是四个 .java 源代码文件)
方案目标:
根据提供的文本文件,统计每个单词分别在哪些文件中出现了几次,组成倒排索引,格式如下:
Ant FaultyWordCount.java : 1 , WordCount.java : 1
思路:
因为这个程序需要用到三个变量:单词、文件名、出现的频率,因此需要自定义Writable类,以单词为key,将文件名和出现的频率打包。
1. 先将每行文本切分成单词,并以 K/V = Word / Filename:1 的格式输出。
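2. 利用 Combiner 类在 Map 端先对同一单词在同一文件中的出现次数做一次局部求和,以减少传输到 Reduce 端的数据量。注意:Hadoop 可能对同一批数据执行 Combiner 零次、一次或多次,因此 Combiner 必须累加已有的频率值(而不是简单地统计记录条数),并且其输出格式必须与输入格式一致。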
3. 在 Reduce 中,把 Combiner 传来的同一单词在不同文件中的出现频率合并为一行输出。
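难点:这个程序主要用到了 Combiner 和自定义的 Writable 类。实现时需要注意,自定义的 Writable 类必须提供默认的无参构造函数,因为 Hadoop 会先通过反射创建实例,再调用 readFields 进行反序列化。另外,Map 输出的 value 类型(FileFreqWritable)与 Reduce 输出的 value 类型(Text)不同,需要在驱动类中分别通过 setMapOutputValueClass 和 setOutputValueClass 声明。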
主调用Main类:
package ren.snail; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Main extends Configured implements Tool { public static void main(String[] args) throws Exception { // TODO Auto-generated method stub int result = ToolRunner.run(new Configuration(), new Main(), args); System.exit(result); } @Override public int run(String[] arg0) throws Exception { // TODO Auto-generated method stub Configuration configuration = getConf(); Job job = new Job(configuration, "InvertIndex"); job.setJarByClass(Main.class); FileInputFormat.addInputPath(job, new Path(arg0[0])); FileOutputFormat.setOutputPath(job, new Path(arg0[1])); job.setMapperClass(InvertMapper.class); job.setCombinerClass(Combinner.class); //设置Combiner类 job.setReducerClass(InvertReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FileFreqWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); return 0; } }
自定义 Writable 类
package ren.snail; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; public class FileFreqWritable implements Writable { Text documentID; IntWritable fequence; public FileFreqWritable() //必须提供无参构造函数 { this.documentID = new Text(); this.fequence = new IntWritable(); } public FileFreqWritable(Text id,IntWritable feq) { // TODO Auto-generated constructor stub this.documentID=id; this.fequence =feq; } public void set(String id,int feq) { this.documentID.set(id); this.fequence.set(feq); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub documentID.readFields(in); fequence.readFields(in); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub documentID.write(out); fequence.write(out); } public Text getDocumentID() { return documentID; } public String toString() { return documentID.toString()+" : "+fequence.get(); } public IntWritable getFequence() { return fequence; } }
Map
package ren.snail; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class InvertMapper extends Mapper<LongWritable, Text, Text, FileFreqWritable>{ public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException { String data = value.toString().replaceAll("[^a-zA-Z0-9]+", " "); //将不需要的其他字符都设为空 String[] values = data.split(" "); FileSplit fileSplit = (FileSplit)context.getInputSplit(); String filename = fileSplit.getPath().getName(); for (String temp : values) { FileFreqWritable obj = new FileFreqWritable(new Text(filename),new IntWritable(1)); context.write(new Text(temp), obj); } } }
Combiner
package ren.snail; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class Combinner extends Reducer<Text, FileFreqWritable, Text, FileFreqWritable>{ public void reduce(Text key,Iterable<FileFreqWritable> values,Context context) throws IOException,InterruptedException { int count = 0 ; String id = ""; for (FileFreqWritable temp : values) { count++; if(count == 1) { id=temp.getDocumentID().toString(); } } context.write(key,new FileFreqWritable(new Text(id), new IntWritable(count))); } }
Reduce
package ren.snail; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class InvertReducer extends Reducer<Text, FileFreqWritable, Text, Text> { public void reduce(Text key,Iterable<FileFreqWritable> values,Context context) throws IOException,InterruptedException { StringBuilder value = new StringBuilder(); for (FileFreqWritable fileFreqWritable : values) { String temp = fileFreqWritable.toString(); value.append(temp+" , "); } context.write(key,new Text(value.toString())); } }
其实我的Reduce实现思路可能有点问题,不过大致是这样