  • MapReduce custom InputFormat and OutputFormat

    package com.lagou.mr.sequence;
    //Custom InputFormat: read many small files and merge them into a single SequenceFile
    
    //The SequenceFile stores each file as a key-value pair: key --> file path + file name, value --> the entire file content
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    
    
    //TextInputFormat's generic types are LongWritable (the byte offset into the text) and Text (one line of content); the generics declare the key/value types this InputFormat emits
    //Custom InputFormat: key --> file path + name, value --> entire file content
    public class CustomInputFormat extends FileInputFormat<Text, BytesWritable> {
    
        //Override whether the input can be split
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            //For this use case the file must not be split, so that one split is exactly one whole file
            return false;
        }
    
        //The RecordReader is the object that actually reads the data
        @Override
        public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            CustomRecordReader recordReader = new CustomRecordReader();
            //Call the RecordReader's initialize method
            recordReader.initialize(split, context);
            return recordReader;
        }
    }
    package com.lagou.mr.sequence;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    //Responsible for reading the data: reads the entire file content in a single pass and packages it as one key-value pair
    public class CustomRecordReader extends RecordReader<Text, BytesWritable> {
        private FileSplit split;
        //Hadoop configuration object
        private Configuration conf;
    
    
        //Member variables holding the key and value
        private Text key = new Text();
        private BytesWritable value = new BytesWritable();
    
        //Initialization: keep the split and the job configuration as instance fields
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            this.split = (FileSplit) split;
            conf = context.getConfiguration();
        }
    
    
        private boolean flag = true;
    
        //Method that reads the data
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            //This split only needs to be read once, because a single read consumes the whole file.
            if (flag) {
                //Prepare a byte array for the data; its size is the length of the split, i.e. the whole file
                byte[] content = new byte[(int) split.getLength()];
                final Path path = split.getPath();//get the path of this split
                final FileSystem fs = path.getFileSystem(conf);//get the FileSystem for that path
    
                final FSDataInputStream fis = fs.open(path); //open an input stream
    
                IOUtils.readFully(fis, content, 0, content.length); //read the data into the byte[]
                //Populate the key and value
                key.set(path.toString());
                value.set(content, 0, content.length);
    
                IOUtils.closeStream(fis);
                //Flip the flag so the file is not read again
                flag = false;
                return true;
            }
    
    
            return false;
        }
    
        //Return the current key
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return key;
        }
    
        //Return the current value
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
        //Report progress (not tracked here, so always 0)
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }
    
        //Release resources (nothing to close here; the stream is closed in nextKeyValue)
        @Override
        public void close() throws IOException {
    
        }
    }
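
    A driver class is not shown in the post. Below is a minimal sketch (the class name SequenceDriver, the map-only setup, and the command-line input/output paths are assumptions, not part of the original): the custom InputFormat is wired to SequenceFileOutputFormat so that every small file becomes one record in the resulting SequenceFile.

    package com.lagou.mr.sequence;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    
    //Hypothetical driver: combines CustomInputFormat with SequenceFileOutputFormat
    public class SequenceDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "smallFilesToSequenceFile");
            job.setJarByClass(SequenceDriver.class);
    
            //Read whole files as (path, bytes) pairs using the custom InputFormat above
            job.setInputFormatClass(CustomInputFormat.class);
            //Write those pairs out as a SequenceFile
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
            //Map-only job: the default identity Mapper simply forwards the pairs
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
    
            //Input directory with the small files and the output directory (assumed CLI arguments)
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }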

    Custom OutputFormat

    package com.lagou.mr.output;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    
    public class CustomOutputFormat extends FileOutputFormat<Text, NullWritable> {
        //Create the object that writes the output data
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
            //Define the output paths, obtain the output streams, and pass them to the writer
            final Configuration conf = context.getConfiguration();
            final FileSystem fs = FileSystem.get(conf);
            //Output paths (local paths, hard-coded in this example)
            final FSDataOutputStream lagouOut = fs.create(new Path("e:/lagou.log"));
            final FSDataOutputStream otherOut = fs.create(new Path("e:/other.log"));
            CustomWriter customWriter = new CustomWriter(lagouOut, otherOut);
            return customWriter;
        }
    }
    package com.lagou.mr.output;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    import java.io.IOException;
    
    
    public class CustomWriter extends RecordWriter<Text, NullWritable> {
        //Member variables for the two output streams
        private FSDataOutputStream lagouOut;
        private FSDataOutputStream otherOut;
    
        //Constructor that receives the two output streams
    
    
        public CustomWriter(FSDataOutputStream lagouOut, FSDataOutputStream otherOut) {
            this.lagouOut = lagouOut;
            this.otherOut = otherOut;
        }
    
        //Write logic: decide which output file each record goes to
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            //Writing needs an output stream; choose it based on the record content
            final String line = key.toString();
            if (line.contains("lagou")) {
                lagouOut.write(line.getBytes());
                lagouOut.write("\n".getBytes());
            } else {
                otherOut.write(line.getBytes());
                otherOut.write("\n".getBytes());
            }
        }
    
        //Close and release resources
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
    
            IOUtils.closeStream(lagouOut);
            IOUtils.closeStream(otherOut);
        }
    }
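
    A driver and Mapper are not included in the post either. Below is a rough sketch (the class names OutputDriver and LineMapper and the command-line paths are assumptions): a map-only job emits each input line as a Text key with a NullWritable value, and CustomWriter then routes the line to lagou.log or other.log depending on its content. Note that FileOutputFormat still needs an output directory for the job's marker files, even though the real data files are created inside CustomOutputFormat.

    package com.lagou.mr.output;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    //Hypothetical driver for CustomOutputFormat
    public class OutputDriver {
    
        //Pass each input line through unchanged as (Text, NullWritable)
        public static class LineMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                context.write(value, NullWritable.get());
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "customOutputFormat");
            job.setJarByClass(OutputDriver.class);
    
            job.setMapperClass(LineMapper.class);
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            //Route records through the custom OutputFormat defined above
            job.setOutputFormatClass(CustomOutputFormat.class);
    
            //Input file with text lines, plus an output directory for the job's marker files (assumed CLI arguments)
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }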
  • Original article: https://www.cnblogs.com/wanghzh/p/14927765.html