Both HDFS and MapReduce handle small files poorly: every file consumes NameNode memory for its metadata, and every small file spawns its own map task, so large amounts of memory are used and efficiency is low.
I. Review
1. HDFS
HAR (Hadoop archive): to the outside world (and to the NameNode) it appears as a single unit, while internally it contains many files.
2. MapReduce
CombineTextInputFormat: split planning is divided into a virtual-storage phase and a split phase.
Both the virtual-storage phase and the split phase compare sizes against the configured maximum split size.
It changes how splits are formed, packing many small files into fewer splits; a configuration sketch follows below.
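As a reminder of how this is wired up, here is a minimal sketch assuming a standard MapReduce driver; the class name combineReviewDriver and the 4 MB maximum are illustrative values, not part of the project below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

import java.io.IOException;

// Illustrative sketch only, not part of the project below
public class combineReviewDriver {
    public static void main(String[] args) throws IOException {
        Job job = Job.getInstance(new Configuration());
        // Pack many small files into fewer splits
        job.setInputFormatClass(CombineTextInputFormat.class);
        // The maximum that both the virtual-storage phase and the split phase
        // compare against; 4 MB here is just an example value
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        // ... mapper, reducer, and input/output paths would be set as usual ...
    }
}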
II. Requirement Analysis
1. Requirement
Merge multiple small files into a single SequenceFile.
Background: SequenceFile is a Hadoop file format for storing key-value pairs in binary form. When a SequenceFile holds multiple files, the storage format is:
key: file path + file name (Text)
value: the file's contents (BytesWritable)
Note: merging multiple small files into one SequenceFile does not change how splits are generated; it simply stores the files together. A sketch of reading such a file back follows below.
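To make the key/value layout concrete, here is a minimal sketch of reading such a SequenceFile back. This helper is not part of the project code below; the class name readSeqFile is illustrative and the path comes from the command line.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Illustrative helper, not part of the project below
public class readSeqFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // e.g. the job's SequenceFile output
        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();                     // file path + file name
            BytesWritable value = new BytesWritable(); // file contents
            while (reader.next(key, value)) {
                System.out.println(key + " : " + value.getLength() + " bytes");
            }
        }
    }
}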
2. Analysis
a. Define a custom InputFormat class that extends FileInputFormat.
b. Override createRecordReader(), which returns a RecordReader.
c. Create a class that extends RecordReader.
III. Code
1. Custom InputFormat
package com.whole;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * 1. The return type is RecordReader, so a RecordReader instance must be returned.
 * 2. Define wholeRecordReader, which extends RecordReader.
 * 3. Initialization is done later (see wholeRecordReader).
 */
public class wholeInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        wholeRecordReader recordReader = new wholeRecordReader();
        recordReader.initialize(split, context);
        return recordReader;
    }
}
2. Class extending RecordReader
package com.whole;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * 0. The key/value types are Text and BytesWritable (the key/value of the custom InputFormat).
 * 1. initialize(): keep a reference to the FileSplit.
 * 2. getCurrentKey() returns the key.
 * 3. getCurrentValue() returns the value.
 * 4. The core logic lives in nextKeyValue(); see the comments below.
 */
public class wholeRecordReader extends RecordReader<Text, BytesWritable> {

    FileSplit split;
    boolean isProcess = true;
    Text k = new Text();
    Configuration conf;
    BytesWritable v = new BytesWritable();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Initialization
        this.split = (FileSplit) split;
        conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Core logic: read the whole file as a single key/value pair
        if (isProcess) {
            // 0. Define the buffer
            byte[] buff = new byte[(int) split.getLength()];
            // 1. Get the path from the split, and the FileSystem from the path
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(conf);
            // 2. Open the input stream
            FSDataInputStream fIS = fs.open(path);
            // 3. Read the entire file content
            IOUtils.readFully(fIS, buff, 0, buff.length);
            // 4. Set the value
            v.set(buff, 0, buff.length);
            // 5. Set the key
            k.set(split.getPath().toString());

            IOUtils.closeStream(fIS);
            isProcess = false;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        // Return the current key
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        // Return the current value
        return v;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}
3. Mapper
package com.whole;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Input key/value: Text, BytesWritable (the key/value produced by the custom InputFormat);
 * output key/value: Text, BytesWritable.
 */
public class wholeMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
4. Reducer
package com.whole;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class wholeReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        // Write out every value for this key
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}
5. Driver
package com.whole;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

public class wholeDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"E:\\a\\input", "E:\\a\\output"};

        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar
        job.setJarByClass(wholeDriver.class);

        // 3. Associate the mapper and reducer
        job.setMapperClass(wholeMapper.class);
        job.setReducerClass(wholeReducer.class);

        // 4. Set the mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // Important: set the input and output formats
        job.setInputFormatClass(wholeInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job
        boolean wait = job.waitForCompletion(true);
        System.exit(wait ? 0 : 1);
    }
}
Note:
The custom InputFormat class only merges the small files into a SequenceFile-format file; it does not change the number of splits. An optional refinement is sketched below.
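If you also want to guarantee that a single input file is never cut across splits, so that nextKeyValue() can always read the whole file in one go, a common optional addition is to override isSplitable() in wholeInputFormat. This is not in the code above; treat it as a hedged sketch, and note it assumes the extra imports org.apache.hadoop.fs.Path and org.apache.hadoop.mapreduce.JobContext.

// Optional addition to wholeInputFormat (not present in the code above):
// keep each input file in exactly one split
@Override
protected boolean isSplitable(JobContext context, Path filename) {
    return false;
}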
Key code:
1. The nextKeyValue() method in the class that extends RecordReader
2. Setting the InputFormat and OutputFormat in the Driver class
job.setInputFormatClass(wholeInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);