Handling Small Files in MapReduce with CombineFileInputFormat


    When running MapReduce jobs, you often have to deal with very small input files (a few hundred KB to a few tens of MB each). By default, Hadoop launches one map task per file, each in its own YARN container, and starting and tearing down containers is expensive.

    Hadoop provides CombineFileInputFormat, an abstract class that packs multiple small files into a single map task. To use it, we only need to implement three classes (a minimal wiring sketch follows this list; the full listings come after it):

    CompressedCombineFileInputFormat
    CompressedCombineFileRecordReader
    CompressedCombineFileWritable
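
    Before the full listings, here is the essential wiring: a minimal sketch of a fragment from the driver's run() method, assuming the three classes below are on the classpath (the complete driver is shown at the end of this post):

    // Minimal wiring sketch; see the complete driver at the end of this post.
    Configuration conf = new Configuration();
    // Without an explicit max split size, CombineFileInputFormat puts all files into one split.
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);
    Job job = Job.getInstance(conf);
    job.setInputFormatClass(CompressedCombineFileInputFormat.class);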


    Maven dependency

    <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.5.0-cdh5.2.1</version>
    </dependency>


    CompressedCombineFileInputFormat.java

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    
    import java.io.IOException;
    
    
    public class CompressedCombineFileInputFormat
            extends CombineFileInputFormat<CompressedCombineFileWritable, Text> {
    
        public CompressedCombineFileInputFormat() {
            super();
    
        }
    
        @Override
        public RecordReader<CompressedCombineFileWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException {
            // Hand each file inside the combined split to our per-file record reader.
            return new CombineFileRecordReader<CompressedCombineFileWritable, Text>(
                    (CombineFileSplit) split, context,
                    CompressedCombineFileRecordReader.class);
        }

        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            // Never split an individual file; compressed files must be read as a whole.
            return false;
        }
    
    }


    CompressedCombineFileRecordReader.java

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.util.LineReader;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    
    public class CompressedCombineFileRecordReader
            extends RecordReader<CompressedCombineFileWritable, Text> {
    
        private long startOffset;
        private long end;
        private long pos;
        private FileSystem fs;
        private Path path;
        private Path dPath;
        private CompressedCombineFileWritable key = new CompressedCombineFileWritable();
        private Text value;
        private long rlength;
        private FSDataInputStream fileIn;
        private LineReader reader;
    
    
        public CompressedCombineFileRecordReader(CombineFileSplit split,
                                                 TaskAttemptContext context, Integer index) throws IOException {

            Configuration currentConf = context.getConfiguration();
            this.path = split.getPath(index);
            boolean isCompressed = findCodec(currentConf, path);
            if (isCompressed)
                // Decompress the whole file to a sibling path (sets dPath and rlength).
                codecWiseDecompress(currentConf);

            fs = this.path.getFileSystem(currentConf);

            this.startOffset = split.getOffset(index);

            if (isCompressed) {
                this.end = startOffset + rlength;
            } else {
                this.end = startOffset + split.getLength(index);
                dPath = path;
            }

            boolean skipFirstLine = false;

            fileIn = fs.open(dPath);

            // The decompressed copy is temporary; remove it when the task JVM exits.
            if (isCompressed) fs.deleteOnExit(dPath);

            if (startOffset != 0) {
                // Not at the start of the file: the previous reader already consumed
                // the line straddling the boundary, so skip the partial first line.
                skipFirstLine = true;
                --startOffset;
                fileIn.seek(startOffset);
            }
            reader = new LineReader(fileIn);
            if (skipFirstLine) {
                startOffset += reader.readLine(new Text(), 0,
                        (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
            }
            this.pos = startOffset;
        }
    
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
        }
    
        public void close() throws IOException {
            // Close the line reader, which also closes the underlying stream.
            if (reader != null) {
                reader.close();
                reader = null;
            }
        }
    
        public float getProgress() throws IOException {
            if (startOffset == end) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (pos - startOffset) / (float)
                        (end - startOffset));
            }
        }
    
        public boolean nextKeyValue() throws IOException {
            if (key.fileName == null) {
                key = new CompressedCombineFileWritable();
                key.fileName = dPath.getName();
            }
            key.offset = pos;
            if (value == null) {
                value = new Text();
            }
            int newSize = 0;
            if (pos < end) {
                newSize = reader.readLine(value);
                pos += newSize;
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            } else {
                return true;
            }
        }
    
        public CompressedCombineFileWritable getCurrentKey()
                throws IOException, InterruptedException {
            return key;
        }
    
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
    
        private void codecWiseDecompress(Configuration conf) throws IOException {
    
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            CompressionCodec codec = factory.getCodec(path);
    
            if (codec == null) {
                // Do not call System.exit() inside a task; fail the task instead.
                throw new IOException("No codec found for " + path);
            }
    
            String outputUri =
                    CompressionCodecFactory.removeSuffix(path.toString(),
                            codec.getDefaultExtension());
            dPath = new Path(outputUri);
    
            InputStream in = null;
            OutputStream out = null;
            fs = this.path.getFileSystem(conf);
    
            try {
                in = codec.createInputStream(fs.open(path));
                out = fs.create(dPath);
                IOUtils.copyBytes(in, out, conf);
            } finally {
                IOUtils.closeStream(in);
                IOUtils.closeStream(out);
                rlength = fs.getFileStatus(dPath).getLen();
            }
        }
    
        private boolean findCodec(Configuration conf, Path p) {
            // A non-null codec means the file name matches a known compression suffix.
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            return factory.getCodec(p) != null;
        }
    
    }
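
    A note on the design: when the input file is compressed, codecWiseDecompress() first writes a fully decompressed copy next to the original (the original name with the codec suffix stripped), the reader then reads lines from that copy with LineReader, and deleteOnExit() removes the copy when the task JVM exits. This keeps the reader simple, but it roughly doubles the I/O for compressed inputs.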


    CompressedCombineFileWritable.java

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class CompressedCombineFileWritable
            implements WritableComparable<CompressedCombineFileWritable> {
    
        public long offset;
        public String fileName;
    
    
        public CompressedCombineFileWritable() {
            super();
        }
    
        public CompressedCombineFileWritable(long offset, String fileName) {
            super();
            this.offset = offset;
            this.fileName = fileName;
        }
    
        public void readFields(DataInput in) throws IOException {
            this.offset = in.readLong();
            this.fileName = Text.readString(in);
        }
    
        public void write(DataOutput out) throws IOException {
            out.writeLong(offset);
            Text.writeString(out, fileName);
        }
    
    
        public int compareTo(CompressedCombineFileWritable that) {
            // Order by file name first, then by offset within the file.
            int f = this.fileName.compareTo(that.fileName);
            if (f == 0) {
                return (int) Math.signum((double) (this.offset - that.offset));
            }
            return f;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof CompressedCombineFileWritable)
                return this.compareTo((CompressedCombineFileWritable) obj) == 0;
            return false;
        }
    
        @Override
        public int hashCode() {
    
            final int hashPrime = 47;
            int hash = 13;
            hash = hashPrime * hash + (this.fileName != null ? this.fileName.hashCode() : 0);
            hash = hashPrime * hash + (int) (this.offset ^ (this.offset >>> 16));
    
            return hash;
        }
    
        @Override
        public String toString() {
            return this.fileName + "-" + this.offset;
        }
    
    }



    CFWordCount.java (a word-count driver used to test the input format)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    
    public class CFWordCount extends Configured implements Tool {
    
        /**
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new CFWordCount(), args));
        }
    
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            // Cap each combined split at 128 MB; without this, CombineFileInputFormat
            // puts all input files into a single split (see the note at the end).
            conf.setLong(CombineFileInputFormat.SPLIT_MAXSIZE, 128 * 1024 * 1024);
            conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
            conf.setClass(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
            Job job = Job.getInstance(conf);
            job.setJobName("CombineFile Demo");
            job.setJarByClass(CFWordCount.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            job.setInputFormatClass(CompressedCombineFileInputFormat.class);
            job.setMapperClass(TestMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setReducerClass(IntSumReducer.class);
            job.setNumReduceTasks(1);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static class TestMapper extends Mapper<CompressedCombineFileWritable, Text, Text, IntWritable> {
            private Text txt = new Text();
            private IntWritable count = new IntWritable(1);
    
            @Override
            public void map(CompressedCombineFileWritable key, Text val, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer st = new StringTokenizer(val.toString());
                while (st.hasMoreTokens()) {
                    txt.set(st.nextToken());
                    context.write(txt, count);
                }
            }
        }
    }
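
    To confirm that the small files really were combined, it helps to check how many map tasks the job launched. A minimal sketch, assuming it runs inside run() after the job has completed:

    // Sketch: with CombineFileInputFormat, the number of launched map tasks should be
    // far smaller than the number of small input files.
    long launchedMaps = job.getCounters()
            .findCounter(org.apache.hadoop.mapreduce.JobCounter.TOTAL_LAUNCHED_MAPS)
            .getValue();
    System.out.println("Launched map tasks: " + launchedMaps);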
    



    Note: while using CombineFileInputFormat I noticed that no matter how many small files accumulated, even well past the HDFS block size, there was still only a single map split. Reading the Hadoop source shows that if CombineFileInputFormat.SPLIT_MAXSIZE is not set explicitly, CombineFileInputFormat never breaks the input into more than one split. The fix is:

    conf.setLong(CombineFileInputFormat.SPLIT_MAXSIZE, 128 * 1024 * 1024);
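
    Equivalently (a sketch; both lines set the same property, value in bytes), the limit can be set by property name on the Configuration, or with the org.apache.hadoop.mapreduce.lib.input.FileInputFormat helper once the Job exists:

    // Same effect as the setLong() call above:
    conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(128L * 1024 * 1024));

    // or, after the Job has been created:
    FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);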

