zoukankan      html  css  js  c++  java
  • MapReduce入门(三)倒排索引

    什么是倒排索引?

               倒排索引源于实际应用中需要根据属性的值来查找记录。这种索引表中的每一项都包括一个属性值和具有该属性值的各记录的地址。由于不是由记录来确定属性值,而是由属性值来确定记录的位置,因而称为倒排索引(inverted index)。带有倒排索引的文件我们称为倒排索引文件,简称倒排文件(inverted file)。

    我感觉搜索引擎的原理就是倒排索引,或者正排索引。那么就让我们解密吧!

    一、创建ReverseApp类:用于实现实现倒排索引

    package com.day02;
    
    import com.google.common.io.Resources;
    import com.utils.CDUPUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import org.apache.hadoop.fs.Path;
    
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    /**
     * 用MapReduce实现倒排索引
     */
    public class ReverseApp {
        public static class ReverseMapper extends Mapper<LongWritable,Text,Text, Text> {
            //map方法每次执行一行数据,会被循环调用map方法(有多少行就调用多少次)
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                //获取key所在的文件名
                Path path = ((FileSplit) context.getInputSplit()).getPath();
                String fileName = path.toString();
                String[] strings = value.toString().split(" ");
                for (int i = 0; i < strings.length; i++) {
                    context.write(new Text(strings[i]),new Text(fileName));
                }
            }
        }
    
        public static class ReverseReducer extends Reducer<Text, Text, Text, Text> {
            //每次处理一个key,会被循环调用,有多少个key就会调用几次
            //获取map处理的数据 hello 集合(用于存储key相同的value--1)
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                Iterator<Text> iterator = values.iterator();
                StringBuffer buffer = new StringBuffer();
                while (iterator.hasNext()) {
                    Text fileName = iterator.next();
                    StringBuffer append = buffer.append(fileName.toString()).append("--");
                }
                context.write(key,new Text(String.valueOf(buffer)));
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration coreSiteConf = new Configuration();
            coreSiteConf.addResource(Resources.getResource("core-site-master2.xml"));
            //设置一个任务,后面是job的名称
            Job job = Job.getInstance(coreSiteConf, "Reverse");
            //将打的jar包自动上传临时目录,运行之后就删除了--自己看不到
            job.setJar("mrdemo/target/mrdemo-1.0-SNAPSHOT.jar");
    
            //设置Map和Reduce处理类
            job.setMapperClass(ReverseMapper.class);
            job.setReducerClass(ReverseReducer.class);
    
            //设置map输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //设置job/reduce输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            //设置任务的输入路径
            FileInputFormat.addInputPath(job, new Path("/wc"));
            //设置任务的输出路径--保存结果(这个目录必须是不存在的目录)
            //删除存在的文件
            CDUPUtils.deleteFileName("/reout");
    
            FileOutputFormat.setOutputPath(job, new Path("/reout"));
            //运行任务 true:表示打印详情
            boolean flag = job.waitForCompletion(true);
            if (flag){
                System.out.println("使用mapreduce来倒排索引操作的结果...");
                CDUPUtils.readContent("/reout/part-r-00000");
            }else {
                System.out.println(flag+",文件加载失败");
            }
        }
    }

    二、里面用到自己写的工具类CDUPUtils :用于删除已存在目录以及阅读文件内容

    package com.utils;
     
    import com.google.common.io.Resources;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
     
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.LineNumberReader;
    import java.util.ArrayList;
     
    public class CDUPUtils {
        //删除已经存在在hdfs上面的文件文件
        public static void deleteFileName(String path) throws IOException {
            //将要删除的文件
            Path fileName = new Path(path);
            Configuration entries = new Configuration();
            //解析core-site-master2.xml文件
            entries.addResource(Resources.getResource("core-site-local.xml"));
            //coreSiteConf.set(,);--在这里可以添加配置文件
            FileSystem fileSystem = FileSystem.get(entries);
            if (fileSystem.exists(fileName)){
                System.out.println(fileName+"已经存在,正在删除它...");
                boolean flag = fileSystem.delete(fileName, true);
                if (flag){
                    System.out.println(fileName+"删除成功");
                }else {
                    System.out.println(fileName+"删除失败!");
                    return;
                }
            }
            //关闭资源
            fileSystem.close();
        }
     
        //读取文件内容
        public static void readContent(String path) throws IOException {
            //将要读取的文件路径
            Path fileName = new Path(path);
            ArrayList<String> returnValue = new ArrayList<String>();
            Configuration configuration = new Configuration();
            configuration.addResource(Resources.getResource("core-site-local.xml"));
            //获取客户端系统文件
            FileSystem fileSystem = FileSystem.get(configuration);
            //open打开文件--获取文件的输入流用于读取数据
            FSDataInputStream inputStream = fileSystem.open(fileName);
            InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
            //一行一行的读取数据
            LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader);
            //定义一个字符串变量用于接收每一行的数据
            String str = null;
            //判断何时没有数据
            while ((str=lineNumberReader.readLine())!=null){
                returnValue.add(str);
            }
            //打印数据到控制台
            System.out.println("MapReduce算法操作的文件内容如下:");
            for (String read :
                    returnValue) {
                System.out.println(read);
            }
            //关闭资源
            lineNumberReader.close();
            inputStream.close();
            inputStreamReader.close();
        }
    }

    三、配置文件:cort-site-master2.xml--注意里面的主机IP需要填写自己的

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property>
            <name>fs.defaultFS</name>
            <value>hdfs://192.168.228.13:9000</value>
        </property>
        <property>
            <name>fs.hdfs.impl</name>
            <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
        </property>
        <property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
        </property>
        <property>
            <name>yarn.resourcemanager.scheduler.address</name>
            <value>192.168.228.13:8030</value>
        </property>
        <property>
            <name>mapreduce.app-submission.cross-platform</name>
            <value>true</value>
        </property>
    
        <property>
            <description>The address of the applications manager interface in the RM.</description>
            <name>yarn.resourcemanager.address</name>
            <value>192.168.228.13:8032</value>
        </property>
        <property>
            <description>The address of the scheduler interface.</description>
            <name>yarn.resourcemanager.scheduler.address</name>
            <value>192.168.228.13:8030</value>
        </property>
        <property>
            <description>The http address of the RM web application.</description>
            <name>yarn.resourcemanager.webapp.address</name>
            <value>192.168.228.13:8088</value>
        </property>
        <property>
            <description>The https adddress of the RM web application.</description>
            <name>yarn.resourcemanager.webapp.https.address</name>
            <value>192.168.228.13:8090</value>
        </property>
        <property>
            <name>yarn.resourcemanager.resource-tracker.address</name>
            <value>192.168.228.13:8031</value>
        </property>
        <property>
            <description>The address of the RM admin interface.</description>
            <name>yarn.resourcemanager.admin.address</name>
            <value>192.168.228.13:8033</value>
        </property>
        <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
        </property>
    </configuration>

    四、在pom文件中添加的依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.zhiyou100</groupId>
        <artifactId>mrdemo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <org.apache.hadoop.version>2.7.5</org.apache.hadoop.version>
        </properties>
    
        <!--分布式计算-->
        <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>${org.apache.hadoop.version}</version>
            </dependency>
                <dependency>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-mapreduce-client-common</artifactId>
                    <version>${org.apache.hadoop.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
                    <version>${org.apache.hadoop.version}</version>
                </dependency>
    
            <!--分布式存储-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${org.apache.hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${org.apache.hadoop.version}</version>
            </dependency>
    
            <!--数据库驱动-->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.46</version>
            </dependency>
            </dependencies>
    </project>

    五、测试:在本地将jar包自动上传到hdfs上运行,(运行时间长)

    在hdfs上保证

    1.有/wc文件夹--文件夹中的文件内容以空格隔开

    例如:我在/wc目录下有一个a.txt,a.txt中内容如下

    2.在hdfs上保证没有/reout目录

    在打jar包之前需要将第三方jar包放在lib中(在resources中创建lib文件),再打包。

    以上操作之后在本地直接运行(右击Run),就会出现类似下面的内容

  • 相关阅读:
    Mysql 5.7解压版安装
    Java Web 整合案例
    maven 创建Java web项目
    LintCode 数字三角形
    Hibernate 泛型Dao实现
    LintCode 将二叉查找树转换成双链表
    LintCode 删除链表中倒数第n个节点
    LintCode 二级制中有多少个1
    LintCode翻转二叉树
    SpringMVC 运行流程
  • 原文地址:https://www.cnblogs.com/pigdata/p/10305598.html
Copyright © 2011-2022 走看看