  • MapReduce Custom InputFormat and OutputFormat

    I. Custom InputFormat

    Requirement: merge many small files into a single SequenceFile (one file that stores them all).

    Record format: file path (key) + file contents (value)

            c:/a.txt I love Beijing
            c:/b.txt I love China

            (the custom InputFormat supplies the file path as the key)

    1. Mapper class

    package com.css.inputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{
    
        Text k = new Text();
        
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // 1. Get the input split for this map task
            FileSplit split = (FileSplit) context.getInputSplit();
            // 2. Get the file path from the split
            Path path = split.getPath();
            // 3. The key is the full path, which includes the file name
            k.set(path.toString());
        }
        
        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                        throws IOException, InterruptedException {
            // Emit (file path, file bytes)
            context.write(k, value);
        }
    }

    2. Reducer class

    package com.css.inputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{
    
        @Override
        protected void reduce(Text key, Iterable<BytesWritable> values,
                Context context) throws IOException, InterruptedException {
            for (BytesWritable v : values) {
                context.write(key, v);
            }
        }
    }

    3. Custom InputFormat class

    package com.css.inputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    /**
     * 1. Create the custom InputFormat
     */
    public class FuncFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{
    
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            // Do not split the original files; each file stays whole
            return false;
        }
    
        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FuncRecordReader recordReader = new FuncRecordReader();
            return recordReader;
        }
    }

    4. Custom RecordReader class

    package com.css.inputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    /**
     * 2. Implement the RecordReader: read the whole file into one record
     */
    public class FuncRecordReader extends RecordReader<NullWritable, BytesWritable>{
    
        boolean isProcess = false;
        FileSplit split;
        Configuration conf;
        BytesWritable value = new BytesWritable();
        // Initialization
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            // Keep a reference to the file split
            this.split = (FileSplit) split;
            // Grab the job configuration
            conf = context.getConfiguration();
        }
    
        @Override
        public boolean nextKeyValue() {
            if (!isProcess) {
                // 1. Allocate a buffer sized to the split (the whole file, since it is not splittable)
                byte[] buf = new byte[(int) split.getLength()];
                FSDataInputStream fis = null;
                try {
                    // 2. Get the file path
                    Path path = split.getPath();
                    
                    // 3. Get the file system for that path
                    FileSystem fs = path.getFileSystem(conf);
                    
                    // 4. Open an input stream to the file
                    fis = fs.open(path);
                    
                    // 5. Read the whole file into the buffer
                    IOUtils.readFully(fis, buf, 0, buf.length);
                    
                    // 6. Copy the buffer into the output value
                    value.set(buf, 0, buf.length);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Close only the stream; the FileSystem is a shared, cached instance
                    IOUtils.closeStream(fis);
                }
                
                isProcess = true;
                return true;
            }
            return false;
        }
    
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }
    
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
        @Override
        public float getProgress() throws IOException, InterruptedException {
            // The single record is either fully read (1) or not yet read (0)
            return isProcess ? 1 : 0;
        }
    
        @Override
        public void close() throws IOException {
        }
    }
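
    Before wiring the reader into a job, it can help to drive it by hand. The sketch below is a minimal smoke test, assuming Hadoop 2.x (for TaskAttemptContextImpl) and a small local test file; the class name FuncRecordReaderSmokeTest is made up for illustration. One FileSplit covering the whole file should yield exactly one key/value pair.

    package com.css.inputformat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    
    public class FuncRecordReaderSmokeTest {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path file = new Path("c:/in1027/a.txt"); // any small local file
            FileSystem fs = file.getFileSystem(conf);
            long len = fs.getFileStatus(file).getLen();
    
            // One split covering the whole file, matching isSplitable() == false
            FileSplit split = new FileSplit(file, 0, len, null);
            FuncRecordReader reader = new FuncRecordReader();
            reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));
    
            // Expect exactly one record: the full file body as bytes
            while (reader.nextKeyValue()) {
                BytesWritable v = reader.getCurrentValue();
                System.out.println(new String(v.copyBytes()));
            }
            reader.close();
        }
    }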

    5. Driver class

    package com.css.inputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    
    public class SequenceDriver {
        public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
            // 1. Get the job instance
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            // 2. Set the jar by the driver class
            job.setJarByClass(SequenceDriver.class);
    
            // 3. Set the custom mapper and reducer classes
            job.setMapperClass(SequenceFileMapper.class);
            job.setReducerClass(SequenceFileReducer.class);
            
            // 4. Set the custom input format
            job.setInputFormatClass(FuncFileInputFormat.class);
            // 5. Set the output format to the built-in SequenceFileOutputFormat
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
            // 6. Set the map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BytesWritable.class);
    
            // 7. Set the reduce (final) output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
    
            // 8. Set the input path and the result path
            FileInputFormat.setInputPaths(job, new Path("c:/in1027/"));
            FileOutputFormat.setOutputPath(job, new Path("c:/out1027/"));
    
            // 9. Submit the job
            boolean rs = job.waitForCompletion(true);
            System.out.println(rs ? 0 : 1);
        }
    }
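
    The paths in step 8 are hardcoded for a local Windows test run. A common variation, sketched below under the assumption that the paths are passed on the command line (not part of the original driver), keeps the driver portable:

            // Hedged alternative to steps 8-9: take the paths from args
            FileInputFormat.setInputPaths(job, new Path(args[0]));  // input directory
            FileOutputFormat.setOutputPath(job, new Path(args[1])); // must not exist yet
            System.exit(job.waitForCompletion(true) ? 0 : 1);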

    6. Input: the small files

    (1) a.txt
    I love Beijing
    (2) b.txt
    I love China
    (3) c.txt
    Beijing is the capital of China

    7. Output file part-r-00000

    The output is a binary SequenceFile, so its header bytes are not printable. The readable portion shows the key/value class names followed by the three records:

    SEQ  org.apache.hadoop.io.Text  org.apache.hadoop.io.BytesWritable  ...
    file:/c:/in1027/a.txt   I love Beijing
    file:/c:/in1027/b.txt   I love China
    file:/c:/in1027/c.txt   Beijing is the capital of China
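
    Rather than eyeballing the raw bytes, the records can be dumped with the standard SequenceFile.Reader API. This is a minimal verification sketch; the class name SequenceFileDump is made up for illustration.

    package com.css.inputformat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    public class SequenceFileDump {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path path = new Path("c:/out1027/part-r-00000");
            try (SequenceFile.Reader reader =
                    new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
                Text key = new Text();
                BytesWritable value = new BytesWritable();
                // Each record is (file path, raw file bytes)
                while (reader.next(key, value)) {
                    System.out.println(key + " -> " + new String(value.copyBytes()));
                }
            }
        }
    }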
     
    II. Custom OutputFormat

    Requirement: filter a log file
        put lines that contain "main" into one file: c:/outputmain/main.logs
        put all other lines into a second file: c:/outputother/other.logs

    1. Mapper class

    package com.css.outputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class FileMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit each input line as the key
            context.write(value, NullWritable.get());
        }
    }

    2. Reducer class

    package com.css.outputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class FileReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
    
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
            // Append a newline so each record ends its own line
            String k = key.toString() + "\n";
            context.write(new Text(k), NullWritable.get());
        }
    }

    3. Custom OutputFormat class

    package com.css.outputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FuncFileOutputFormat extends FileOutputFormat<Text, NullWritable>{
    
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
                throws IOException, InterruptedException {
            FileRecordWriter fileRecordWriter = new FileRecordWriter(job);
            return fileRecordWriter;
        }
    }

    4. Custom RecordWriter class

    package com.css.outputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    public class FileRecordWriter extends RecordWriter<Text, NullWritable>{
    
        Configuration conf = null;
        FSDataOutputStream mainlog = null;
        FSDataOutputStream otherlog = null;
        
        // 1. Define the output paths
        public FileRecordWriter(TaskAttemptContext job) throws IOException{
            // Get the job configuration
            conf = job.getConfiguration();
            // Get the file system
            FileSystem fs = FileSystem.get(conf);
            // Create the two output files (these take the place of the default part-r-00000)
            mainlog = fs.create(new Path("c:/outputmain/main.logs"));
            otherlog = fs.create(new Path("c:/outputother/other.logs"));
        }
        
        // 2. Write the data
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // Route each line on whether the key contains "main"
            if (key.toString().contains("main")) {
                // Write only the valid bytes; Text.getBytes() can return a longer backing array
                mainlog.write(key.getBytes(), 0, key.getLength());
            } else {
                otherlog.write(key.getBytes(), 0, key.getLength());
            }
        }
    
        // 3. Close the resources
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (null != mainlog) {
                mainlog.close();
            }
            if (null != otherlog) {
                otherlog.close();
            }
        }
    }
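
    Hardcoding c:/outputmain and c:/outputother ignores the output directory the driver configures. The variant below is a sketch, not part of the original code: it derives both files from FileOutputFormat.getOutputPath() so they land under the job's output path. The class name DirRelativeFileRecordWriter is hypothetical.

    package com.css.outputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class DirRelativeFileRecordWriter extends RecordWriter<Text, NullWritable> {
    
        private final FSDataOutputStream mainlog;
        private final FSDataOutputStream otherlog;
    
        public DirRelativeFileRecordWriter(TaskAttemptContext job) throws IOException {
            // Derive both files from the output directory the driver set
            Path outDir = FileOutputFormat.getOutputPath(job); // e.g. c:/out1029/
            FileSystem fs = outDir.getFileSystem(job.getConfiguration());
            mainlog = fs.create(new Path(outDir, "main.logs"));
            otherlog = fs.create(new Path(outDir, "other.logs"));
        }
    
        @Override
        public void write(Text key, NullWritable value) throws IOException {
            // Same routing rule as FileRecordWriter, writing only the valid bytes
            FSDataOutputStream out = key.toString().contains("main") ? mainlog : otherlog;
            out.write(key.getBytes(), 0, key.getLength());
        }
    
        @Override
        public void close(TaskAttemptContext context) throws IOException {
            if (mainlog != null) {
                mainlog.close();
            }
            if (otherlog != null) {
                otherlog.close();
            }
        }
    }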

    5. Driver class

    package com.css.outputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FileDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 1. Get the job instance
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // 2. Set the jar by the driver class
            job.setJarByClass(FileDriver.class);
            // 3. Set the custom mapper and reducer classes
            job.setMapperClass(FileMapper.class);
            job.setReducerClass(FileReducer.class);
            // 4. Set the map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            // 5. Set the reduce (final) output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // 6. Set the custom OutputFormat
            job.setOutputFormatClass(FuncFileOutputFormat.class);
            // 7. Set the input path and the job output path (still required by FileOutputFormat)
            FileInputFormat.setInputPaths(job, new Path("c:/in1029/"));
            FileOutputFormat.setOutputPath(job, new Path("c:/out1029/"));
            // 8. Submit the job
            boolean rs = job.waitForCompletion(true);
            System.out.println(rs ? 0 : 1);
        }
    }

    6. Input file data.logs

    http://www.baidu.com
    http://taobao.com
    http://jd.com
    http://it.com
    http://main.js
    http://qq.com
    http://main.com.cn

    7. Output files

    (1) main.logs
    http://main.com.cn
    http://main.js
    (2) other.logs
    http://it.com
    http://jd.com
    http://qq.com
    http://taobao.com
    http://www.baidu.com