如果想利用mapreduce并行计算的话,不能指望所有都是hadoop来跟你做,hadoop不可能跟你做很多事情。看下面一个问题:
x1 / 1 + x2 / 2 + x3 / 3 + …… + xn / n = 1
求出当n = 10时的所有正整数解。
先确定我们要采用的办法,就是搜索穷举。搜出n个未知数的解。
例如我们想用四个mapper来执行,就将输入文件划分为四个split就行了。例如,输入文件为:
1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9 10 10
这10个值,为x1可能的10个值。也就是说,我们将搜索空间,划分为这10个,然后来用mapper来并行计算。但是,如果直接将这个文件扔到hdfs里,hadoop是不会跟我们开四个mapper来并行计算的,因为这个文件太小了。那么我们可以重写InputSplit,对输入文件进行划分,达到“并行计算”的效果。
那么,我们将这个文件按行进行划分,每三行为一个块,这样就把输入文件划分为四个块,对应会有四个mapper来处理。下面看一下代码:
1 package seven.ili; 2 3 import org.apache.hadoop.classification.InterfaceAudience; 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.FSDataInputStream; 6 import org.apache.hadoop.fs.FileStatus; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 12 import org.apache.hadoop.mapred.JobConf; 13 import org.apache.hadoop.mapreduce.InputSplit; 14 import org.apache.hadoop.mapreduce.JobContext; 15 import org.apache.hadoop.mapreduce.RecordReader; 16 import org.apache.hadoop.mapreduce.TaskAttemptContext; 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18 import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; 19 import org.apache.hadoop.util.LineReader; 20 21 import java.io.FileInputStream; 22 import java.io.FileNotFoundException; 23 import java.io.IOException; 24 import java.util.ArrayList; 25 import java.util.List; 26 27 /** 28 * Created with IntelliJ IDEA. 29 * User: Isaac Li 30 * Date: 12/20/12 31 * Time: 1:25 PM 32 * To change this template use File | Settings | File Templates. 33 */ 34 public class SplitByLine extends FileInputFormat<LongWritable, Text> { 35 private int N = 3; 36 37 @Override 38 public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { 39 return new LineRecordReader(); 40 } 41 42 public List<InputSplit> getSplits(JobContext job) throws IOException { 43 List<InputSplit> splits = new ArrayList<InputSplit>(); 44 for (FileStatus file: listStatus(job)){ 45 Path path = file.getPath(); 46 FileSystem fs = path.getFileSystem(job.getConfiguration()); 47 LineReader lr = null; 48 try{ 49 FSDataInputStream in = fs.open(path); 50 Configuration conf = job.getConfiguration(); 51 lr = new LineReader(in, conf); 52 //N = conf.getInt("mapred.line.input.format.linespermap", 3); 53 Text line = new Text(); 54 int numLines = 0; 55 long begin = 0; 56 long length = 0; 57 int num = -1; 58 while((num = lr.readLine(line)) > 0){ 59 numLines++; 60 length += num; 61 if (numLines == N){ 62 splits.add(new FileSplit(path, begin, length, new String[]{})); 63 begin += length; 64 length = 0; 65 numLines = 0; 66 } 67 } 68 if (numLines != 0) { 69 splits.add(new FileSplit(path, begin, length, new String[]{})); 70 } 71 }finally { 72 if (lr != null){ 73 lr.close(); 74 } 75 } 76 } 77 System.out.println("Total # of splits: " + splits.size()); 78 return splits; 79 } 80 }
然后在Job class的main方法中,加上一句设置语句,
job.setInputFormatClass(SplitByLine.class);
到此,我们的第一阶段的任务完成了,将原始问题的解空间,划分成了四个,来进来并行计算。下面,在子空间中的搜索就交给mapper来执行了。这个方法就自己去写吧,一个搜索的算法。我就不写出了。对了,你可以用:
job.setNumReduceTasks(0);
这一条语句,先将reduce task num 设置为0, 你就可以看到mapper的输出,是不是有四个文件?一共有四个mapper?
自己去验证吧。
参考:
http://www.hadoopor.com/viewthread.php?tid=1242
揭秘InputFormat:掌控Map Reduce任务执行的利器: http://www.infoq.com/cn/articles/HadoopInputFormat-map-reduce