zoukankan      html  css  js  c++  java
  • 【Hadoop】Hadoop MR 如何实现倒排索引算法?

    1、概念、方案

    2、代码示例

    InverseIndexOne

    package com.ares.hadoop.mr.inverseindex;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.StringUtils;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.log4j.Logger;
    
    /**
     * First stage of a two-stage inverted-index build.
     *
     * <p>The mapper emits {@code word-fileName -> 1} for every space-separated
     * token of every input line, and the reducer sums those ones, so the job
     * outputs one {@code (word-fileName, occurrences)} pair per word and file.
     *
     * <p>Positional arguments (after {@link ToolRunner} has removed generic
     * Hadoop options): {@code args[0]} is not read by this job, {@code args[1]}
     * is the input path string and {@code args[2]} is the output directory.
     */
    public class InverseIndexOne extends Configured implements Tool {

        private static final Logger LOGGER = Logger.getLogger(InverseIndexOne.class);

        /** Number of positional arguments {@link #run(String[])} requires. */
        private static final int EXPECTED_ARGS = 3;

        /** Job counters published by this job. */
        enum Counter {
            /** Input lines dropped because tokenizing them raised a runtime error. */
            LINESKIP
        }

        /**
         * Splits each input line on single spaces and emits
         * {@code word-fileName -> 1} for every non-empty token, where
         * {@code fileName} is the name of the file backing the current split.
         */
        public static class InverseIndexOneMapper
                extends Mapper<LongWritable, Text, Text, LongWritable> {

            private static final char WORD_SEPARATOR = ' ';
            private static final char KEY_SEPARATOR = '-';
            private static final LongWritable ONE = new LongWritable(1L);

            /** Reused output key; avoids allocating one Text per emitted record. */
            private final Text outKey = new Text();

            /**
             * Tokenizes one line and writes one record per non-empty token.
             *
             * @param key     key supplied by the input format; only used in log output
             * @param value   the input line
             * @param context task context used to read the split and write output
             * @throws IOException          if writing an output record fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] words;
                String fileName;
                try {
                    words = StringUtils.split(value.toString(), WORD_SEPARATOR);
                    fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
                } catch (RuntimeException e) {
                    // Only parsing problems are treated as a skipped line. Write
                    // failures are deliberately left outside this try block so
                    // they propagate and fail the task instead of being hidden.
                    LOGGER.error("InverseIndexOne: skipping line with key " + key, e);
                    context.getCounter(Counter.LINESKIP).increment(1);
                    return;
                }

                for (String word : words) {
                    // Consecutive separators yield empty tokens; indexing them
                    // would create a bogus "-fileName" key.
                    if (word.isEmpty()) {
                        continue;
                    }
                    outKey.set(word + KEY_SEPARATOR + fileName);
                    context.write(outKey, ONE);
                }
            }
        }

        /** Sums the per-occurrence counts emitted for each {@code word-fileName} key. */
        public static class InverseIndexOneReducer
                extends Reducer<Text, LongWritable, Text, LongWritable> {

            /** Reused output value. */
            private final LongWritable result = new LongWritable();

            /**
             * Writes {@code key -> sum(values)}.
             *
             * @param key     a {@code word-fileName} key produced by the mapper
             * @param values  the partial counts for that key
             * @param context task context used to write output
             * @throws IOException          if writing the output record fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                    throws IOException, InterruptedException {
                long count = 0;
                for (LongWritable value : values) {
                    count += value.get();
                }
                result.set(count);
                context.write(key, result);
            }
        }

        /**
         * Configures and runs the job, blocking until it finishes.
         *
         * @param args positional arguments; exactly three are required, of which
         *             {@code args[1]} (input) and {@code args[2]} (output) are used
         * @return 0 on success, 1 if the job failed, -1 if the argument count is wrong
         * @throws Exception if job setup or submission fails
         */
        @Override
        public int run(String[] args) throws Exception {
            // Validate here rather than in main(): ToolRunner strips generic
            // options (-D, -conf, ...) before calling run(), so only here does
            // the length reflect the positional arguments.
            if (args.length != EXPECTED_ARGS) {
                String errMsg = "InverseIndexOne: ARGUMENTS ERROR";
                LOGGER.error(errMsg);
                System.out.println(errMsg);
                return -1;
            }

            String errMsg = "InverseIndexOne: TEST STARTED...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);

            // Use the configuration injected by ToolRunner so generic options
            // take effect; fall back to defaults if run() is invoked directly.
            Configuration conf = getConf();
            if (conf == null) {
                conf = new Configuration();
            }
            Job job = Job.getInstance(conf);

            job.setJobName("InverseIndexOne");

            job.setJarByClass(InverseIndexOne.class);
            job.setMapperClass(InverseIndexOneMapper.class);
            job.setReducerClass(InverseIndexOneReducer.class);

            // Final (reduce) output types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // Intermediate (map) output types.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);

            FileInputFormat.setInputPaths(job, args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[2]));

            // 'true' makes the client print job progress while waiting.
            if (job.waitForCompletion(true)) {
                errMsg = "InverseIndexOne: TEST SUCCESSFULLY...";
                LOGGER.debug(errMsg);
                System.out.println(errMsg);
                return 0;
            }

            errMsg = "InverseIndexOne: TEST FAILED...";
            LOGGER.error(errMsg);
            System.out.println(errMsg);
            return 1;
        }

        /**
         * Command-line entry point; exits the JVM with the code returned by
         * {@link #run(String[])}.
         *
         * @param args command-line arguments, optionally preceded by generic Hadoop options
         * @throws Exception if the job cannot be set up or submitted
         */
        public static void main(String[] args) throws Exception {
            int result = ToolRunner.run(new Configuration(), new InverseIndexOne(), args);
            System.exit(result);
        }
    }

    InverseIndexTwo

    package com.ares.hadoop.mr.inverseindex;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.StringUtils;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.log4j.Logger;
    
    /**
     * Second stage of a two-stage inverted-index build.
     *
     * <p>Each input line is expected to look like
     * {@code word-fileName<TAB>count}. The mapper regroups it as
     * {@code word -> fileName-count}, and the reducer joins all postings of a
     * word with commas, producing {@code word -> file1-n1,file2-n2,...}.
     *
     * <p>Positional arguments (after {@link ToolRunner} has removed generic
     * Hadoop options): {@code args[0]} is not read by this job, {@code args[1]}
     * is the input path string and {@code args[2]} is the output directory.
     */
    public class InverseIndexTwo extends Configured implements Tool {

        private static final Logger LOGGER = Logger.getLogger(InverseIndexTwo.class);

        /** Number of positional arguments {@link #run(String[])} requires. */
        private static final int EXPECTED_ARGS = 3;

        /** Job counters published by this job. */
        enum Counter {
            /** Input lines dropped because they did not match the expected layout. */
            LINESKIP
        }

        /**
         * Parses {@code word-fileName<TAB>count} lines and emits
         * {@code word -> fileName-count}.
         */
        public static class InverseIndexTwoMapper
                extends Mapper<LongWritable, Text, Text, Text> {

            private static final char FIELD_SEPARATOR = '\t';
            private static final char KEY_SEPARATOR = '-';

            /** Reused output key and value. */
            private final Text textKey = new Text();
            private final Text textValue = new Text();

            /**
             * Converts one input line into a posting for its word. Lines that do
             * not match the expected layout are logged, counted under
             * {@link Counter#LINESKIP} and dropped.
             *
             * @param key     key supplied by the input format; not used
             * @param value   the input line
             * @param context task context used to write output and update counters
             * @throws IOException          if writing the output record fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();

                String[] fields = StringUtils.split(line, FIELD_SEPARATOR);
                if (fields.length < 2) {
                    skipLine(context, "missing count field: " + line, null);
                    return;
                }

                // Split on the FIRST separator only, so a file name that itself
                // contains '-' is kept whole instead of being truncated.
                // NOTE(review): a word containing '-' is still ambiguous in this
                // format; fixing that needs a different separator in both jobs.
                String wordAndFileName = fields[0];
                int split = wordAndFileName.indexOf(KEY_SEPARATOR);
                if (split <= 0 || split == wordAndFileName.length() - 1) {
                    skipLine(context, "missing word or file name: " + line, null);
                    return;
                }
                String word = wordAndFileName.substring(0, split);
                String fileName = wordAndFileName.substring(split + 1);

                long count;
                try {
                    count = Long.parseLong(fields[1]);
                } catch (NumberFormatException e) {
                    skipLine(context, "count is not a number: " + line, e);
                    return;
                }

                // The write sits outside any catch block, so I/O failures
                // propagate and fail the task rather than being counted as a
                // skipped line.
                textKey.set(word);
                textValue.set(fileName + KEY_SEPARATOR + count);
                context.write(textKey, textValue);
            }

            /** Logs why a line is dropped and bumps the LINESKIP counter. */
            private void skipLine(Context context, String reason, Throwable cause) {
                String message = "InverseIndexTwo: skipping line, " + reason;
                if (cause == null) {
                    LOGGER.error(message);
                } else {
                    LOGGER.error(message, cause);
                }
                context.getCounter(Counter.LINESKIP).increment(1);
            }
        }

        /** Joins all {@code fileName-count} postings of a word into one comma-separated value. */
        public static class InverseIndexTwoReducer
                extends Reducer<Text, Text, Text, Text> {

            private static final String POSTING_SEPARATOR = ",";

            /** Reused output value. */
            private final Text textValue = new Text();

            /**
             * Writes {@code key -> posting1,posting2,...} in the order the
             * postings are delivered.
             *
             * @param key     the word
             * @param values  the {@code fileName-count} postings for that word
             * @param context task context used to write output
             * @throws IOException          if writing the output record fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                StringBuilder index = new StringBuilder();
                // Empty before the first posting, so no leading or trailing
                // separator is ever written.
                String delimiter = "";
                for (Text posting : values) {
                    index.append(delimiter).append(posting.toString());
                    delimiter = POSTING_SEPARATOR;
                }
                textValue.set(index.toString());
                context.write(key, textValue);
            }
        }

        /**
         * Configures and runs the job, blocking until it finishes.
         *
         * @param args positional arguments; exactly three are required, of which
         *             {@code args[1]} (input) and {@code args[2]} (output) are used
         * @return 0 on success, 1 if the job failed, -1 if the argument count is wrong
         * @throws Exception if job setup or submission fails
         */
        @Override
        public int run(String[] args) throws Exception {
            // Validate here rather than in main(): ToolRunner strips generic
            // options (-D, -conf, ...) before calling run(), so only here does
            // the length reflect the positional arguments.
            if (args.length != EXPECTED_ARGS) {
                String errMsg = "InverseIndexTwo: ARGUMENTS ERROR";
                LOGGER.error(errMsg);
                System.out.println(errMsg);
                return -1;
            }

            String errMsg = "InverseIndexTwo: TEST STARTED...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);

            // Use the configuration injected by ToolRunner so generic options
            // take effect; fall back to defaults if run() is invoked directly.
            Configuration conf = getConf();
            if (conf == null) {
                conf = new Configuration();
            }
            Job job = Job.getInstance(conf);

            job.setJobName("InverseIndexTwo");

            job.setJarByClass(InverseIndexTwo.class);
            job.setMapperClass(InverseIndexTwoMapper.class);
            job.setReducerClass(InverseIndexTwoReducer.class);

            // Final (reduce) output types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Intermediate (map) output types.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            FileInputFormat.setInputPaths(job, args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[2]));

            // 'true' makes the client print job progress while waiting.
            if (job.waitForCompletion(true)) {
                errMsg = "InverseIndexTwo: TEST SUCCESSFULLY...";
                LOGGER.debug(errMsg);
                System.out.println(errMsg);
                return 0;
            }

            errMsg = "InverseIndexTwo: TEST FAILED...";
            LOGGER.error(errMsg);
            System.out.println(errMsg);
            return 1;
        }

        /**
         * Command-line entry point; exits the JVM with the code returned by
         * {@link #run(String[])}.
         *
         * @param args command-line arguments, optionally preceded by generic Hadoop options
         * @throws Exception if the job cannot be set up or submitted
         */
        public static void main(String[] args) throws Exception {
            int result = ToolRunner.run(new Configuration(), new InverseIndexTwo(), args);
            System.exit(result);
        }

    }

    参考资料:

    How to check if processing the last item in an Iterator?: http://stackoverflow.com/questions/9633991/how-to-check-if-processing-the-last-item-in-an-iterator

  • 相关阅读:
    CentOS安装Docker
    Spring内异常 application exception overridden by commit exception
    SVN里恢复到某一天的版本操作
    BootStrap如何支持多模态框弹窗
    Finally使用时报"finally block does not complete normally"
    dom4j解析XML文件
    避免先查询再插入的解决方案
    List<T>转换Datable 数据
    DataTable转Json Json转DataTable
    sql 自动生成编号函数
  • 原文地址:https://www.cnblogs.com/junneyang/p/5852074.html
Copyright © 2011-2022 走看看