  • MapReduce in Action (4): Implementing an Inverted Index

    Requirements:

    (The original post shows the contents of three input files here: a.txt, b.txt, and c.txt, each holding tab-separated words such as hello, tom, and jerry.)

    Process the three files above with MapReduce and produce output in the following format:

    hello c.txt-->2 b.txt-->2 a.txt-->3
    jerry c.txt-->1 b.txt-->3 a.txt-->1
    tom c.txt-->1 b.txt-->1 a.txt-->2

    Approach:

    This takes two steps:

    1. The word-count exercise from earlier, except that each word is now tagged with the name of the file it came from. This produces:

    hello-->a.txt 3
    hello-->b.txt 2
    hello-->c.txt 2
    jerry-->a.txt 1
    jerry-->b.txt 3
    jerry-->c.txt 1
    tom-->a.txt 2
    tom-->b.txt 1
    tom-->c.txt 1

    2. Transform the key from the hello-->a.txt form back to plain hello and let the framework group by word (a minimal parsing sketch follows the example output below). This produces:

    hello c.txt-->2 b.txt-->2 a.txt-->3
    jerry c.txt-->1 b.txt-->3 a.txt-->1
    tom c.txt-->1 b.txt-->1 a.txt-->2
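
    As a standalone illustration of that key swap, here is a minimal plain-Java sketch (hypothetical class, assuming the step-one output line format word-->file<TAB>count):

    public class KeySwapSketch {
        public static void main(String[] args) {
            String line = "hello-->a.txt\t3";              // one step-one output line
            String[] fields = line.split("-->");           // ["hello", "a.txt\t3"]
            String[] fileAndCount = fields[1].split("\t"); // ["a.txt", "3"]
            // The word becomes the key; file and count are glued back into the value
            System.out.println(fields[0] + "\t" + fileAndCount[0] + "-->" + fileAndCount[1]);
            // prints: hello<TAB>a.txt-->3
        }
    }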

    The project consists of the following two classes:

    InverseIndexStepOne.java:

    package cn.darrenchan.hadoop.mr.ii;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class InverseIndexStepOne {
        public static class StepOneMapper extends
                Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Read one line of input
                String line = value.toString();
                // Split the line into tab-separated words
                String[] fields = line.split("\t");
                // Get the file split this line came from
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                // Extract the file name from the split
                String fileName = inputSplit.getPath().getName();
                for (String field : fields) {
                    // Emit k: hello-->a.txt, v: 1
                    context.write(new Text(field + "-->" + fileName),
                            new LongWritable(1));
                }
            }
        }
    
        public static class StepOneReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values,
                    Context context) throws IOException, InterruptedException {
                // Input: <hello-->a.txt, {1,1,1,...}> - sum the ones
                int count = 0;
                for (LongWritable value : values) {
                    count += value.get();
                }
                // Output: <hello-->a.txt, total count>
                context.write(key, new LongWritable(count));
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(InverseIndexStepOne.class);
    
            job.setMapperClass(StepOneMapper.class);
            job.setReducerClass(StepOneReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
    
            // If the output path given on the command line already exists, delete it first
            Path outputPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(conf);
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, outputPath);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
    }
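
    Since the step-one reduce is a pure sum, which is associative and commutative, the same reducer class could also be registered as a combiner to pre-aggregate counts map-side and shrink shuffle traffic. This is an optional tweak, not part of the original driver; the hypothetical fragment would go in main() before submission:

        // Hypothetical addition to InverseIndexStepOne.main():
        // reuse the summing reducer as a map-side combiner.
        job.setCombinerClass(StepOneReducer.class);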

    InverseIndexStepTwo.java:

    package cn.darrenchan.hadoop.mr.ii;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class InverseIndexStepTwo {
        // Map input - k: line start offset, v: one step-one line such as "hello-->a.txt\t3"
        // Map output: <hello, a.txt-->3>
        public static class StepTwoMapper extends
                Mapper<LongWritable, Text, Text, Text> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // A step-one line looks like "hello-->a.txt\t3"
                String line = value.toString();
                String[] fields = line.split("-->");      // ["hello", "a.txt\t3"]
                String[] strings = fields[1].split("\t"); // ["a.txt", "3"]
                // Emit k: hello, v: a.txt-->3
                context.write(new Text(fields[0]), new Text(strings[0] + "-->"
                        + strings[1]));
            }
        }
    
        // Reduce input: <hello, {a.txt-->3, b.txt-->2, c.txt-->1}>
        // Reduce output - k: hello, v: a.txt-->3 b.txt-->2 c.txt-->1
        public static class StepTwoReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Concatenate all file-->count entries for this word
                StringBuilder result = new StringBuilder("    ");
                for (Text value : values) {
                    result.append(value).append("    ");
                }
                context.write(key, new Text(result.toString()));
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(InverseIndexStepTwo.class);
    
            job.setMapperClass(StepTwoMapper.class);
            job.setReducerClass(StepTwoReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            // If the output path given on the command line already exists, delete it first
            Path outputPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(conf);
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, outputPath);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
    }
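
    The two steps are submitted as separate hadoop jar invocations below. As an alternative (not in the original post), a single hypothetical driver could chain them, reusing the mapper and reducer classes above; a minimal sketch:

    package cn.darrenchan.hadoop.mr.ii;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Hypothetical chained driver: args are <input> <intermediate> <output>.
    public class InverseIndexDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            Job stepOne = Job.getInstance(conf, "inverse-index-step-one");
            stepOne.setJarByClass(InverseIndexStepOne.class);
            stepOne.setMapperClass(InverseIndexStepOne.StepOneMapper.class);
            stepOne.setReducerClass(InverseIndexStepOne.StepOneReducer.class);
            stepOne.setOutputKeyClass(Text.class);
            stepOne.setOutputValueClass(LongWritable.class);
            FileInputFormat.setInputPaths(stepOne, new Path(args[0]));
            FileOutputFormat.setOutputPath(stepOne, new Path(args[1]));
            // Abort if step one fails; step two reads its output.
            if (!stepOne.waitForCompletion(true)) {
                System.exit(1);
            }

            Job stepTwo = Job.getInstance(conf, "inverse-index-step-two");
            stepTwo.setJarByClass(InverseIndexStepTwo.class);
            stepTwo.setMapperClass(InverseIndexStepTwo.StepTwoMapper.class);
            stepTwo.setReducerClass(InverseIndexStepTwo.StepTwoReducer.class);
            stepTwo.setOutputKeyClass(Text.class);
            stepTwo.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(stepTwo, new Path(args[1]));
            FileOutputFormat.setOutputPath(stepTwo, new Path(args[2]));
            System.exit(stepTwo.waitForCompletion(true) ? 0 : 1);
        }
    }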

    First upload the three input files to the /ii/srcdata directory on HDFS, then run step one:

    hadoop jar ii.jar cn.darrenchan.hadoop.mr.ii.InverseIndexStepOne /ii/srcdata /ii/output1

    The job prints the following run log:

    17/03/01 17:55:38 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
    17/03/01 17:55:38 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    17/03/01 17:55:39 INFO input.FileInputFormat: Total input paths to process : 3
    17/03/01 17:55:39 INFO mapreduce.JobSubmitter: number of splits:3
    17/03/01 17:55:40 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488372977056_0001
    17/03/01 17:55:41 INFO impl.YarnClientImpl: Submitted application application_1488372977056_0001
    17/03/01 17:55:41 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488372977056_0001/
    17/03/01 17:55:41 INFO mapreduce.Job: Running job: job_1488372977056_0001
    17/03/01 17:55:52 INFO mapreduce.Job: Job job_1488372977056_0001 running in uber mode : false
    17/03/01 17:55:52 INFO mapreduce.Job: map 0% reduce 0%
    17/03/01 17:56:11 INFO mapreduce.Job: map 33% reduce 0%
    17/03/01 17:56:12 INFO mapreduce.Job: map 100% reduce 0%
    17/03/01 17:56:18 INFO mapreduce.Job: map 100% reduce 100%
    17/03/01 17:56:18 INFO mapreduce.Job: Job job_1488372977056_0001 completed successfully
    17/03/01 17:56:18 INFO mapreduce.Job: Counters: 49
    File System Counters
    FILE: Number of bytes read=382
    FILE: Number of bytes written=372665
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=402
    HDFS: Number of bytes written=138
    HDFS: Number of read operations=12
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    Job Counters
    Launched map tasks=3
    Launched reduce tasks=1
    Data-local map tasks=3
    Total time spent by all maps in occupied slots (ms)=51196
    Total time spent by all reduces in occupied slots (ms)=3018
    Total time spent by all map tasks (ms)=51196
    Total time spent by all reduce tasks (ms)=3018
    Total vcore-seconds taken by all map tasks=51196
    Total vcore-seconds taken by all reduce tasks=3018
    Total megabyte-seconds taken by all map tasks=52424704
    Total megabyte-seconds taken by all reduce tasks=3090432
    Map-Reduce Framework
    Map input records=8
    Map output records=16
    Map output bytes=344
    Map output materialized bytes=394
    Input split bytes=312
    Combine input records=0
    Combine output records=0
    Reduce input groups=9
    Reduce shuffle bytes=394
    Reduce input records=16
    Reduce output records=9
    Spilled Records=32
    Shuffled Maps =3
    Failed Shuffles=0
    Merged Map outputs=3
    GC time elapsed (ms)=1077
    CPU time spent (ms)=6740
    Physical memory (bytes) snapshot=538701824
    Virtual memory (bytes) snapshot=1450766336
    Total committed heap usage (bytes)=379793408
    Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
    File Input Format Counters
    Bytes Read=90
    File Output Format Counters
    Bytes Written=138

    The step-one output is as follows:

    hello-->a.txt 3
    hello-->b.txt 2
    hello-->c.txt 2
    jerry-->a.txt 1
    jerry-->b.txt 3
    jerry-->c.txt 1
    tom-->a.txt 2
    tom-->b.txt 1
    tom-->c.txt 1
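
    These lines can also be read straight from HDFS (assuming the default single reducer, so the result lands in part-r-00000):

    hadoop fs -cat /ii/output1/part-r-00000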

    Then run step two, taking step one's output as input:

    hadoop jar ii.jar cn.darrenchan.hadoop.mr.ii.InverseIndexStepTwo /ii/output1 /ii/output2

    The job prints the following run log:

    17/03/01 18:03:31 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
    17/03/01 18:03:31 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    17/03/01 18:03:31 INFO input.FileInputFormat: Total input paths to process : 1
    17/03/01 18:03:31 INFO mapreduce.JobSubmitter: number of splits:1
    17/03/01 18:03:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488372977056_0003
    17/03/01 18:03:32 INFO impl.YarnClientImpl: Submitted application application_1488372977056_0003
    17/03/01 18:03:32 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488372977056_0003/
    17/03/01 18:03:32 INFO mapreduce.Job: Running job: job_1488372977056_0003
    17/03/01 18:03:38 INFO mapreduce.Job: Job job_1488372977056_0003 running in uber mode : false
    17/03/01 18:03:38 INFO mapreduce.Job: map 0% reduce 0%
    17/03/01 18:03:43 INFO mapreduce.Job: map 100% reduce 0%
    17/03/01 18:03:47 INFO mapreduce.Job: map 100% reduce 100%
    17/03/01 18:03:48 INFO mapreduce.Job: Job job_1488372977056_0003 completed successfully
    17/03/01 18:03:48 INFO mapreduce.Job: Counters: 49
    File System Counters
    FILE: Number of bytes read=162
    FILE: Number of bytes written=185553
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=249
    HDFS: Number of bytes written=112
    HDFS: Number of read operations=6
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    Job Counters
    Launched map tasks=1
    Launched reduce tasks=1
    Data-local map tasks=1
    Total time spent by all maps in occupied slots (ms)=2605
    Total time spent by all reduces in occupied slots (ms)=2725
    Total time spent by all map tasks (ms)=2605
    Total time spent by all reduce tasks (ms)=2725
    Total vcore-seconds taken by all map tasks=2605
    Total vcore-seconds taken by all reduce tasks=2725
    Total megabyte-seconds taken by all map tasks=2667520
    Total megabyte-seconds taken by all reduce tasks=2790400
    Map-Reduce Framework
    Map input records=9
    Map output records=9
    Map output bytes=138
    Map output materialized bytes=162
    Input split bytes=111
    Combine input records=0
    Combine output records=0
    Reduce input groups=3
    Reduce shuffle bytes=162
    Reduce input records=9
    Reduce output records=3
    Spilled Records=18
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=138
    CPU time spent (ms)=820
    Physical memory (bytes) snapshot=218480640
    Virtual memory (bytes) snapshot=726454272
    Total committed heap usage (bytes)=137433088
    Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
    File Input Format Counters
    Bytes Read=138
    File Output Format Counters
    Bytes Written=112

    The final inverted index is as follows:

    hello c.txt-->2 b.txt-->2 a.txt-->3
    jerry c.txt-->1 b.txt-->3 a.txt-->1
    tom c.txt-->1 b.txt-->1 a.txt-->2
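
    As before, the final index can be read back with (same single-reducer assumption):

    hadoop fs -cat /ii/output2/part-r-00000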
