【Hadoop】MapReduce Exercise: Building an Inverted Index with Multiple Chained Jobs

    Overview

    An inverted index (also called a postings file or inverted file) is an index data structure that stores a mapping from words to their locations in a document or a set of documents, as used in full-text search. It is the most common data structure in document retrieval systems: given a word, the index quickly yields the list of documents that contain it. An inverted index consists of two main parts: the word dictionary and the inverted (postings) file.

    There are two variants of the inverted index:
      A record-level inverted index (or inverted file index) contains, for each word, a list of the documents that reference it.
      A word-level inverted index (or full inverted index) additionally records the positions of each word within a document.
      The latter form supports more functionality (for example, phrase search), but requires more time and space to build.

    Modern search-engine indexes are all based on the inverted index. Compared with alternative index structures such as signature files and suffix trees, the inverted index is the most effective way to implement the mapping from words to documents.
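
    To make the idea concrete, here is a minimal in-memory sketch of an inverted index (a toy example independent of Hadoop; the sample documents and the TinyInvertedIndex class name are made up for illustration):

    import java.util.*;
    
    public class TinyInvertedIndex {
        public static void main(String[] args) {
            //hypothetical sample documents: file name -> content
            Map<String, String> docs = new LinkedHashMap<>();
            docs.put("a.txt", "hadoop MapReduce hdfs");
            docs.put("b.txt", "Inverted index");
    
            //the inverted index: word -> set of files containing that word
            Map<String, Set<String>> index = new TreeMap<>();
            for (Map.Entry<String, String> doc : docs.entrySet()) {
                for (String word : doc.getValue().split(" ")) {
                    index.computeIfAbsent(word, k -> new TreeSet<>()).add(doc.getKey());
                }
            }
    
            //look up documents by word instead of scanning every document
            System.out.println(index.get("hadoop"));    //[a.txt]
            System.out.println(index.get("Inverted"));  //[b.txt]
        }
    }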

    Multi-job chaining: the output of the first job is the input of the second job, so the second job can only run once the first job's output is available; job2 depends on job1 and the two execute serially: job1----->job2----->jobN
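
    A minimal sketch of this serial pattern is shown below (the job names, the mapper/reducer wiring, and the paths "input", "intermediate", and "output" are placeholders; this version simply waits for job1 to finish before submitting job2, whereas the driver later in this post expresses the same dependency with JobControl):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class ChainedJobsDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
            //job1 writes its result to an intermediate directory
            Job job1 = Job.getInstance(conf, "step1");
            //...set jar/mapper/reducer/output types for job1 here...
            FileInputFormat.setInputPaths(job1, new Path("input"));
            FileOutputFormat.setOutputPath(job1, new Path("intermediate"));
            if (!job1.waitForCompletion(true)) {   //block until job1 finishes
                System.exit(1);                    //abort the chain if job1 failed
            }
    
            //job2 reads the intermediate directory produced by job1
            Job job2 = Job.getInstance(conf, "step2");
            //...set jar/mapper/reducer/output types for job2 here...
            FileInputFormat.setInputPaths(job2, new Path("intermediate"));
            FileOutputFormat.setOutputPath(job2, new Path("output"));
            System.exit(job2.waitForCompletion(true) ? 0 : 1);
        }
    }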

    Example

    Requirement: given a large amount of text (documents, web pages), build a search index.

    Example: there are three files, a.txt, b.txt, and c.txt, each containing a set of keywords.

    a.txt is as follows:

    map
    reduce
    MapReduce
    index Inverted index
    Inverted index
    倒排索引
    大数据
    hadoop MapReduce hdfs
    Inverted index
    

    b.txt is as follows:

    hadoop MapReduce hdfs
    Inverted index
    倒排索引
    大数据
    map
    reduce
    MapReduce
    

    c.txt is as follows:

    Inverted index
    倒排索引
    大数据
    hadoop MapReduce hdfs
    Inverted index
    hadoop MapReduce hdfs
    Inverted index
    map
    reduce
    MapReduce
    

    To build the inverted index from the files above using chained jobs, two jobs are required.

    The first job is expected to produce output in the following form:

    Inverted	a.txt-->3
    Inverted	b.txt-->1
    Inverted	c.txt-->3
    MapReduce	a.txt-->2
    MapReduce	b.txt-->2
    MapReduce	c.txt-->3
    hadoop	a.txt-->1
    hadoop	b.txt-->1
    hadoop	c.txt-->2
    hdfs	a.txt-->1
    hdfs	b.txt-->1
    hdfs	c.txt-->2
    index	a.txt-->4
    index	b.txt-->1
    index	c.txt-->3
    map	a.txt-->1
    map	b.txt-->1
    map	c.txt-->1
    reduce	a.txt-->1
    reduce	b.txt-->1
    reduce	c.txt-->1
    倒排索引	a.txt-->1
    倒排索引	b.txt-->1
    倒排索引	c.txt-->1
    大数据	a.txt-->1
    大数据	b.txt-->1
    大数据	c.txt-->1
    

    The second job is expected to produce the final result:

    Inverted	b.txt-->1	a.txt-->3	c.txt-->3	
    MapReduce	a.txt-->2	b.txt-->2	c.txt-->3	
    hadoop	a.txt-->1	b.txt-->1	c.txt-->2	
    hdfs	a.txt-->1	b.txt-->1	c.txt-->2	
    index	b.txt-->1	c.txt-->3	a.txt-->4	
    map	a.txt-->1	b.txt-->1	c.txt-->1	
    reduce	a.txt-->1	b.txt-->1	c.txt-->1	
    倒排索引	a.txt-->1	b.txt-->1	c.txt-->1	
    大数据	a.txt-->1	b.txt-->1	c.txt-->1	
    

    Code

    IndexOne

    package descIndex;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    //placeholder public class matching the file name IndexOne.java; the mapper and reducer below are package-private
    public class IndexOne { }
    
    class IndexMapperOne extends Mapper<LongWritable, Text, Text, IntWritable> {
        String name;
        Text tKey = new Text();
        IntWritable tValue = new IntWritable();
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //get the input split this mapper is reading from
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            //get the name of the source file
            name = inputSplit.getPath().getName();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //read one line of input
            String line = value.toString();
            //split the line into keywords
            String[] words = line.split(" ");
            for (String word : words) {
                //use "keyword@filename" as the output key, e.g. "hadoop@a.txt"
                tKey.set(word + "@" + name);
                tValue.set(1);
                context.write(tKey, tValue);
            }
        }
    }
    
    class IndexReduceOne extends Reducer<Text, IntWritable, Text, Text> {
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //the incoming key has the form "word@filename"
            String line = key.toString();
            String[] splits = line.split("@");
            StringBuilder builder = new StringBuilder();
            int count = 0;
            //sum the occurrences of this word within this file
            for (IntWritable number : values) {
                count += number.get();
            }
            //emit: word <TAB> filename-->count
            builder.append(splits[1] + "-->" + count);
            context.write(new Text(splits[0]), new Text(builder.toString()));
        }
    }
    

    IndexTwo

    package descIndex;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class IndexTwo {
    
    }
    
    class IndexMapperTwo extends Mapper<LongWritable, Text, Text, Text> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            //job1's output is tab-separated: word <TAB> filename-->count
            String[] splits = line.split("\t");
            context.write(new Text(splits[0]), new Text(splits[1]));
        }
    }
    
    class IndexReduceTwo extends Reducer<Text, Text, Text, Text> {
    
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder builder = new StringBuilder();
            //concatenate every "filename-->count" entry for this word, separated by tabs
            for (Text txt : values) {
                builder.append(txt).append("\t");
            }
            context.write(key, new Text(builder.toString()));
        }
    }
    

    JobUtils

    package descIndex;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class JobUtils {
    
        private static Configuration conf;
    
        static {
            conf = new Configuration();
        }
    
        /**
         * Build a configured Job instance.
         * @param paths  input and output paths: paths[0] = input, paths[1] = output
         * @param params classes in order: driver, mapper, reducer, map output key, map output value, output key, output value
         * @return the configured Job instance
         * @throws IOException
         */
        public static Job getJobInstance(String[] paths, Class... params) throws IOException {
    
            Job job = Job.getInstance(conf);
            job.setJarByClass(params[0]);
            job.setMapperClass(params[1]);
            job.setReducerClass(params[2]);
    
            job.setMapOutputKeyClass(params[3]);
            job.setMapOutputValueClass(params[4]);
            job.setOutputKeyClass(params[5]);
            job.setOutputValueClass(params[6]);
    
            FileInputFormat.setInputPaths(job, new Path(paths[0]));
            FileOutputFormat.setOutputPath(job, new Path(paths[1]));
            return job;
        }
    
    }
    

    IndexDriver

    package descIndex;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    
    import java.io.IOException;
    
    public class IndexDriver {
        public static void main(String[] args) throws IOException, InterruptedException {
    
            String[] paths1 = {"F:/index", "F:/outindex1"};
            Job job1 = JobUtils.getJobInstance(paths1,IndexDriver.class,IndexMapperOne.class,IndexReduceOne.class
            ,Text.class,IntWritable.class,Text.class,Text.class);
    
            String[] paths2 = {"F:/outindex1", "F:/outindex2"};
            Job job2 = JobUtils.getJobInstance(paths2,IndexDriver.class,IndexMapperTwo.class,IndexReduceTwo.class
                    ,Text.class,Text.class,Text.class,Text.class);
    
            ControlledJob cjJob1 = new ControlledJob(job1.getConfiguration());
            ControlledJob cjJob2 = new ControlledJob(job2.getConfiguration());
            //create the JobControl group that manages the chained jobs
            JobControl ssrs = new JobControl("ssrs");
            //register both controlled jobs
            ssrs.addJob(cjJob1);
            ssrs.addJob(cjJob2);
            //declare the dependency: job2 may only start after job1 has completed
            cjJob2.addDependingJob(cjJob1);
            //JobControl implements Runnable, so submit it on its own thread
            Thread t1 = new Thread(ssrs);
            t1.start();
    
            //poll until every job in the group has finished
            while(!ssrs.allFinished()){
                Thread.sleep(1000);
            }
            //shut down the JobControl thread before exiting
            ssrs.stop();
    
            System.exit(0);
        }
    }
    
    Original article: https://www.cnblogs.com/ShadowFiend/p/11880764.html