目录
05-26-实现自连接的MapReduce程序
05-27-分析倒排索引的过程
倒排索引数据处理的过程.png
05-28-使用MapReduce实现倒排索引1
05-29-使用MapReduce实现倒排索引2
使用MapReduce实现倒排索引
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * Map phase of the inverted-index job.
 *
 * <p>For each word in an input line, emits the pair
 * ("word:fileName", "1") so that the combiner/reducer can later build a
 * per-file occurrence count for every word.
 */
public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Example input file: /indexdata/data01.txt
        // Resolve the name of the file this split belongs to. Using
        // Path.getName() is simpler and more robust than parsing the
        // string form of the path with lastIndexOf("/") by hand.
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();

        // Example line: "I love Beijing and love Shanghai"
        String data = value1.toString();
        String[] words = data.split(" ");

        // Emit one (word:fileName, 1) pair per occurrence.
        for (String word : words) {
            context.write(new Text(word + ":" + fileName), new Text("1"));
        }
    }
}
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reduce phase of the inverted-index job.
 *
 * <p>Receives a word as the key and the per-file counts produced by the
 * combiner as values (e.g. "data01.txt:2"), and concatenates them into a
 * single posting string such as "(data02.txt:1)(data01.txt:2)".
 */
public class RevertedIndexReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text k3, Iterable<Text> v3, Context context)
            throws IOException, InterruptedException {
        // Use a StringBuilder instead of repeated String concatenation in
        // the loop (each '+' would allocate a fresh String). insert(0, ...)
        // preserves the original prepend order of the values.
        StringBuilder posting = new StringBuilder();
        for (Text t : v3) {
            posting.insert(0, "(" + t.toString() + ")");
        }
        context.write(k3, new Text(posting.toString()));
    }
}
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Map-side combiner of the inverted-index job.
 *
 * <p>Sums the occurrences of one word within a single file, then re-keys
 * the pair: the incoming ("word:fileName", [1,1,...]) becomes
 * ("word", "fileName:count") for the reducer.
 */
public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text k21, Iterable<Text> v21, Context context)
            throws IOException, InterruptedException {
        // Total number of occurrences of this word in this one file.
        int count = 0;
        for (Text value : v21) {
            count += Integer.parseInt(value.toString());
        }

        // The incoming key looks like "love:data01.txt"; split it at the
        // first colon into the word and the file name.
        String composite = k21.toString();
        int sep = composite.indexOf(":");
        String word = composite.substring(0, sep);
        String file = composite.substring(sep + 1);

        // Emit (word, "fileName:count").
        context.write(new Text(word), new Text(file + ":" + count));
    }
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the inverted-index MapReduce job.
 *
 * <p>Usage: {@code RevertedIndexMain <inputPath> <outputPath>}
 */
public class RevertedIndexMain {
    public static void main(String[] args) throws Exception {
        // 1. Create the job and register this driver class as its entry point.
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(RevertedIndexMain.class);

        // 2. Mapper and its output (k2/v2) types.
        job.setMapperClass(RevertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Combiner: pre-aggregates per-file word counts on the map side.
        job.setCombinerClass(RevertedIndexCombiner.class);

        // 3. Reducer and the final output (k4/v4) types.
        job.setReducerClass(RevertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 4. Input and output paths come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. Run the job and propagate success/failure through the process
        // exit code. The original ignored waitForCompletion's boolean result,
        // so the driver always exited 0 even when the job failed.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}