zoukankan      html  css  js  c++  java
  • 自定义InputFormat 案例

    无论是 HDFS 还是 MapReduce，在处理大量小文件时都会消耗大量内存，效率低下。

    一、回顾

    1、HDFS

    HAR（Hadoop Archive）归档：对 NameNode 来说它是一个整体（只占用少量元数据），对内仍对应多个小文件。

    2、MapReduce

    CombineTextInputFormat,分为虚拟存储过程和切片过程

    虚拟存储过程和切片过程都要与设置的最大切片值（setMaxInputSplitSize）做比较。

    它改变的是切片方式：把多个小文件合并到同一个切片中。

    二、需求分析

    1、需求

    将多个小文件合并成一个 SequenceFile 文件。

    补充：SequenceFile 是 Hadoop 用来存储二进制形式 key-value 对的文件格式。用它存储多个文件时，存储格式为：

    key:文件路径 + 文件名(Text)

    value: 文件的具体内容(BytesWritable)

    注意：这里只是把多个小文件存储到同一个 SequenceFile 中，切片过程并没有改变（每个小文件仍至少对应一个切片）。

    2、分析过程

    a、自定义InputFormat类,继承FileInputFormat

    b、重写createRecordReader() ,返回RecordReader数据类型

    c、创建类继承RecordReader类

    三、代码

    1、自定义的InputFormat

    package com.whole;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    
    /**
     * 1. 返回的数据类型是 RecordReader, 因此需要返回 recordReader
     * 2. 定义一个 wholeRecordReader 继承 RecordReader
     * 3. 初始化(后面)
     */
    public class wholeInputFormat extends FileInputFormat<Text, BytesWritable> {
        public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            wholeRecordReader  recordReader = new wholeRecordReader();
            recordReader.initialize(split, context);
            return recordReader;
        }
    }

    2、定义继承RecordReader的类

    package com.whole;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    /**
     * 0. key value  数据类型 对应 Text BytesWritable (自定义的 InputFormat 的 key value)
     * 1. 初始化: 定义 FileSplit split
     * 2. getCurrentKey() 返回 key
     * 3. getCurrentValue() 返回 value
     * 4. 编写核心业务, 具体见 下面的 注释
     */
    
    public class wholeRecordReader  extends RecordReader<Text, BytesWritable> {
        FileSplit split;
        boolean isProcess = true;
        Text k = new Text();
        Configuration conf;
        BytesWritable v = new BytesWritable();
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            // 初始化
            this.split = (FileSplit)split;
            conf = context.getConfiguration();
        }
    
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // 核心业务
            if (isProcess){
                // 0. 定义 buff 区
                byte[] buff = new byte[(int) split.getLength()];
                // 1. 通过split -> path,通过 path -> fs
                Path path = split.getPath();
                FileSystem fs = path.getFileSystem(conf);
                // 2. 读取数据
                FSDataInputStream fIS = fs.open(path);
                // 3. 读取文件内容
                IOUtils.readFully(fIS, buff, 0, buff.length);
                // 4. 设置 v
                v.set(buff, 0, buff.length);
                // 5. 设置 k
                k.set(split.getPath().toString());
    IOUtils.closeStream(fIS); isProcess
    = false; return true; } return false; } public Text getCurrentKey() throws IOException, InterruptedException { // 获取当前的 key return k; } public BytesWritable getCurrentValue() throws IOException, InterruptedException { // 获取当前的 value return v; } public float getProgress() throws IOException, InterruptedException { return 0; } public void close() throws IOException { } }

    3、Mapper

    package com.whole;
    
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Pass-through mapper for the small-file merge job.
     *
     * <p>Both input and output are (Text, BytesWritable): the key/value pair produced by the
     * custom InputFormat. Each record is forwarded unchanged.
     */
    public class wholeMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
        @Override
        protected void map(Text filePath, BytesWritable fileBytes, Context context)
                throws IOException, InterruptedException {
            // No transformation needed: emit the (path, contents) pair as received.
            context.write(filePath, fileBytes);
        }
    }

    4、Reducer

    package com.whole;
    
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Pass-through reducer: re-emits every value grouped under a key, paired with that key.
     */
    public class wholeReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
                throws IOException, InterruptedException {
            for (final BytesWritable contents : values) {
                context.write(key, contents);
            }
        }
    }

    5、Driver

    package com.whole;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    
    import java.io.IOException;
    
    public class wholeDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            args = new String[]{"E:\a\input", "E:\a\output"};
            // 1. 获取 job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // 2. 设置 jar
            job.setJarByClass(wholeDriver.class);
            // 3. 关联 mapper reducer
            job.setMapperClass(wholeMapper.class);
            job.setReducerClass(wholeReducer.class);
            // 4. 设置 mapper 输出的 key value
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BytesWritable.class);
            // 5. 设置 输出的 key value
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
            // 重要 设置 输入 输出 格式
            job.setInputFormatClass(wholeInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            // 6. 设置 输入 输出 路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 7. 提交 job
            boolean wait = job.waitForCompletion(true);
            System.exit(wait? 0: 1);
        }
    }

    注意:

    自定义的 InputFormat 类只是把小文件合并成 SequenceFile 格式的文件，并没有改变切片数量。

    核心代码:

    1、继承 RecordReader 的类中重写的 nextKeyValue() 方法

    2、Driver 类中设置 InputFormat 和 OutputFormat

    job.setInputFormatClass(wholeInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
  • 相关阅读:
    Windows下使用nmake编译C/C++的makefile
    poj 1228
    poj 1039
    poj 1410
    poj 3304
    poj 1113
    poj 2074
    uva 1423 LA 4255
    poj 1584
    poj 3277
  • 原文地址:https://www.cnblogs.com/wt7018/p/13615144.html
Copyright © 2011-2022 走看看