zoukankan      html  css  js  c++  java
  • (转)带索引的mapReduce

    之前我们跑mapreduce,对某些维度进行统计,都是暴利方式的遍历,有些时候,我们仅仅想扫描原始数据的一部分,或者仅仅是其中的一列,这些数据可能仅仅是原始数据的十分之一,百分之一,那么暴利扫描太不可取了。
            回想下我们之前使用数据库的场景,数据库在扫描的时候通常是利用一些索引, 而并非全表扫描,故mapReduce 程序也可以借助这一特点,先创建索引,然后在索引上的进一步分析,防止扫描全部的原始数据-这将节省很多的IO资源。
            目 前对我们来说最容易使用的文件索引无外乎就是lucene了,而且非常成熟和可靠,但是索引并非splitable,这样对于mapreduce的处理不 是很方便,且大索引频繁的合并开销很大,故我们将索引拆分成一小块一小块的(比如说128M),但是hadoop又不擅长处理特别多的小文件,故我们又将 这些索引合并在了一个统一的大文件里,非常类似hadoop内置的SequenceFile文件的结构,Key为Text格式的文本,用于我们做一些标 记,Value就是真实的索引了。故我们给其起名叫sequenceIndex.
           后续我们又在此基础上,添加了 SequenceIndexOutputFormat和SequenceIndexInputFormat 两个类,用于通过Mapreduce创建索 引,以及在mapreduce中通过索引来进行检索,而不是进行暴力的遍历数据。

    给个简单的创建索引和搜索的例子吧(在这个例子中,每个map扫描的速度,仅仅为8秒,其中还不乏有mapreduce本身的时间)


    public class SequenceIndexExample {
     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
      String type=args[0];
      String input=args[1];
      String output=args[2];
      Integer numreduce=Integer.parseInt(args[3]);
      if(type.equals("create"))
      {
       create(input, output,numreduce);
      }else{
       search(input, output,numreduce);
      }

     }
     private static void search(String input,String output,int numreduce) throws IOException, InterruptedException, ClassNotFoundException
     {
      Job job = new Job(new Configuration());
      job.setInputFormatClass(SequenceIndexInputFormat.class);
      SequenceIndexInputFormat.addInputPath(job, new Path(input));
      job.setMapperClass(IndexMap.class);
      job.setJarByClass(SequenceIndexExample.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      TextOutputFormat.setOutputPath(job, new Path(output));
      job.setNumReduceTasks(numreduce);
      job.waitForCompletion(true);
     }
     
     private static void create(String input,String output,int numreduce) throws IOException, InterruptedException, ClassNotFoundException
     {
      Job job = new Job(new Configuration());
      FileInputFormat.addInputPath(job, new Path(input));
      job.setJarByClass(SequenceIndexExample.class);
      job.setMapOutputKeyClass(LongWritable.class);
      job.setMapOutputValueClass(Text.class);
      
      job.setReducerClass(IndexReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(HadoopDirectory.class);
      job.setOutputFormatClass(SequenceIndexOutputFormat.class);
      SequenceIndexOutputFormat.setOutputPath(job, new Path(output));
      job.setNumReduceTasks(numreduce);
      job.waitForCompletion(true);
     }
     
     public static class IndexMap extends
       Mapper<Text, HadoopDirectory, Text, Text> {
      protected void map(Text key, HadoopDirectory value, Context context)
        throws IOException, InterruptedException {
       Directory dir = value.getDir();
       IndexReader reader = IndexReader.open(dir);
       StandardAnalyzer an = new StandardAnalyzer(Version.LUCENE_35);
       QueryParser q = new QueryParser(Version.LUCENE_35, "index", an);
       IndexSearcher searcher = new IndexSearcher(reader);
       TopDocs docs;
       try {
        docs = searcher.search(q.parse("index:500"), 20);
       } catch (ParseException e) {
        throw new RuntimeException(e);
       }
       ScoreDoc[] list = docs.scoreDocs;
       if (list != null && list.length > 0) {
        StringBuffer buff = new StringBuffer();
        for (ScoreDoc doc : list) {
         Document document = searcher.doc(doc.doc);
         for (Fieldable f : document.getFields()) {
          buff.append(f.name() + "="
            + document.getFieldable(f.name()).stringValue()
            + ",");
         }
         context.write(key, new Text(buff.toString()));
        }
       }
      }

     }
     public static class IndexReducer extends
        Reducer<LongWritable, Text, Text, HadoopDirectory> {
      boolean setup=false;
        protected void reduce(LongWritable key, Iterable<Text> values,
            Context context) throws java.io.IOException, InterruptedException {
       if(setup)
       {
        return;
       }
       setup=true;
         for(int k=0;k<10000;k++)
         {
          HadoopDirectory hdir=new HadoopDirectory();
          hdir.setDir(new RAMDirectory());
         
          IndexWriter writer = new IndexWriter(hdir.getDir(), null,
                 new KeepOnlyLastCommitDeletionPolicy(),
                 MaxFieldLength.UNLIMITED);
         writer.setUseCompoundFile(false);
         writer.setMergeFactor(2);
        System.out.println(k);
        
        for(int i=0;i<1000;i++)
        {
         Document doc=new Document();
         doc.add(new Field("index", String.valueOf(i), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
         for(int j=0;j<10;j++)
         {
          doc.add(new Field("col"+j, String.valueOf(i)+","+j+","+k, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
         }
         writer.addDocument(doc);
        }
        
        writer.optimize();
        writer.close();
        context.write(new Text(String.valueOf(k)), hdir);
         }
        
        }
      
     }
    }

  • 相关阅读:
    pat甲级 1155 Heap Paths (30 分)
    pat甲级 1152 Google Recruitment (20 分)
    蓝桥杯 基础练习 特殊回文数
    蓝桥杯 基础练习 十进制转十六进制
    蓝桥杯 基础练习 十六进制转十进制
    蓝桥杯 基础练习 十六进制转八进制
    51nod 1347 旋转字符串
    蓝桥杯 入门训练 圆的面积
    蓝桥杯 入门训练 Fibonacci数列
    链表相关
  • 原文地址:https://www.cnblogs.com/end/p/2872020.html
Copyright © 2011-2022 走看看