  • OutputFormat --- Custom Output

    Overview

    The output format and destination files can be customized. For example, records containing a given field can be written to one file while all other records go to a different file.
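
    In Hadoop MapReduce this is done by subclassing FileOutputFormat, overriding getRecordWriter to return a custom RecordWriter, and putting the per-record routing logic in that writer's write method; the example below builds each piece in turn.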

    Example

    Data

    www.nevesettle.com
    www.baidu.com
    www.qq.com
    www.mi.com
    www.jd.com
    www.std.com
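
    Given the routing rule in LogRecordWriter below, the expected result is that lines containing "neve" land in nelog.log and everything else in otherlog.log:

    nelog.log:
    www.nevesettle.com

    otherlog.log:
    www.baidu.com
    www.qq.com
    www.mi.com
    www.jd.com
    www.std.com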
    

    Mapper

    package com.neve.outputformat;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    // Passes each input line straight through as the output key;
    // NullWritable is used as the value since no payload is needed.
    public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // The whole line becomes the key
            context.write(value, NullWritable.get());
        }
    }
    
    

    Reducer

    package com.neve.outputformat;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    // Emits every occurrence of each line, so duplicate input lines are preserved.
    public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            // One write per grouped value keeps duplicates in the output
            for (NullWritable value : values) {
                context.write(key, value);
            }
        }
    }
    
    

    Driver

    package com.neve.outputformat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class LogDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            // 1. Create the configuration
            Configuration configuration = new Configuration();
            // 2. Create the job
            Job job = Job.getInstance(configuration);
            // 3. Set the driver class
            job.setJarByClass(LogDriver.class);
            // 4. Set the mapper and reducer classes
            job.setMapperClass(LogMapper.class);
            job.setReducerClass(LogReducer.class);
            // 5. Set the mapper's output key and value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            // 6. Set the final output key and value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // 7. Set the input and output paths (backslashes in Java string literals must be escaped)
            FileInputFormat.setInputPaths(job, new Path("F:\\Workplace\\IDEA_Workplace\\hadoopStudy2\\outputformatinput"));
            FileOutputFormat.setOutputPath(job, new Path("F:\\Workplace\\IDEA_Workplace\\hadoopStudy2\\outputformatoutput"));
            // Register the custom OutputFormat
            job.setOutputFormatClass(LogOutputFormat.class);
            // 8. Submit the job
            job.waitForCompletion(true);
    
        }
    }
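
    Note that FileOutputFormat.setOutputPath is still required even though LogRecordWriter writes to its own file paths: the framework's output committer uses that directory for job bookkeeping (for example, the _SUCCESS marker), and the job fails output validation without it.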
    
    

    LogOutputFormat

    package com.neve.outputformat;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    // Custom FileOutputFormat whose only job is to hand the framework a LogRecordWriter.
    // The framework calls getRecordWriter once per task and passes every key/value pair to the returned writer.
    public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {

        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            return new LogRecordWriter(job);
        }
    }
    

    LogRecordWriter

    package com.neve.outputformat;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    import java.io.IOException;
    
    public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

        // Output paths for the two result files (backslashes must be escaped)
        private String nelog = "F:\\nelog.log";
        private String otherlog = "F:\\otherlog.log";

        private FileSystem fs;
        private FSDataOutputStream neos;
        private FSDataOutputStream otheros;

        public LogRecordWriter(TaskAttemptContext job) throws IOException {
            // Get the file system object and open one output stream per target file
            fs = FileSystem.get(job.getConfiguration());
            neos = fs.create(new Path(nelog));
            otheros = fs.create(new Path(otherlog));
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            String string = key.toString();
            // Route lines containing "neve" to nelog.log, everything else to otherlog.log
            if (string.contains("neve")) {
                neos.writeBytes(string + "\n");
            } else {
                otheros.writeBytes(string + "\n");
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // Close both streams when the task finishes
            IOUtils.closeStream(neos);
            IOUtils.closeStream(otheros);
        }
    }
    
    