Implementing an Inverted Index with MapReduce

Inverted Index

    倒排索引"是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎。它主要是用来存储某个单词(或词组)在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式。由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引(Inverted Index)。

Sample input

file1.txt:

    MapReduce is simple

file2.txt:

    MapReduce is powerful is simple

file3.txt:

    Hello MapReduce bye MapReduce

Sample output

    MapReduce    file1.txt:1;file2.txt:1;file3.txt:2;
    is           file1.txt:1;file2.txt:2;
    simple       file1.txt:1;file2.txt:1;
    powerful     file2.txt:1;
    Hello        file3.txt:1;
    bye          file3.txt:1;

Design

Each posting must record which file it came from, and the file name is a property of the input split, so this information can only be attached in the map phase. There it is obtained with ((FileSplit)context.getInputSplit()).getPath().getName() (see the Mapper class below).

Taking file3.txt as an example (the map input key is the line's byte offset within the file):

    Map input: (0, Hello MapReduce bye MapReduce)
    Map output: (Hello, file3.txt:1) (MapReduce, file3.txt:1) (bye, file3.txt:1) (MapReduce, file3.txt:1)
    Combiner input: (Hello, <file3.txt:1>) (MapReduce, <file3.txt:1, file3.txt:1>) (bye, <file3.txt:1>)
    Combiner output: (Hello, <file3.txt:1>) (MapReduce, <file3.txt:2>) (bye, <file3.txt:1>)
    Reduce input: (MapReduce, <file3.txt:2, file1.txt:1, file2.txt:1>)
    Reduce output: (MapReduce, file3.txt:2;file1.txt:1;file2.txt:1;)

Implementation

Mapper class

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        private final Text k = new Text();
        private final Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the name of the file this split belongs to.
            String path = ((FileSplit) context.getInputSplit()).getPath().getName();
            // Every token on this line counts as one occurrence in this file,
            // so the value "fileName:1" is set once and reused.
            v.set(path + ":1");
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                k.set(st.nextToken());
                context.write(k, v);
            }
        }
    }
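
Two idioms in this Mapper are worth noting: StringTokenizer splits on whitespace by default, so each whitespace-separated word becomes one token, and the two Text objects are created once and reused across map() calls, the usual Hadoop trick for avoiding a fresh allocation per record.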

Combiner class

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MyCombiner extends Reducer<Text, Text, Text, Text> {

        private final Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            String prefix = "";
            for (Text val : values) {
                String line = val.toString();          // e.g. "file3.txt:1"
                int index = line.indexOf(':');
                prefix = line.substring(0, index + 1); // "file3.txt:" (identical for all values here)
                sum += Integer.parseInt(line.substring(index + 1));
            }
            v.set(prefix + sum);                       // e.g. "file3.txt:2"
            context.write(key, v);
        }
    }
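
Taking the file-name prefix from the last value is safe here: a combiner only sees the output of a single map task, and with FileInputFormat a split never spans files, so every value for a given key in one combine call carries the same file name.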

Reducer class

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MyReducer extends Reducer<Text, Text, Text, Text> {

        private final Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate the per-file postings,
            // e.g. "file3.txt:2;file1.txt:1;file2.txt:1;".
            StringBuilder sb = new StringBuilder();
            for (Text val : values) {
                sb.append(val.toString()).append(';');
            }
            v.set(sb.toString());
            context.write(key, v);
        }
    }
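
One caveat: Hadoop treats the combiner as an optional optimization and may run it zero, one, or more times, so if it is skipped the reducer above will emit unmerged entries such as file3.txt:1;file3.txt:1; instead of file3.txt:2;. Below is a minimal sketch of a reducer that merges the per-file counts itself, so correctness no longer depends on the combiner running (MergingReducer is a hypothetical name, not part of the original post):

    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Hypothetical alternative reducer: aggregates counts per file name,
    // so the result is correct whether or not the combiner ran.
    public class MergingReducer extends Reducer<Text, Text, Text, Text> {

        private final Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Accumulate a count per file name.
            Map<String, Integer> counts = new LinkedHashMap<>();
            for (Text val : values) {
                String line = val.toString();          // "fileName:count"
                int index = line.indexOf(':');
                String file = line.substring(0, index);
                counts.merge(file, Integer.parseInt(line.substring(index + 1)), Integer::sum);
            }
            // Render the postings as "file:count;" pairs.
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                sb.append(e.getKey()).append(':').append(e.getValue()).append(';');
            }
            v.set(sb.toString());
            context.write(key, v);
        }
    }

With this version the combiner becomes a pure optimization: keeping it reduces shuffle traffic, dropping it does not change the result.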

Driver class

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class InvertIndex {

        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();

            // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
            Job job = Job.getInstance(conf, "inverted index");
            job.setJarByClass(InvertIndex.class);
            job.setMapperClass(MyMapper.class);
            job.setCombinerClass(MyCombiner.class);
            job.setReducerClass(MyReducer.class);

            // Map output and final output are both (Text, Text),
            // so one pair of declarations covers both stages.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path("hdfs://127.0.0.1:9000/usr/qqx/invertinput"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://127.0.0.1:9000/usr/qqx/invertoutput"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
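
To run the job, package the classes into a jar and submit it with hadoop jar, e.g. hadoop jar invertindex.jar InvertIndex (the jar name here is illustrative). Note that FileOutputFormat refuses to start a job whose output directory already exists, so /usr/qqx/invertoutput has to be removed between runs.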
Original post: https://www.cnblogs.com/qiaoqianxiong/p/4987086.html