MapReduce (3): Implementing an Inverted Index with MapReduce (Part 2)

    Hadoop API reference:
    http://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/mapreduce/Reducer.html

    Let's change the requirement a little: the "document term-frequency list" for each word must be sorted, i.e., documents in which the word appears more often come first.

    Approach:

    Map: split each input line into words, count how many times each word occurs in that line, and emit the composite key "word:num" with the source file name (obtained from the FileSplit) as the value.
    Partition: partition on the word part of the composite key, so that all "word:num" records belonging to the same word reach the same reducer; the shuffle then sorts the composite keys.
    Reduce: keys for the same word arrive one after another, so keep appending "num:docid;" entries to a buffer while the word part stays the same, and write the accumulated list out whenever the word changes; cleanup() flushes the list of the last word.
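
    As a concrete illustration (the file names and contents here are hypothetical, not taken from the original post), suppose doc1.txt contains the line "a b a" and doc2.txt contains the line "a". The mapper then emits the following key/value records:

        a:2    doc1.txt
        b:1    doc1.txt
        a:1    doc2.txt

    The reducer concatenates the num:docid pairs that share the same word into one output line per word, in the form word, then tab, then num:docid;num:docid;... The relative order of the pairs is the order in which the composite keys arrive after the shuffle's sort (see the note after the code listing).
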
    Code:

    package proj;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class InvertedIndexSortByFreq {
    
        // Emit each word as <word:num, docid>, where num is the word's count in the current input line
        public static class InvertedIndexMapper extends
                Mapper<Object, Text, Text, Text> {
    
            private Text keyInfo = new Text();
            private Text valInfo = new Text();
            private FileSplit split;
    
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] tokens = value.toString().split(" ");
                split = (FileSplit) context.getInputSplit();
                String docid = split.getPath().getName();
                Map<String, Integer> map = new HashMap<String, Integer>();
                for (String token : tokens) {
                    if (map.containsKey(token)) {
                        map.put(token, map.get(token) + 1);
                    } else {
                        map.put(token, 1);
                    }
                }
                for (String k : map.keySet()) {
                    Integer num = map.get(k);
                    keyInfo.set(k + ":" + num);
                    valInfo.set(docid);
                    context.write(keyInfo, valInfo);
                }
            }
        }
    
        public static class InvertedIndexPartioner extends
                HashPartitioner<Text, Text> {
    
            private Text term = new Text();
    
            public int getPartition(Text key, Text value, int numReduceTasks) {
                // Partition on the word part of the composite key only, so that
                // every "word:num" key of the same word goes to the same reducer.
                term.set(key.toString().split(":")[0]);
                return super.getPartition(term, value, numReduceTasks);
            }
            }
        }
    
        // Merge all composite keys that share the same word into one inverted index entry
        public static class InvertedIndexReducer extends
                Reducer<Text, Text, Text, Text> {
            private Text keyInfo = new Text();
            
            private Text valInfo = new Text();
    
            private String tPrev = null;
    
            private StringBuffer buff = new StringBuffer();
    
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
    
                String[] tokens = key.toString().split(":");
                String current = tokens[0];
    
                // If the word part of the key has changed, write out the list
                // accumulated for the previous word and start a new buffer.
                if (tPrev != null && !tPrev.equals(current)) {
                    keyInfo.set(tPrev);
                    valInfo.set(buff.toString());
                    context.write(keyInfo, valInfo);
                    buff = new StringBuffer();
                }
                tPrev = current;

                // Append one "num:docid;" entry per document for this composite key.
                for (Text val : values) {
                    buff.append(tokens[1] + ":" + val.toString() + ";");
                }
            }
            
            // Flush the list accumulated for the last word seen by this reducer.
            public void cleanup(Context context) throws IOException, InterruptedException {
                if (tPrev != null) {
                    keyInfo.set(tPrev);
                    valInfo.set(buff.toString());
                    context.write(keyInfo, valInfo);
                }
                super.cleanup(context);
            }
            
            
        }
    
        public static void main(String[] args) throws IOException,
                ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args)
                    .getRemainingArgs();
            Job job = new Job(conf, "InvertedIndex");
            job.setJarByClass(InvertedIndexSortByFreq.class);
            job.setMapperClass(InvertedIndexMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setPartitionerClass(InvertedIndexPartioner.class);
            job.setReducerClass(InvertedIndexReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
    }
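
    A note on the sort order: the composite key "word:num" is an ordinary Text, so the shuffle compares it lexicographically ("word:10" sorts before "word:2"), which by itself does not guarantee that documents with the highest counts come first. One way to obtain the required ordering is to register a custom sort comparator for the map output keys. The class below is only a minimal sketch of that idea and is my addition, not part of the original post: added as another static nested class of InvertedIndexSortByFreq, it orders keys by word and, within a word, by descending numeric count, and it would be registered in the driver with job.setSortComparatorClass(WordFreqDescComparator.class).

    // Hypothetical sketch (not in the original post).
    // Requires: import org.apache.hadoop.io.WritableComparable;
    //           import org.apache.hadoop.io.WritableComparator;
    public static class WordFreqDescComparator extends WritableComparator {

        public WordFreqDescComparator() {
            super(Text.class, true); // deserialize keys into Text instances
        }

        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable a, WritableComparable b) {
            String[] left = a.toString().split(":");
            String[] right = b.toString().split(":");
            int byWord = left[0].compareTo(right[0]);
            if (byWord != 0) {
                return byWord; // different words: keep normal ascending order
            }
            // same word: larger counts sort first (descending numeric order)
            return Integer.parseInt(right[1]) - Integer.parseInt(left[1]);
        }
    }

    With this comparator registered (and the partitioner still sending every key of a word to the same reducer), the composite keys of each word reach InvertedIndexReducer with the highest count first, so its buffer is filled in the required order. The job itself is submitted in the usual way, e.g. with hadoop jar, the main class proj.InvertedIndexSortByFreq, and the input and output paths as the remaining arguments.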