    Hadoop Case Study (6): Small File Handling (Custom InputFormat)

    1. Requirement Analysis

    Both HDFS and MapReduce lose efficiency when dealing with large numbers of small files, yet in practice such scenarios are hard to avoid, so a solution is needed. The approach here is to merge many small files into a single SequenceFile: each small file becomes one record, with the file path plus name as the key and the file content as the value.
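
    For context, the record layout of such a merged SequenceFile can be sketched directly with Hadoop's SequenceFile.Writer. This is only an illustration of the target format, not the MapReduce solution this post builds; the output path merged.seq and the class name are made up:

    package com.xyg.mapreduce.inputformat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    public class SequenceFileLayoutSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // One record per small file: key = file path + name, value = file bytes
            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(new Path("merged.seq")),   // made-up output path
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class))) {
                byte[] content = "yongpeng weidong weinan".getBytes("UTF-8");
                writer.append(new Text("e:/inputinputformat/one.txt"), new BytesWritable(content));
            }
        }
    }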

    2. Data Preparation

    one.txt

    yongpeng weidong weinan
    sanfeng luozong xiaoming

    two.txt

    longlong fanfan
    mazong kailun yuhang yixin
    longlong fanfan
    mazong kailun yuhang yixin

    three.txt

    shuaige changmo zhenqiang 
    dongli lingu xuanxuan

    Final expected output: a single SequenceFile containing all three files, with each file's path as the key and its contents as the value.

    3. Optimization Analysis

    Small files can essentially be optimized in the following ways:

    (1) At data-collection time, merge small files or small batches of data into larger files before uploading to HDFS.

    (2) Before business processing, run a MapReduce program on HDFS to merge the small files.

    (3) During MapReduce processing, use CombineTextInputFormat to improve efficiency (a configuration sketch follows this list).
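
    For approach (3), a minimal sketch of how CombineTextInputFormat is typically wired into a job; the helper class name and the 4 MB split cap are illustrative values, not part of the original post:

    package com.xyg.mapreduce.inputformat;
    
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    
    public class CombineInputSketch {
        // Call this while configuring the job in the driver
        public static void useCombineInput(Job job) {
            // Pack many small files into few splits instead of one split per file
            job.setInputFormatClass(CombineTextInputFormat.class);
            // Cap each combined split at roughly 4 MB (example value; tune per cluster)
            CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
        }
    }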

    4. Implementation Approach

    This section solves the small input file problem with a custom InputFormat.

    (1) Define a class that extends FileInputFormat.

    (2) Override the RecordReader so that a single read consumes one complete file and packages it as a key-value pair.

    (3) On the output side, use SequenceFileOutputFormat to write the merged file.

    5. Code Implementation

    (1) Custom InputFormat

    package com.xyg.mapreduce.inputformat;
    
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            // Each small file must be read as a whole, so splits are disabled
            return false;
        }
        
        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // 1. Create our custom RecordReader
            WholeRecordReader recordReader = new WholeRecordReader();
            
            // 2. Initialize the RecordReader
            recordReader.initialize(split, context);
            
            return recordReader;
        }
    }

    (2) Custom RecordReader

    package com.xyg.mapreduce.inputformat;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> {
        private FileSplit split;
        private Configuration configuration;
    
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;
    
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            // Keep the split and configuration passed in by the framework
            this.split = (FileSplit) split;
            configuration = context.getConfiguration();
        }
    
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            
            if (!processed) {
                // 1. Buffer large enough to hold the whole file
                byte[] contents = new byte[(int) split.getLength()];
    
                // 2. Get the file system for the split's path
                Path path = split.getPath();
                FileSystem fs = path.getFileSystem(configuration);
    
                // 3. Read the file contents
                FSDataInputStream fis = null;
                try {
                    // 3.1 Open an input stream on the file
                    fis = fs.open(path);
                    
                    // 3.2 Read the whole file into the buffer
                    IOUtils.readFully(fis, contents, 0, contents.length);
                    
                    // 3.3 Copy the bytes into the output value
                    value.set(contents, 0, contents.length);
                } finally {
                    // Let read errors propagate; just make sure the stream is closed
                    IOUtils.closeStream(fis);
                }
                
                processed = true;
                
                return true;
            }
            
            return false;
        }
    
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            
            return NullWritable.get();
        }
    
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            
            return value;
        }
    
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return processed ? 1 : 0;
        }
    
        @Override
        public void close() throws IOException {
    
        }
    }

    (3) The driver (InputFormatDriver): Mapper and job setup

    package com.xyg.mapreduce.inputformat;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    public class InputFormatDriver {
    
        static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
            private Text k = new Text();
    
            @Override
            protected void map(NullWritable key, BytesWritable value, Context context)
                    throws IOException, InterruptedException {
    
                // Get the input split for this record
                InputSplit split = context.getInputSplit();
                // Get the path of the file backing the split
                Path path = ((FileSplit) split).getPath();
                // Use the full file path as the key
                k.set(path.toString());
    
                // Emit (file path, file bytes)
                context.write(k, value);
            }
        }
    
        public static void main(String[] args) throws Exception {
            // Local test paths; in a real run take input/output from the command line
            args = new String[] { "e:/inputinputformat", "e:/output1" };
    
            Configuration conf = new Configuration();
            
            Job job = Job.getInstance(conf);
            job.setJarByClass(InputFormatDriver.class);
            job.setMapperClass(SequenceFileMapper.class);
            job.setNumReduceTasks(0);
            job.setInputFormatClass(WholeFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            boolean result = job.waitForCompletion(true);
    
            System.exit(result ? 0 : 1);
        }
    }
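
    To inspect the merged result, a small reader sketch like the following can be used. The part file name e:/output1/part-m-00000 is an assumption about what this map-only job writes; adjust it to the actual output. Running hadoop fs -text on the part file also prints the records as text.

    package com.xyg.mapreduce.inputformat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    public class SequenceFileDump {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Assumed output part file of the map-only job above
            Path path = new Path("e:/output1/part-m-00000");
            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
                Text key = new Text();
                BytesWritable value = new BytesWritable();
                while (reader.next(key, value)) {
                    // Each record: key = original file path, value = that file's raw bytes
                    System.out.println(key + " -> " + value.getLength() + " bytes");
                }
            }
        }
    }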
    Original post: https://www.cnblogs.com/frankdeng/p/9256245.html