Hadoop ships with a fairly rich set of input and output formats that cover most designs, but sometimes a custom input or output format is needed.
The input format describes the input specification of a MapReduce job. The framework relies on it to check that specification (for example, that the input directories exist), to partition the input files into splits (InputSplit), and to provide a RecordReader that reads records from each split and turns them into the key/value pairs fed to the map phase. Hadoop provides many input formats, such as TextInputFormat and KeyValueTextInputFormat, and each has a corresponding RecordReader, LineRecordReader and KeyValueLineRecordReader respectively. To define a custom input format, you mainly implement createRecordReader() (and, if necessary, override getSplits()) in the InputFormat, and in the RecordReader implement initialize(), nextKeyValue(), getCurrentKey(), getCurrentValue(), getProgress(), and close().
For example:
package com.rpc.nefu;

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

// A custom input format extends the abstract FileInputFormat class
public class ZInputFormat extends FileInputFormat<IntWritable, IntWritable> {

    @Override // supply the RecordReader for this format
    public RecordReader<IntWritable, IntWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new ZRecordReader();
    }

    // The custom RecordReader
    public static class ZRecordReader extends RecordReader<IntWritable, IntWritable> {

        private LineReader in;        // input stream
        private boolean more = true;  // whether more records remain

        private IntWritable key = null;
        private IntWritable value = null;

        // current position within the file
        private long start;
        private long end;
        private long pos;

        //private Log LOG = LogFactory.getLog(ZRecordReader.class); // optional logging

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit inputsplit = (FileSplit) split;
            start = inputsplit.getStart();         // start offset of this split
            end = start + inputsplit.getLength();  // end offset of this split
            final Path file = inputsplit.getPath();

            // open the file
            FileSystem fs = file.getFileSystem(context.getConfiguration());
            FSDataInputStream fileIn = fs.open(inputsplit.getPath());

            // move the file pointer to this split; a newly opened file starts at offset 0
            fileIn.seek(start);

            in = new LineReader(fileIn, context.getConfiguration());

            if (start != 0) {
                // If this is not the first split, skip the first (possibly partial) line:
                // it has already been consumed by the reader of the previous split.
                start += in.readLine(new Text(), 0, maxBytesToConsume(start));
            }
            pos = start;
        }

        private int maxBytesToConsume(long pos) {
            return (int) Math.min(Integer.MAX_VALUE, end - pos);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // Read the next key/value pair.
            // Tip: avoid printing from this method, it is called once per record.
            if (null == key) {
                key = new IntWritable();
            }
            if (null == value) {
                value = new IntWritable();
            }
            Text nowline = new Text(); // holds the current line
            int readsize = in.readLine(nowline);
            pos += readsize; // advance the current position

            // if pos has reached end, this split has been consumed completely
            if (pos >= end) {
                more = false;
                return false;
            }

            if (0 == readsize) {
                key = null;
                value = null;
                more = false; // end of file reached
                return false;
            }
            String[] keyandvalue = nowline.toString().split(",");

            // skip the header line ("CITING","CITED")
            if (keyandvalue[0].endsWith("\"CITING\"")) {
                readsize = in.readLine(nowline);
                pos += readsize;
                if (0 == readsize) {
                    more = false; // end of file reached
                    return false;
                }
                // re-split the freshly read line
                keyandvalue = nowline.toString().split(",");
            }

            // parse key and value
            key.set(Integer.parseInt(keyandvalue[0]));
            value.set(Integer.parseInt(keyandvalue[1]));

            return true;
        }

        @Override
        public IntWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public IntWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            // progress within the current split
            if (false == more || end == start) {
                return 0f;
            } else {
                return Math.min(1.0f, (pos - start) / (float) (end - start));
            }
        }

        @Override
        public void close() throws IOException {
            // close the input stream
            if (null != in) {
                in.close();
            }
        }
    }
}
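To wire this input format into a job, the driver only needs to register it with setInputFormatClass(). A minimal driver sketch, assuming hypothetical PatentCitationMapper and PatentCitationReducer classes and command-line input/output paths:

Configuration conf = new Configuration();
Job job = new Job(conf, "patent citation");          // job name is arbitrary
job.setJarByClass(ZInputFormat.class);
job.setInputFormatClass(ZInputFormat.class);         // use the custom input format
job.setMapperClass(PatentCitationMapper.class);      // hypothetical Mapper<IntWritable, IntWritable, ...>
job.setReducerClass(PatentCitationReducer.class);    // hypothetical reducer
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

Another custom input format, taken from an inverted-index example, wraps LineRecordReader and prefixes each key with the name of the file the record came from: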
package reverseIndex;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
public class FileNameLocInputFormat extends FileInputFormat<Text, Text>{
    @Override
    public RecordReader<Text, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new FileNameLocRecordReader();
    }

    public static class FileNameLocRecordReader extends RecordReader<Text, Text> {

        String FileName;
        // Wrap the standard LineRecordReader and decorate its keys with the file name
        LineRecordReader line = new LineRecordReader();

        /**
         * ......
         */
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            // key has the form "(FileName@offset)"
            return new Text("(" + FileName + "@" + line.getCurrentKey() + ")");
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return line.getCurrentValue();
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // delegate to the wrapped LineRecordReader, then remember the file name
            line.initialize(split, context);
            FileSplit inputsplit = (FileSplit) split;
            FileName = inputsplit.getPath().getName();
        }

        @Override
        public void close() throws IOException {
            line.close();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return line.getProgress();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return line.nextKeyValue();
        }
    }
}
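With FileNameLocInputFormat, each map input key arrives as "(filename@offset)" and the value is the full line of text, which is convenient for building an inverted index. A minimal mapper sketch (the class name and the whitespace tokenization are illustrative, not part of the original code):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class InvertedIndexMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // key = "(filename@offset)", value = the line content
        for (String word : value.toString().split("\\s+")) {
            if (!word.isEmpty()) {
                context.write(new Text(word), key); // emit word -> location
            }
        }
    }
}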
Hadoop also ships with many built-in output formats and RecordWriters. An output format checks the output specification of a job and writes the job's result data.
A custom output format:
public static class AlphaOutputFormat extends multiformat<Text, IntWritable>{
@Override
protected String generateFileNameForKeyValue(Text key,
IntWritable value, Configuration conf) {
// route each record to a file named after the first letter of its key
char c = key.toString().toLowerCase().charAt(0);
if( c>='a' && c<='z'){
return c+".txt";
}else{
return "other.txt";
}
}
}
// Register the output format in the driver
job.setOutputFormatClass(AlphaOutputFormat.class);
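AlphaOutputFormat extends the abstract multiformat class below, which keeps one RecordWriter per output file name and routes each record to the file returned by generateFileNameForKeyValue():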
package com.rpc.nefu;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public abstract class multiformat<K extends WritableComparable<?>, V extends Writable>
extends FileOutputFormat<K, V> {
private MultiRecordWriter writer = null;
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
InterruptedException {
if (writer == null) {
writer = new MultiRecordWriter(job, getTaskOutputPath(job));
}
return writer;
}
private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
Path workPath = null;
OutputCommitter committer = super.getOutputCommitter(conf);
if (committer instanceof FileOutputCommitter) {
workPath = ((FileOutputCommitter) committer).getWorkPath();
} else {
Path outputPath = super.getOutputPath(conf);
if (outputPath == null) {
throw new IOException("Undefined job output-path");
}
workPath = outputPath;
}
return workPath;
}
/** Determine the output file name (including extension) from the key, value and configuration. */
protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);
public class MultiRecordWriter extends RecordWriter<K, V> {
/** Cache of RecordWriters, one per output file name. */
private HashMap<String, RecordWriter<K, V>> recordWriters = null;
private TaskAttemptContext job = null;
/** Output directory of the task. */
private Path workPath = null;
public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
super();
this.job = job;
this.workPath = workPath;
recordWriters = new HashMap<String, RecordWriter<K, V>>();
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
while (values.hasNext()) {
values.next().close(context);
}
this.recordWriters.clear();
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
// determine the output file name for this record
String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
RecordWriter<K, V> rw = this.recordWriters.get(baseName);
if (rw == null) {
rw = getBaseRecordWriter(job, baseName);
this.recordWriters.put(baseName, rw);
}
rw.write(key, value);
}
// ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = ",";
RecordWriter<K, V> recordWriter = null;
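// Note: lineRecordWrite is not shown in this post; it is assumed to be a line-oriented
// RecordWriter analogous to TextOutputFormat.LineRecordWriter, writing "key,value" lines.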
if (isCompressed) {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
GzipCodec.class);
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
Path file = new Path(workPath, baseName + codec.getDefaultExtension());
FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
recordWriter = new lineRecordWrite<K, V>(new DataOutputStream(codec
.createOutputStream(fileOut)), keyValueSeparator);
} else {
Path file = new Path(workPath, baseName);
FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
recordWriter = new lineRecordWrite<K, V>(fileOut, keyValueSeparator);
}
return recordWriter;
}
}
}
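As a usage sketch, a word-count style job whose reduce output is Text/IntWritable could route its results into per-letter files with this output format (TokenizerMapper and IntSumReducer are hypothetical class names, not part of the original post):

Configuration conf = new Configuration();
Job job = new Job(conf, "alpha word count");
job.setJarByClass(AlphaOutputFormat.class);
job.setMapperClass(TokenizerMapper.class);          // hypothetical: emits <word, 1>
job.setReducerClass(IntSumReducer.class);           // hypothetical: sums the counts
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(AlphaOutputFormat.class);  // per-letter output files
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

Keys that start with a letter end up in a.txt through z.txt inside the job output directory, and everything else goes to other.txt.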