这两天一直在研究用hadoop进行并行计算的事情,既然要并行,就逃不过将大问题划分成小问题这一步。所以hadoop里的InputFormat是非常关键的。通常有把输入文件按单个文件一个split来划分,也有按记录的行来划分。下面我介绍按行划分的代码,这里所谓的按行划分,就是将输入行按每N行划分为一个split。
1 package seven.ili; 2 3 import org.apache.hadoop.classification.InterfaceAudience; 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.FSDataInputStream; 6 import org.apache.hadoop.fs.FileStatus; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 12 import org.apache.hadoop.mapred.JobConf; 13 import org.apache.hadoop.mapreduce.InputSplit; 14 import org.apache.hadoop.mapreduce.JobContext; 15 import org.apache.hadoop.mapreduce.RecordReader; 16 import org.apache.hadoop.mapreduce.TaskAttemptContext; 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18 import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; 19 import org.apache.hadoop.util.LineReader; 20 21 import java.io.FileInputStream; 22 import java.io.FileNotFoundException; 23 import java.io.IOException; 24 import java.util.ArrayList; 25 import java.util.List; 26 27 /** 28 * Created with IntelliJ IDEA. 29 * User: Isaac Li 30 * Date: 12/20/12 31 * Time: 1:25 PM 32 * To change this template use File | Settings | File Templates. 
33 */ 34 public class SplitByLine extends FileInputFormat<LongWritable, Text> { 35 private int N = 3; 36 37 @Override 38 public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { 39 return new LineRecordReader(); 40 } 41 42 public List<InputSplit> getSplits(JobContext job) throws IOException { 43 List<InputSplit> splits = new ArrayList<InputSplit>(); 44 for (FileStatus file: listStatus(job)){ 45 Path path = file.getPath(); 46 FileSystem fs = path.getFileSystem(job.getConfiguration()); 47 LineReader lr = null; 48 try{ 49 FSDataInputStream in = fs.open(path); 50 Configuration conf = job.getConfiguration(); 51 lr = new LineReader(in, conf); 52 N = conf.getInt("mapred.line.input.format.linespermap", 3); 53 Text line = new Text(); 54 int numLines = 0; 55 long begin = 0; 56 long length = 0; 57 int num = -1; 58 while((num = lr.readLine(line)) > 0){ 59 numLines++; 60 length += num; 61 if (numLines == N){ 62 splits.add(new FileSplit(path, begin, length, new String[]{})); 63 begin += length; 64 length = 0; 65 numLines = 0; 66 } 67 } 68 if (numLines != 0) { 69 splits.add(new FileSplit(path, begin, length, new String[]{})); 70 } 71 }finally { 72 if (lr != null){ 73 lr.close(); 74 } 75 } 76 } 77 System.out.println("Total # of splits: " + splits.size()); 78 return splits; 79 } 80 }
参考:http://doudouclever.blog.163.com/blog/static/175112310201273162313219/