zoukankan      html  css  js  c++  java
  • hadoop-MapReduce框架原理之OutputFormat数据输出

    1.OutputFormat接口实现类

      OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口;

      1.1 文本输出TextOutputFormat

        默认的输出格式是TextOutputFormat,它把每条记录写为文本行;它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串;

      1.2 SequenceFileOutputFormat

        将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易压缩;

      1.3 自定义OutputFormat

        根据用户需求,自定义实现输出;

    2.自定义OutputFormat

      2.1 使用场景

        为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat;

        例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义OutputFormat来实现;

      2.2 自定义OutputFormat步骤

        2.2.1 自定义一个类继承FileOutputFormat;

        2.2.2 改写RecordWriter,具体改写输出数据的方法write();

      2.3 需求

        过滤输入的log日志,包含baidu的网站输出到baidu.log文件中,不包含baidu的网站输出到other.log文件中;

    http://www.baidu.com
    http://www.google.com
    http://cn.bing.com
    http://www.baidu.com
    http://www.sohu.com
    http://www.sina.com
    http://www.sin2a.com
    http://www.sin2desa.com
    http://www.sindsafa.com

    3.自定义OutputFormat案例实操

      3.1 FilterMapper编写

    package com.wn.outputformat;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        /**
         * Identity-style mapper: forwards each input line (a URL) unchanged
         * as the map-output key, with no value payload.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The whole line becomes the key; NullWritable marks "no value needed".
            context.write(value, NullWritable.get());
        }
    }

      3.2 FilterReducer编写

    package com.wn.outputformat;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class FilterReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
    
        Text text=new Text();
    
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            //获取一行
            String line = text.toString();
            //拼接
            line=line+"
    ";
            //设置
            text.set(line);
            //输出
            context.write(text,NullWritable.get());
        }
    }

      3.3 FilterRecordWriter编写

    package com.wn.outputformat;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    import java.io.IOException;
    
    public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
    
        FSDataOutputStream atguiguOut = null;
        FSDataOutputStream otherOut = null;
    
        public FilterRecordWriter(TaskAttemptContext job){
            //获取文件系统
            FileSystem fs;
            try {
                fs=FileSystem.get(job.getConfiguration());
                //创建输出文件路径
                Path atguiguPath = new Path("E:\北大青鸟\大数据04\hadoop\baidu.log");
                Path otherPath = new Path("E:\北大青鸟\大数据04\hadoop\other.log");
                //创建输出流
                atguiguOut = fs.create(atguiguPath);
                otherOut=fs.create(otherPath);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
            //判断包含"baidu"输出到不同文件
            if (text.toString().contains("baidu")){
                atguiguOut.write(text.toString().getBytes());
            }else{
                otherOut.write(text.toString().getBytes());
            }
        }
    
        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            //关闭资源
            IOUtils.closeStream(atguiguOut);
            IOUtils.closeStream(otherOut);
        }
    }

      3.4 FilterOutputFormat编写

    package com.wn.outputformat;
    
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {

        /**
         * Factory hook: hands the framework a {@link FilterRecordWriter}, which
         * splits records between baidu.log and other.log.
         *
         * @param taskAttemptContext context for the current task attempt
         * @return a new record writer bound to this task attempt
         */
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new FilterRecordWriter(taskAttemptContext);
        }
    }

      3.5 FilterDriver编写

    package com.wn.outputformat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FilterDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //输入输出路径需要根据自己电脑上的实际的输入输出路径设置
            args=new String[]{"E:\北大青鸟\大数据04\hadoop\input","E:\北大青鸟\大数据04\hadoop\output"};
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(FilterDriver.class);
            job.setMapperClass(FilterMapper.class);
            job.setReducerClass(FilterReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            //将自己自定义的输出格式组件设置到job中
            job.setOutputFormatClass(FilterOutputFormat.class);
    
            FileInputFormat.setInputPaths(job,new Path(args[0]));
    
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
  • 相关阅读:
    7月15日考试 题解(链表+状压DP+思维题)
    暑假集训日记
    C# .NET 使用 NPOI 生成 .xlsx 格式 Excel
    JavaSE 基础 第42节 局部内部类
    JavaSE 基础 第41节 匿名内部类
    JavaSE 基础 第40节 内部类概述
    JavaSE 基础 第39节 接口的应用
    JavaSE 基础 第38节 接口的实现
    JavaSE 基础 第37节 接口概述
    JavaSE 基础 第36节 抽象类概述与使用
  • 原文地址:https://www.cnblogs.com/wnwn/p/12642495.html
Copyright © 2011-2022 走看看