zoukankan      html  css  js  c++  java
  • 自定义 OutputFormat案例

    一、需求分析

    1、内容

    http://www.baidu.com
    http://www.google.com
    http://cn.bing.com
    http://www.atguigu.com
    http://www.sohu.com
    http://www.sina.com
    http://www.sin2a.com
    http://www.sin2desa.com
    http://www.sindsafa.com

    2、需求

    过滤输入的log日志:包含atguigu的网站输出到 e:\a.log,不包含atguigu的网站输出到 e:\b.log

    3、分析

    a、输出结果分离

    b、自定义OutputFormat类

    二、具体代码

    1、Mapper

    package com.filter;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Pass-through mapper: emits each input line unchanged as the key,
     * with NullWritable as the value. All filtering happens later in the
     * custom RecordWriter, so the map phase only forwards the raw line.
     */
    public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The whole line (value) becomes the output key; there is no payload.
            context.write(value, NullWritable.get());
        }
    }

    注意:Mapper的输出的value的类型为 NullWritable

    2、Reducer

    package com.filter;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class FilterReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
        Text v = new Text();
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // 1. 防止重复
            for (NullWritable value : values) {
                // 2.换行输入
                String line = key.toString() + "	
    ";
                v.set(line);
                context.write(v, NullWritable.get());
            }
        }
    }

    注意:换行输入

    3、自定义OutputFormat类

    package com.filter;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FOutputFormat extends FileOutputFormat<Text, NullWritable> {
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            FrecordWriter recordWriter = new FrecordWriter(job);
            return recordWriter;
        }
    }

    注意:继承FileOutputFormat类,其泛型的键值类型必须与Reducer输出的键值类型一致(Text / NullWritable)

    4、实现自定义OutputFormat类的 RecordWriter继承类

    package com.filter;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    import java.io.IOException;
    
    public class FrecordWriter extends RecordWriter<Text, NullWritable> {
        // 1.获取系统文件
        FileSystem fs;
        FSDataOutputStream aOS;
        FSDataOutputStream bOS;
        public FrecordWriter(TaskAttemptContext job) {
            try {
                Configuration conf = job.getConfiguration();
                fs = FileSystem.get(conf);
                // 2.创建输出文件路径
                Path aPth = new Path("E:\a.log");
                Path bPth = new Path("E:\b.log");
                // 3.创建输出流
                aOS = fs.create(aPth);
                bOS = fs.create(bPth);
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            if (key.toString().contains("atguigu")){
                aOS.write(key.toString().getBytes());
            }else {
                bOS.write(key.toString().getBytes());
            }
        }
    
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            IOUtils.closeStream(aOS);
            IOUtils.closeStream(bOS);
        }
    }

    注意:

    a、在构造方法中,获取fs和输出流

    b、在write()方法中,写主逻辑

    c、在close方法中关闭

    5、Driver

    package com.filter;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FilterDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            args = new String[]{"E:\a\input\test.log", "E:\a\output"};
            // 1. job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // 2. 设置jar
            job.setJarByClass(FilterDriver.class);
            // 3. 关联Mapper和Reduce
            job.setMapperClass(FilterMapper.class);
            job.setReducerClass(FilterReducer.class);
            // 4. 设置Mapper的输出的k和v
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            // 5. 设置输出的k v
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // 8. 设置 OutputFormat
            job.setOutputFormatClass(FOutputFormat.class);
            // 6. 设置输入输出
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 7. 提交任务
            boolean wait = job.waitForCompletion(true);
            System.exit(wait? 0:1);
        }
    }

    注意:设置OutputFormat的驱动

  • 相关阅读:
    cgic: CGI的C函数库
    linux下的webserver BOA及CGIC库的使用指南(转帖)
    UDP 收/发 广播包
    winsock 收发广播包
    Linux系统下UDP发送和接收广播消息小例子
    uboot里读sd卡内容
    uboot从SD卡烧写内核和文件系统
    intellij 创建一个文件自动就add到git了,这个怎么取消
    内部类和外部类之间的相互调用
    JDK8的新特性——Lambda表达式
  • 原文地址:https://www.cnblogs.com/wt7018/p/13635373.html
Copyright © 2011-2022 走看看