MapReduce默认的InputFormat是TextInputFormat,其key是每行在文件中的字节偏移量,value是该行的文本。自定义InputFormat需要继承FileInputFormat,并重写createRecordReader()方法;如果需要,还可以重写isSplitable()来设置是否切片。重写了createRecordReader()之后还需要自定义RecordReader:InputFormat规定了key、value的类型,而RecordReader则是具体的读取逻辑。下面的例子是合并小文件,最终输出的k是文件路径,v是文件的二进制字节。
1.InputFormat
1 /**
2 * 自定义InputFormat规定读取文件的k,v
3 * @author tele
4 *
5 */
6 public class MyInputFormat extends FileInputFormat<NullWritable,BytesWritable>{
7 /**
8 * 设置不切片,把小文件作为一个整体
9 */
10 @Override
11 protected boolean isSplitable(JobContext context, Path filename) {
12 return false;
13 }
14
15 @Override
16 public RecordReader<NullWritable,BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
17 throws IOException, InterruptedException {
18 MyRecordReader recordReader = new MyRecordReader();
19 recordReader.initialize(split, context);
20 return recordReader;
21 }
22 }
2.RecordReader
1 /**
2 * recordreader用于读取文件内容,输出文件内容即可,文件路径信息保存在split中
3 * @author tele
4 *
5 */
6 public class MyRecordReader extends RecordReader<NullWritable,BytesWritable> {
7 FileSplit split;
8 BytesWritable value = new BytesWritable();
9 boolean flag = false;
10 Configuration conf;
11 int count = 0;
12
13 /**
14 * 初始化
15 */
16 @Override
17 public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
18 this.split = (FileSplit) split;
19 conf = context.getConfiguration(); conf = context.getConfiguration();
20 }
21
22 /**
23 * 业务逻辑处理,这个方法用来判断是否还有文件内容需要读取,会进入两次,第一次读取内容存入value中,返回true,第二次调用返回false
24 * 只要返回true,就会调用getCurrentKey().getCurrentValue()把内容返回给map
25 *
26 */
27 @Override
28 public boolean nextKeyValue() throws IOException, InterruptedException {
29 count++;
30 if(!flag) {
31 //获取fs
32 FileSystem fs = FileSystem.get(conf);
33 //开启流
34 Path path = this.split.getPath();
35 FSDataInputStream fsDataInputStream = fs.open(path);
36 long length = this.split.getLength();
37 byte[] buf = new byte[(int) length];
38
39 //读取
40 IOUtils.readFully(fsDataInputStream, buf, 0,buf.length);
41 value.set(buf, 0, buf.length);
42
43 //关闭流
44 IOUtils.closeStream(fsDataInputStream);
45 flag = true;
46 }else {
47 flag = false;
48 }
49 return flag;
50 }
51
52 @Override
53 public NullWritable getCurrentKey() throws IOException, InterruptedException {
54 return NullWritable.get();
55 }
56
57 @Override
58 public BytesWritable getCurrentValue() throws IOException, InterruptedException {
59 return value;
60 }
61
62 @Override
63 public float getProgress() throws IOException, InterruptedException {
64 return flag?1:0;
65 }
66
67 @Override
68 public void close() throws IOException {
69
70 }
71 }
3.Mapper
1 /**
2 * 把结果输出到SequenceFileOutPutFormat中,输出的key是文件路径,value为文件内容
3 * @author tele
4 *
5 */
6 public class InputformatMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable/*Text*/> {
7 Text k = new Text();
8
9 @Override
10 protected void map(NullWritable key, BytesWritable value,
11 Mapper<NullWritable, BytesWritable, Text, BytesWritable/*Text*/>.Context context)
12 throws IOException, InterruptedException {
13 FileSplit split = (FileSplit) context.getInputSplit();
14 Path path = split.getPath();
15
16 k.set(path.toString());
17
18 /* String result = new String(value.getBytes(),0,value.getLength());
19 context.write(k,new Text(result));*/
20
21 context.write(k, value);
22 }
23 }
4.Driver(由于输出的是字节,需要指定OutputFormat为SequenceFileOutputFormat)
1 /**
2 * 驱动
3 * @author tele
4 *
5 */
6 public class InputformatDriver {
7 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
8 //1.获得job实例
9 Configuration conf = new Configuration();
10 Job job = Job.getInstance(conf);
11
12 //2.关联class
13 job.setJarByClass(InputformatDriver.class);
14 job.setMapperClass(InputformatMapper.class);
15
16
17 //4.设置format
18 job.setInputFormatClass(MyInputFormat.class);
19 //使用SequenceFileOutputFormat作为输出格式
20 job.setOutputFormatClass(SequenceFileOutputFormat.class);
21
22 //5.数据类型
23 job.setOutputKeyClass(Text.class);
24 job.setOutputValueClass(BytesWritable.class);
25
26 // job.setOutputValueClass(Text.class);
27
28 //6.设置输入与输出路径
29 FileInputFormat.setInputPaths(job,new Path(args[0]));
30 FileOutputFormat.setOutputPath(job,new Path(args[1]));
31
32 //7.提交
33 boolean result = job.waitForCompletion(true);
34 System.exit(result?0:1);
35 }
36 }