  • Custom InputFormat and OutputFormat Examples

    I. Custom InputFormat

      An InputFormat defines how a job reads its input. The earlier examples used FileInputFormat and FileOutputFormat, which by default delegate to their subclasses TextInputFormat and TextOutputFormat, reading the data line by line as Text.

      When we have many small files and want to merge them into a single SequenceFile (one file storing many small files) whose records have the form "file path + file contents", we can meet the requirement with a custom InputFormat.

      The approach is as follows:

        1. Define a FuncFileInputFormat class extending FileInputFormat (with key/value types NullWritable and BytesWritable), and override the isSplitable and createRecordReader methods;

        2. Returning false from isSplitable means files are not split; createRecordReader must return a RecordReader, which carries our custom logic for reading the input file, so we create a FuncRecordReader class;

        3. FuncRecordReader extends RecordReader, again typed with NullWritable and BytesWritable, and overrides the initialize, nextKeyValue, getCurrentKey, getCurrentValue, getProgress, and close methods;

        4. Mapper: in the setup method, get the split from the context, extract its path, and store it in a Text field t; then, in the map phase, write t and the value to the reducer;

        5. Reducer: iterate over the values and write out each key/value pair;

        6. Driver: after setting the Mapper and Reducer classes, call setInputFormatClass with FuncFileInputFormat and setOutputFormatClass with SequenceFileOutputFormat.
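      Before the full Hadoop code, the record shape the job produces (file path as key, whole file contents as value) can be sketched in plain Java with no Hadoop dependencies. The PackDemo class and file names below are hypothetical, for illustration only:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PackDemo {
    // Pack several small files into one ordered "path -> whole file bytes" map,
    // the same key/value shape (path, raw bytes) the SequenceFile job emits.
    static Map<String, byte[]> pack(List<Path> files) throws IOException {
        Map<String, byte[]> packed = new LinkedHashMap<>();
        for (Path p : files) {
            // Key is the full path, value is the entire file in one record
            packed.put(p.toString(), Files.readAllBytes(p));
        }
        return packed;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("smallfiles");
        Path a = Files.write(dir.resolve("a.txt"), "hello".getBytes());
        Path b = Files.write(dir.resolve("b.txt"), "world".getBytes());
        Map<String, byte[]> packed = pack(Arrays.asList(a, b));
        System.out.println(packed.size()); // prints: 2
    }
}
```

      The map preserves insertion order, mirroring how the job writes one record per input file into the single SequenceFile.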

      The code is as follows:

    /**
     * @author: PrincessHug
     * @date: 2019/3/29, 20:49
     * @Blog: https://www.cnblogs.com/HelloBigTable/
     */
    public class FuncFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
    
        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new FuncRecordReader();
        }
    }
    
    public class FuncRecordReader  extends RecordReader<NullWritable, BytesWritable> {
        boolean isProcess = false;
        FileSplit split;
        Configuration conf;
        BytesWritable value = new BytesWritable();
    
        // Initialization
        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            // Keep a reference to the file split
            this.split = (FileSplit) inputSplit;

            // Grab the job configuration
            conf = taskAttemptContext.getConfiguration();
        }
    
        // Produce a single key/value pair: the whole file as one record
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!isProcess) {
                // Allocate a buffer the size of the whole split (the whole file,
                // since isSplitable returns false)
                byte[] buf = new byte[(int) split.getLength()];
                FSDataInputStream fis = null;

                try {
                    // Get the file path
                    Path path = split.getPath();
                    // Get the file system for that path
                    FileSystem fs = path.getFileSystem(conf);
                    // Open the input stream
                    fis = fs.open(path);
                    // Read the entire file into the buffer
                    IOUtils.readFully(fis, buf, 0, buf.length);
                    // Copy the buffer into the output value
                    value.set(buf, 0, buf.length);
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    // Close only the stream; the FileSystem instance is cached
                    // and shared, so it should not be closed here
                    IOUtils.closeStream(fis);
                }
                isProcess = true;
                return true;
            }

            return false;
        }
    
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }
    
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
        @Override
        public float getProgress() throws IOException, InterruptedException {
            // 0 before the single record has been read, 1 afterwards
            return isProcess ? 1 : 0;
        }
    
        @Override
        public void close() throws IOException {
    
        }
    }
    
    public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        Text t = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get the split for this task
            FileSplit split = (FileSplit) context.getInputSplit();
            // Its path, including the file name
            Path path = split.getPath();
            // Use the full path as the output key
            t.set(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
            context.write(t, value);
        }
    }

    public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
            for (BytesWritable v : values) {
                context.write(key, v);
            }
        }
    }

    public class SequenceFileDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 1. Create the job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // 2. Set the jar
            job.setJarByClass(SequenceFileDriver.class);

            // 3. Set the Mapper and Reducer classes
            job.setMapperClass(SequenceFileMapper.class);
            job.setReducerClass(SequenceFileReducer.class);

            // 4. Use the custom input format
            job.setInputFormatClass(FuncFileInputFormat.class);

            // 5. Write the output as a SequenceFile
            job.setOutputFormatClass(SequenceFileOutputFormat.class);

            // 6. Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BytesWritable.class);

            // 7. Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);

            // 8. Input path and result path (backslashes must be escaped in Java string literals)
            FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\inputformat\\in"));
            FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\inputformat\\out"));

            // 9. Submit the job
            if (job.waitForCompletion(true)) {
                System.out.println("Job finished!");
            } else {
                System.out.println("Job failed!");
            }
        }
    }
    

      

    II. Custom OutputFormat

      Requirement: we have a file of website IPs, one site's IP address per line. The lines containing "www.baidu.com" must be written to one result file, and all other addresses to another result file.

      The approach: 1. The Mapper and Reducer simply read the data in and write it out;

        2. Define FuncFileOutputFormat and override its getRecordWriter method to return a FileRecordWriter object; then define the FileRecordWriter class, overriding its constructor and its write and close methods;

        3. Driver: after setting the Reducer output types, call setOutputFormatClass with our custom FuncFileOutputFormat.
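      The routing rule the custom RecordWriter applies can be checked with a small self-contained sketch; the RouteDemo class and the sample lines are hypothetical, used only to illustrate the branch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RouteDemo {
    // Split lines into two groups, mirroring the branch the custom
    // RecordWriter performs on its two output streams.
    static List<List<String>> route(List<String> lines) {
        List<String> baidu = new ArrayList<>();
        List<String> other = new ArrayList<>();
        for (String line : lines) {
            if (line.contains("www.baidu.com")) {
                baidu.add(line); // would go to baidu.logs
            } else {
                other.add(line); // would go to other.logs
            }
        }
        return Arrays.asList(baidu, other);
    }

    public static void main(String[] args) {
        List<List<String>> r = route(Arrays.asList(
                "202.108.22.5 www.baidu.com",
                "140.205.220.96 www.taobao.com"));
        System.out.println(r.get(0).size() + " baidu line(s), "
                + r.get(1).size() + " other line(s)");
        // prints: 1 baidu line(s), 1 other line(s)
    }
}
```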

      The code is as follows:

    /**
     * @author: PrincessHug
     * @date: 2019/3/30, 14:44
     * @Blog: https://www.cnblogs.com/HelloBigTable/
     */
    public class FileMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Use a constant key so all lines reach a single reducer
            context.write(new IntWritable(1), value);
        }
    }
    
    public class FileReducer extends Reducer<IntWritable, Text, Text, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text k : values) {
                // Append a newline so each record lands on its own line
                String s = k.toString() + "\n";
                context.write(new Text(s), NullWritable.get());
            }
        }
    }
    
    public class FuncFileOutputFormat extends FileOutputFormat<Text, NullWritable> {
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new FileRecordWriter(taskAttemptContext);
        }
    }
    
    public class FileRecordWriter extends RecordWriter<Text, NullWritable> {
        Configuration conf = null;
        FSDataOutputStream baidulog = null;
        FSDataOutputStream otherlog = null;

        // Open the two output streams
        public FileRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
            // Get the configuration and the file system
            conf = taskAttemptContext.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            // Output paths (backslashes must be escaped in Java string literals)
            baidulog = fs.create(new Path("G:\\mapreduce\\outputformat\\out\\itstart\\baidu.logs"));
            otherlog = fs.create(new Path("G:\\mapreduce\\outputformat\\out\\other\\other.logs"));
        }

        // Route each record to the matching stream
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // Text#getBytes() may return a padded buffer, so write only getLength() bytes
            if (key.toString().contains("baidu")) {
                baidulog.write(key.getBytes(), 0, key.getLength());
            } else {
                otherlog.write(key.getBytes(), 0, key.getLength());
            }
        }

        // Close the streams
        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            if (baidulog != null) {
                baidulog.close();
            }
            if (otherlog != null) {
                otherlog.close();
            }
        }
    }
    
    
    public class FileDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Configuration and job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            // Jar
            job.setJarByClass(FileDriver.class);

            // Mapper and Reducer
            job.setMapperClass(FileMapper.class);
            job.setReducerClass(FileReducer.class);

            // Mapper output types
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);

            // Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            // Custom output format
            job.setOutputFormatClass(FuncFileOutputFormat.class);

            // Input and output paths (backslashes must be escaped in Java string literals)
            FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\outputformat\\in"));
            FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\outputformat\\out"));

            // Submit the job
            if (job.waitForCompletion(true)) {
                System.out.println("Job finished!");
            } else {
                System.out.println("Job failed!");
            }
        }
    }
    

      

  • Original post: https://www.cnblogs.com/HelloBigTable/p/10638866.html