一.介绍及数据准备
把文件ID对应到关键词的映射转换为关键词到文件ID的映射,每个关键词都对应着一系列的文件,这些文件中都出现这个关键词。
参考博客:https://www.cnblogs.com/zlslch/p/6440114.html
1.建立一个文件夹
hdfs dfs -mkdir /indexdata
2.在文件夹下建立三个文件,准备数据:
data01.txt:I love Beijing and love China
data02.txt:I love China
data03.txtx:Beijing is the capital of China
3.推送到hdfs
hdfs dfs -put data0*.txt /indexdata
二.分析倒排索引的数据处理流程
RevertedIndexMapper.java
package demo.reveredindex; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException { //得到数据来自于哪个文件 /myindexdata/data01.txt String path = ((FileSplit)context.getInputSplit()).getPath().toString(); //得到最后一个斜线的位置 int index = path.lastIndexOf("/"); //得到文件名 String fileName = path.substring(index+1); // 数据:I love Beijing and love Shanghai String data = value1.toString(); //分词 String[] words = data.split(" "); //输出:I:data01.txt 1 for (String w : words) { context.write(new Text(w+":"+fileName), new Text("1")); } } }
RevertedIndexCombiner.java
package demo.reveredindex; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text k21, Iterable<Text> v21, Context context) throws IOException, InterruptedException { // 求和:对一个文件中的某个单词进行求和 long total = 0; for (Text t : v21) { total = total + Long.parseLong(t.toString()); } //k21: love:data01.txt String str = k21.toString(); //得到冒号的位置 int index = str.indexOf(":"); //单词 String word = str.substring(0, index); //文件名 String fileName = str.substring(index+1); //输出 context.write(new Text(word), new Text(fileName+":"+total)); } }
RevertedIndexReducer.jaja
package demo.reveredindex; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class RevertedIndexReducer extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text k3, Iterable<Text> v3, Context context) throws IOException, InterruptedException { //对combiner数据value:拼加 String string = ""; for (Text t : v3) { string = "("+t.toString()+")"+string; } //输出 context.write(k3, new Text(string)); } }
RevertedIndexMain.java
package demo.reveredindex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class RevertedIndexMain { public static void main(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(RevertedIndexMain.class); //Mapper : k2 v2 job.setMapperClass(RevertedIndexMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //Combiner job.setCombinerClass(RevertedIndexCombiner.class); //Reducer k4 v4 job.setReducerClass(RevertedIndexReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //ִ执行 job.waitForCompletion(true); } }
运行:
hadoop jar temp/c1.jar /indexdata /output/day0310/c1
console: