  • Hadoop (20) - MapReduce Framework Internals - OutputFormat

    1. OutputFormat implementation classes
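    For orientation: in the new MapReduce API, OutputFormat is an abstract class, FileOutputFormat is its file-based subclass, and TextOutputFormat (a job's default) and SequenceFileOutputFormat are two of its concrete implementations. The sketch below is a plain-Java imitation, with no Hadoop dependency, of the line layout TextOutputFormat writes for each record. The class and method names are hypothetical, and a null argument stands in for NullWritable.

```java
/**
 * Hadoop-free sketch of the line layout used by TextOutputFormat (the default OutputFormat):
 * key, separator, value, newline. A null argument stands in for NullWritable.
 */
final class TextLineFormatSketch {

    // Hadoop's default separator (configurable via mapreduce.output.textoutputformat.separator)
    static final String DEFAULT_SEPARATOR = "\t";

    /** Builds the text that one (key, value) record would contribute to the output file. */
    static String formatLine(Object key, Object value, String separator) {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return ""; // a record with neither key nor value writes nothing
        }
        StringBuilder line = new StringBuilder();
        if (!nullKey) {
            line.append(key);
        }
        if (!nullKey && !nullValue) {
            line.append(separator); // the separator only appears between two present fields
        }
        if (!nullValue) {
            line.append(value);
        }
        return line.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(formatLine(0L, "http://www.baidu.com", DEFAULT_SEPARATOR));
        System.out.print(formatLine(null, "http://www.nty.com", DEFAULT_SEPARATOR));
    }
}
```

    With the (offset, line) pairs of this article's job, TextOutputFormat would write lines such as "0<TAB>http://www.baidu.com"; the custom FilterRecordWrite below writes only the value.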

    2. Custom OutputFormat

    Steps:

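    1). Define a class that extends FileOutputFormat and returns the custom writer from getRecordWriter.

    2). Define a class that extends RecordWriter, override its write method, and override close to release the output streams.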
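    The two steps amount to a factory (the OutputFormat hands each task a RecordWriter) plus a writer that receives every record and is closed once at the end. The following is a minimal, Hadoop-free sketch of that lifecycle. MiniOutputFormat, MiniRecordWriter, RoutingWriter and runTask are hypothetical names, and in-memory StringBuilders stand in for the output files.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hadoop-free sketch of the two-step pattern. MiniOutputFormat and MiniRecordWriter are
 * hypothetical stand-ins for FileOutputFormat and RecordWriter; output "files" are
 * StringBuilders in a Map keyed by file name.
 */
final class OutputFormatPatternSketch {

    /** Stand-in for RecordWriter: receives every record of a task, then is closed once. */
    interface MiniRecordWriter<K, V> {
        void write(K key, V value);

        void close();
    }

    /** Stand-in for FileOutputFormat.getRecordWriter(): a factory for one task's writer. */
    interface MiniOutputFormat<K, V> {
        MiniRecordWriter<K, V> getRecordWriter(Map<String, StringBuilder> outDir);
    }

    /** Routes values containing "nty" to nty.log and everything else to other.log. */
    static final class RoutingWriter implements MiniRecordWriter<Long, String> {
        private final StringBuilder nty;
        private final StringBuilder other;

        RoutingWriter(Map<String, StringBuilder> outDir) {
            // Like init() in the article: open both outputs before any record arrives
            nty = outDir.computeIfAbsent("nty.log", name -> new StringBuilder());
            other = outDir.computeIfAbsent("other.log", name -> new StringBuilder());
        }

        @Override
        public void write(Long key, String value) {
            (value.contains("nty") ? nty : other).append(value).append('\n');
        }

        @Override
        public void close() {
            // In-memory buffers need no cleanup; a real writer closes its streams here
        }
    }

    /** Simulates one task: get a writer from the format, feed it every record, close it. */
    static Map<String, StringBuilder> runTask(MiniOutputFormat<Long, String> format, List<String> lines) {
        Map<String, StringBuilder> outDir = new LinkedHashMap<>();
        MiniRecordWriter<Long, String> writer = format.getRecordWriter(outDir);
        try {
            long lineNumber = 0; // stands in for the byte-offset key
            for (String line : lines) {
                writer.write(lineNumber++, line);
            }
        } finally {
            writer.close();
        }
        return outDir;
    }

    public static void main(String[] args) {
        Map<String, StringBuilder> out =
                runTask(RoutingWriter::new, Arrays.asList("http://www.nty.com", "http://www.sohu.com"));
        out.forEach((file, text) -> System.out.print(file + ":\n" + text));
    }
}
```

    Keeping the format as a pure factory means each task gets a fresh writer with its own open files, and close is the single place where those files are released.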

    3. Example

    Given a log file, write every line that contains nty to nty.log and all other lines to other.log.

    http://www.baidu.com
    http://www.google.com
    http://cn.bing.com
    http://www.nty.com
    http://www.sohu.com
    http://www.sina.com
    http://www.sin2a.com
    http://www.sin2desa.com
    http://www.sindsafa.com

    Custom class extending FileOutputFormat:

    package com.nty.outputFormat;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * author nty
     * date time 2018-12-12 19:28
     */
    public class FilterOutputFormat extends FileOutputFormat<LongWritable, Text> {
    
    
        @Override
        public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            FilterRecordWrite frw = new FilterRecordWrite();
            frw.init(job);
            return frw;
        }
    }

    Custom RecordWriter that overrides write:

    package com.nty.outputFormat;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    /**
     * author nty
     * date time 2018-12-12 19:29
     */
    public class FilterRecordWrite extends RecordWriter<LongWritable, Text> {
    
        private FSDataOutputStream nty;
    
        private FSDataOutputStream other;
    
        // The task context is passed in by FilterOutputFormat.getRecordWriter()
        public void init(TaskAttemptContext job) throws IOException {
    
            // Output directory set in the driver via FileOutputFormat.setOutputPath()
            String outDir = job.getConfiguration().get(FileOutputFormat.OUTDIR);
    
            FileSystem fileSystem = FileSystem.get(job.getConfiguration());
    
            // Create the two output files directly under the output directory
            nty = fileSystem.create(new Path(outDir + "/nty.log"));
            other = fileSystem.create(new Path(outDir + "/other.log"));
    
        }
    
        @Override
        public void write(LongWritable key, Text value) throws IOException, InterruptedException {
            // Text holds the line without its terminator, so append one
            String address = value.toString() + "\n";
            byte[] bytes = address.getBytes(StandardCharsets.UTF_8);
    
            if (address.contains("nty")) {
                nty.write(bytes);
            } else {
                other.write(bytes);
            }
    
        }
    
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // Close both output streams
            IOUtils.closeStream(nty);
            IOUtils.closeStream(other);
        }
    }
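    The byte-level behavior of write() can be tried without a cluster. This sketch uses ByteArrayOutputStreams as stand-ins for the two FSDataOutputStreams and a hypothetical helper, writeLine; it appends the newline explicitly and encodes with an explicit UTF-8 charset.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Hadoop-free sketch of what FilterRecordWrite.write() puts into its streams. Two
 * ByteArrayOutputStreams stand in for the FSDataOutputStreams from FileSystem.create().
 */
final class RouteBytesSketch {

    /** Appends the line terminator, encodes as UTF-8, and writes to the matching stream. */
    static void writeLine(String line, ByteArrayOutputStream nty, ByteArrayOutputStream other) {
        // The record itself carries no newline, so add one explicitly
        byte[] bytes = (line + "\n").getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream target = line.contains("nty") ? nty : other;
        target.write(bytes, 0, bytes.length);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream nty = new ByteArrayOutputStream();
        ByteArrayOutputStream other = new ByteArrayOutputStream();
        writeLine("http://www.nty.com", nty, other);
        // Non-ASCII host (U+4F8B U+5B50): each of the two characters takes 3 bytes in UTF-8
        writeLine("http://www.\u4f8b\u5b50.com", nty, other);
        System.out.println("nty.log bytes: " + nty.size());
        System.out.println("other.log bytes: " + other.size());
        System.out.print(new String(other.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

    Passing StandardCharsets.UTF_8 keeps the bytes identical to Text's internal UTF-8 encoding; the no-argument String.getBytes() uses the platform default charset, which before JDK 18 follows the OS locale and may not be UTF-8.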

    Driver class:

    package com.nty.outputFormat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * author nty
     * date time 2018-12-12 19:29
     */
    public class FilterDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            job.setJarByClass(FilterDriver.class);
    
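            // Route the job's output through the custom OutputFormat
            job.setOutputFormatClass(FilterOutputFormat.class);
    
            // No Mapper or Reducer is set, so the identity Mapper and Reducer pass (offset, line) pairs through.
            // Backslashes in Windows paths must be escaped in Java string literals.
            FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
            FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));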
    
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
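    Because the driver sets no Mapper or Reducer, the job runs Hadoop's identity Mapper and Reducer, and the job's default output key and value classes are LongWritable and Text, which match FilterOutputFormat's type parameters. Each key FilterRecordWrite receives is therefore the byte offset of a line as produced by TextInputFormat, and with one input file and one reduce task the records arrive in input order. The sketch below computes those pairs in plain Java; the class name is hypothetical, and it assumes UTF-8 input with LF (\n) line endings (CRLF files shift the offsets).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hadoop-free sketch of the (key, value) pairs FilterRecordWrite.write() receives when the
 * driver sets no Mapper or Reducer: TextInputFormat yields (byte offset, line) and the
 * identity Mapper/Reducer pass them through. Assumes UTF-8 text with LF line endings.
 */
final class OffsetKeySketch {

    /** Returns "offset TAB line" for every line, mirroring the LongWritable key and Text value. */
    static List<String> keyValuePairs(String fileContents) {
        List<String> pairs = new ArrayList<>();
        long offset = 0;
        for (String line : fileContents.split("\n")) {
            pairs.add(offset + "\t" + line);
            // The next line starts after this line's bytes plus the LF terminator
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return pairs;
    }

    public static void main(String[] args) {
        String sample = "http://www.baidu.com\nhttp://www.google.com\nhttp://cn.bing.com\n";
        keyValuePairs(sample).forEach(System.out::println);
    }
}
```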

    Output results
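    To see what the two output files should contain, the routing rule can be replayed on the sample input in plain Java. This is a sketch, not a Hadoop run: it applies the same test as write(), value.contains("nty"), and assumes the records arrive in input order (one input file, one reduce task). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hadoop-free check of how the sample input splits between nty.log and other.log,
 * using the same test as FilterRecordWrite.write(): value.contains("nty").
 */
final class SampleSplitSketch {

    static final List<String> SAMPLE = Arrays.asList(
            "http://www.baidu.com",
            "http://www.google.com",
            "http://cn.bing.com",
            "http://www.nty.com",
            "http://www.sohu.com",
            "http://www.sina.com",
            "http://www.sin2a.com",
            "http://www.sin2desa.com",
            "http://www.sindsafa.com");

    /** Returns [lines for nty.log, lines for other.log], each in input order. */
    static List<List<String>> split(List<String> lines) {
        List<String> nty = new ArrayList<>();
        List<String> other = new ArrayList<>();
        for (String line : lines) {
            (line.contains("nty") ? nty : other).add(line);
        }
        List<List<String>> result = new ArrayList<>();
        result.add(nty);
        result.add(other);
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> result = split(SAMPLE);
        System.out.println("nty.log:   " + result.get(0));
        System.out.println("other.log: " + result.get(1));
    }
}
```

    Under those assumptions nty.log should hold the single line http://www.nty.com, and other.log the remaining eight URLs in their original order.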

  • Original article: https://www.cnblogs.com/duoduotouhenying/p/10110609.html