之前我们跑mapreduce,对某些维度进行统计,都是暴利方式的遍历,有些时候,我们仅仅想扫描原始数据的一部分,或者仅仅是其中的一列,这些数据可能仅仅是原始数据的十分之一,百分之一,那么暴利扫描太不可取了。
回想下我们之前使用数据库的场景,数据库在扫描的时候通常是利用一些索引, 而并非全表扫描,故mapReduce 程序也可以借助这一特点,先创建索引,然后在索引上的进一步分析,防止扫描全部的原始数据-这将节省很多的IO资源。
目
前对我们来说最容易使用的文件索引无外乎就是lucene了,而且非常成熟和可靠,但是索引并非splitable,这样对于mapreduce的处理不
是很方便,且大索引频繁的合并开销很大,故我们将索引拆分成一小块一小块的(比如说128M),但是hadoop又不擅长处理特别多的小文件,故我们又将
这些索引合并在了一个统一的大文件里,非常类似hadoop内置的SequenceFile文件的结构,Key为Text格式的文本,用于我们做一些标
记,Value就是真实的索引了。故我们给其起名叫sequenceIndex.
后续我们又在此基础上,添加了
SequenceIndexOutputFormat和SequenceIndexInputFormat 两个类,用于通过Mapreduce创建索
引,以及在mapreduce中通过索引来进行检索,而不是进行暴力的遍历数据。
给个简单的创建索引和搜索的例子吧(在这个例子中,每个map扫描的速度,仅仅为8秒,其中还不乏有mapreduce本身的时间)
public class SequenceIndexExample {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
String type=args[0];
String input=args[1];
String output=args[2];
Integer numreduce=Integer.parseInt(args[3]);
if(type.equals("create"))
{
create(input, output,numreduce);
}else{
search(input, output,numreduce);
}
private static void search(String input,String output,int numreduce) throws IOException, InterruptedException, ClassNotFoundException
{
Job job = new Job(new Configuration());
job.setInputFormatClass(SequenceIndexInputFormat.class);
SequenceIndexInputFormat.addInputPath(job, new Path(input));
job.setMapperClass(IndexMap.class);
job.setJarByClass(SequenceIndexExample.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(job, new Path(output));
job.setNumReduceTasks(numreduce);
job.waitForCompletion(true);
}
{
Job job = new Job(new Configuration());
FileInputFormat.addInputPath(job, new Path(input));
job.setJarByClass(SequenceIndexExample.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(HadoopDirectory.class);
job.setOutputFormatClass(SequenceIndexOutputFormat.class);
SequenceIndexOutputFormat.setOutputPath(job, new Path(output));
job.setNumReduceTasks(numreduce);
job.waitForCompletion(true);
}
public static class IndexMap extends
Mapper<Text, HadoopDirectory, Text, Text> {
throws IOException, InterruptedException {
Directory dir = value.getDir();
IndexReader reader = IndexReader.open(dir);
StandardAnalyzer an = new StandardAnalyzer(Version.LUCENE_35);
QueryParser q = new QueryParser(Version.LUCENE_35, "index", an);
IndexSearcher searcher = new IndexSearcher(reader);
try {
docs = searcher.search(q.parse("index:500"), 20);
} catch (ParseException e) {
throw new RuntimeException(e);
}
if (list != null && list.length > 0) {
StringBuffer buff = new StringBuffer();
for (ScoreDoc doc : list) {
Document document = searcher.doc(doc.doc);
for (Fieldable f : document.getFields()) {
buff.append(f.name() + "="
+ document.getFieldable(f.name()).stringValue()
+ ",");
}
context.write(key, new Text(buff.toString()));
}
}
}
}
public static class IndexReducer extends
Reducer<LongWritable, Text, Text, HadoopDirectory> {
boolean setup=false;
protected void reduce(LongWritable key, Iterable<Text> values,
Context context) throws java.io.IOException, InterruptedException {
if(setup)
{
return;
}
setup=true;
for(int k=0;k<10000;k++)
{
HadoopDirectory hdir=new HadoopDirectory();
hdir.setDir(new RAMDirectory());
IndexWriter writer = new IndexWriter(hdir.getDir(), null,
new KeepOnlyLastCommitDeletionPolicy(),
MaxFieldLength.UNLIMITED);
writer.setUseCompoundFile(false);
writer.setMergeFactor(2);
System.out.println(k);
for(int i=0;i<1000;i++)
{
Document doc=new Document();
doc.add(new Field("index", String.valueOf(i), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
for(int j=0;j<10;j++)
{
doc.add(new Field("col"+j, String.valueOf(i)+","+j+","+k, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
}
writer.addDocument(doc);
}
writer.optimize();
writer.close();
context.write(new Text(String.valueOf(k)), hdir);
}
}
}
}