zoukankan      html  css  js  c++  java
  • Hadoop(20)-MapReduce框架原理-OutputFormat

    1.outputFormat接口实现类

    2.自定义outputFormat

    步骤:

    1). 定义一个类继承FileOutputFormat

    2). 定义一个类继承RecordWriter,重写write方法

    3. 案例

    有一个log文件,每行是一个网址。将包含nty的行输出到nty.log文件,其余的行输出到other.log文件

    http://www.baidu.com
    http://www.google.com
    http://cn.bing.com
    http://www.nty.com
    http://www.sohu.com
    http://www.sina.com
    http://www.sin2a.com
    http://www.sin2desa.com
    http://www.sindsafa.com

    自定义类继承FileOutputFormat

    package com.nty.outputFormat;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * Output format whose record writer is a {@link FilterRecordWrite}.
     *
     * <p>The writer is created per call and initialized with the task attempt
     * context before it is returned.
     *
     * author nty
     * date time 2018-12-12 19:28
     */
    public class FilterOutputFormat extends FileOutputFormat<LongWritable, Text> {

        /**
         * Builds a {@link FilterRecordWrite} and initializes it from the given context.
         *
         * @param job the task attempt context passed on to {@code FilterRecordWrite.init}
         * @return the initialized record writer
         * @throws IOException if initializing the writer fails
         * @throws InterruptedException declared by the overridden method
         */
        @Override
        public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            final FilterRecordWrite writer = new FilterRecordWrite();
            // The writer opens its output files here, before any record is written.
            writer.init(job);
            return writer;
        }
    }

    自定义RecordWriter,重写write

    package com.nty.outputFormat;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * author nty
     * date time 2018-12-12 19:29
     */
    public class FilterRecordWrite extends RecordWriter<LongWritable, Text> {
    
        private FSDataOutputStream nty;
    
        private FSDataOutputStream other;
    
        //将job通过参数传递过来
        public void init(TaskAttemptContext job) throws IOException {
    
            String outDir = job.getConfiguration().get(FileOutputFormat.OUTDIR);
    
            FileSystem fileSystem = FileSystem.get(job.getConfiguration());
    
            nty = fileSystem.create(new Path(outDir + "/nty.log"));
            other = fileSystem.create(new Path(outDir + "/other.log"));
    
        }
    
        @Override
        public void write(LongWritable key, Text value) throws IOException, InterruptedException {
            String address = value.toString() + "
    ";
    
            if(address.contains("nty")) {
                nty.write(address.getBytes());
            } else {
                other.write(address.getBytes());
            }
    
        }
    
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            //关流
            IOUtils.closeStream(nty);
            IOUtils.closeStream(other);
        }
    }

    Driver类设置

    package com.nty.outputFormat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * author nty
     * date time 2018-12-12 19:29
     */
    public class FilterDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            job.setJarByClass(FilterDriver.class);
    
            job.setOutputFormatClass(FilterOutputFormat.class);
    
            FileInputFormat.setInputPaths(job, new Path("d:\Hadoop_test"));
            FileOutputFormat.setOutputPath(job, new Path("d:\Hadoop_test_out"));
    
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }

    输出结果

  • 相关阅读:
    struts2--表单重复提交
    struts2--文件上传与下载
    springboot配置redis+jedis,支持基础redis,并实现jedis GEO地图功能
    Mybatis配置——自动使用驼峰命名 属性映射字段(默认为false)
    解决PageHelper.startPage(page, size)后,关于PageInfo的total等属性不正确等问题
    Socket网络通信——IO、NIO、AIO介绍以及区别
    FTPClient TLS 与 FTP 进行数据传输异常:Remote host closed connection during handshake
    常用Concurrent.util包工具类——高并发
    线程池——Executors
    多线程的设计模式:Future、Master-Worker
  • 原文地址:https://www.cnblogs.com/duoduotouhenying/p/10110609.html
Copyright © 2011-2022 走看看