    [Hadoop Offline Basics] MapReduce Custom InputFormat and OutputFormat Examples


    Custom InputFormat: Merging Small Files

    • Requirement
      Whether in HDFS or MapReduce, storing lots of small files eats up metadata on the NameNode and needlessly wastes memory; in practice, however, scenarios that require processing large numbers of small files are hard to avoid.

    • Three ways to optimize small files
      1. At data-collection time, merge small files or small batches of data into large files before uploading them to HDFS.
      2. Before business processing, run a MapReduce program on HDFS to merge the small files.
      3. When processing with MapReduce, use CombineFileInputFormat (e.g. CombineTextInputFormat) to improve efficiency (a minimal sketch follows this list).
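
      The third approach needs no custom code. Below is a minimal driver sketch (not part of the original example; the input/output paths and the 4 MB split size are placeholder assumptions) showing how CombineTextInputFormat packs many small files into fewer splits:

    package cn.itcast.demo3;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class CombineInputDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            //Pack small files together instead of creating one split (and one map task) per file
            job.setInputFormatClass(CombineTextInputFormat.class);
            //Cap each combined split at 4 MB (placeholder value; tune to the cluster)
            CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
            CombineTextInputFormat.addInputPath(job, new Path("file:///tmp/smallfiles/input"));
            TextOutputFormat.setOutputPath(job, new Path("file:///tmp/smallfiles/output"));
            //No mapper/reducer set: the default identity Mapper and Reducer pass records straight through
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }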

    • Implementing the second method in code

    Custom InputFormat

    package cn.itcast.demo3;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    import java.util.List;
    
    public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    
    
        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            MyRecordReader myRecordReader = new MyRecordReader();
            myRecordReader.initialize(split, context);
            return myRecordReader;
        }
    
        /**
         * Indicates whether the file can be split.
         * Returning false means the file is not splittable, so each file becomes a
         * single split and its content is read in one go.
         *
         * @param context
         * @param filename
         * @return
         */
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
    }
    

    Custom RecordReader

    package cn.itcast.demo3;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {
        //The file split being read
        private FileSplit fileSplit;
        //The job Configuration
        private Configuration configuration;
        //The v1 value that will be handed to the mapper
        private BytesWritable bytesWritable = new BytesWritable();
        //Flag marking whether the single key/value pair has already been produced
        private boolean nextKeyValue = false;
    
        /**
         * Initialization method.
         * Here we get hold of the file split, which means we can locate the file and
         * later turn its content into a byte array.
         *
         * @param split
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            //Grab the file split
            this.fileSplit = (FileSplit) split;
            //Grab the job Configuration
            this.configuration = context.getConfiguration();
        }
    
        /**
         * Returns true when a new key/value pair has been produced (here: on the first
         * call, which reads the whole file); returns false once there is nothing left
         * to read, so the framework stops calling this method.
         *
         * @return
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!nextKeyValue) {
                //Using the file split, read the whole file content and wrap it in a BytesWritable
                byte[] fileContent = new byte[(int) fileSplit.getLength()];
                //Path of the file behind this split
                Path path = fileSplit.getPath();
                //Get the file system
                FileSystem fileSystem = path.getFileSystem(configuration);
                //Open an input stream on the file
                FSDataInputStream inputStream = fileSystem.open(path);
                //Read the stream fully into the byte array
                IOUtils.readFully(inputStream, fileContent, 0, (int) fileSplit.getLength());
                bytesWritable.set(fileContent, 0, fileContent.length);
                //Mark the file as read so the next call returns false
                nextKeyValue = true;
                IOUtils.closeStream(inputStream);
                return nextKeyValue;
            }
            return false;
        }
    
        /**
         * Returns the k1 value (the key handed to the mapper).
         *
         * @return
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }
    
        /**
         * Returns the v1 value (the whole file content wrapped in a BytesWritable).
         *
         * @return
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return bytesWritable;
        }
    
        /**
         * Reports the read progress; nothing special here.
         *
         * @return
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return nextKeyValue ? 1.0F : 0.0F;
        }
    
        /**
         * Releases resources after reading; the input stream was already closed in
         * nextKeyValue, so there is nothing to do here.
         *
         * @throws IOException
         */
        @Override
        public void close() throws IOException {
    
        }
    }
    

    Define a Mapper class

    package cn.itcast.demo3;
    
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    public class MyMapperInput extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        @Override
        protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
            //Get the file split for this map task
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            //Get the file name
            String name = inputSplit.getPath().getName();
            //Emit k2 (the file name) and v2 (the file bytes)
            context.write(new Text(name), value);
        }
    }
    

    Program main entry point

    package cn.itcast.demo3;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class OwnInputFormatMain extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            //Create the job object
            Job job = Job.getInstance(super.getConf(), "ownInputFormat");
            //Input: set the input format and path; note that this is the custom InputFormat
            job.setInputFormatClass(MyInputFormat.class);
            MyInputFormat.addInputPath(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/自定义inputformat_小文件合并/input"));
    
            //Custom map logic
            job.setMapperClass(MyMapperInput.class);
            //Set the k2/v2 output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BytesWritable.class);
    
            //There is no custom reducer, but if the reduce output types are left unset they default to <LongWritable, Text>
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
    
            //Output: set the output format and path
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            SequenceFileOutputFormat.setOutputPath(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/自定义inputformat_小文件合并/sequence_output"));
    
            //Submit the job to the cluster
            boolean b = job.waitForCompletion(true);
            return b ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int run = ToolRunner.run(new Configuration(), new OwnInputFormatMain(), args);
            System.exit(run);
        }
    }
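
    To verify the merge, the resulting SequenceFile can be read back. The small reader below is not part of the original post; it is a sketch that takes the path of the output file (a part file under sequence_output) as its first argument:

    package cn.itcast.demo3;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    public class SequenceFileCheck {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            //args[0]: path to a merged SequenceFile produced by the job above
            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                    SequenceFile.Reader.file(new Path(args[0])))) {
                Text key = new Text();
                BytesWritable value = new BytesWritable();
                while (reader.next(key, value)) {
                    //Each record: key = original small-file name, value = its full content
                    System.out.println(key + " -> " + value.getLength() + " bytes");
                }
            }
        }
    }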
    

    Custom OutputFormat: Distributing Data from One File into Different Files

    • Requirement
      Separate the good order reviews from the bad ones and write the final data into different directories. The review-status field (index 9 when the record is split in the code) indicates the type of review: 0 = good, 1 = neutral, 2 = bad.

    • Code implementation

    Custom OutputFormat

    package cn.itcast.demo4;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    
    
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
            //A Configuration can be obtained from the context here
            Configuration configuration = context.getConfiguration();
            //Get the file system
            FileSystem fileSystem = FileSystem.get(configuration);
            //Output path for the good-review file
            Path goodComment = new Path("/Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/自定义outputformat/myGoodComment/1.txt");
            //Output path for the bad-review file
            Path badComment = new Path("/Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/自定义outputformat/myBadComment/1.txt");
            //Open output streams for the two files
            FSDataOutputStream fsDataOutputStream = fileSystem.create(goodComment);
            FSDataOutputStream fsDataOutputStream1 = fileSystem.create(badComment);
    
            MyRecordWriter myRecordWriter = new MyRecordWriter(fsDataOutputStream, fsDataOutputStream1);
            return myRecordWriter;
        }
    }
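
    As a design note, the two target paths above are hard-coded into the OutputFormat. A hypothetical variant (not from the original post; the property names comment.output.good and comment.output.bad are made up for illustration) could read them from the job Configuration instead, reusing the same MyRecordWriter; the driver would then set these properties via job.getConfiguration().set(...):

    package cn.itcast.demo4;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class ConfigurableOutputFormat extends FileOutputFormat<Text, NullWritable> {
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();
            FileSystem fileSystem = FileSystem.get(configuration);
            //Fall back to /tmp paths if the driver did not set the (made-up) properties
            Path goodComment = new Path(configuration.get("comment.output.good", "/tmp/myGoodComment/1.txt"));
            Path badComment = new Path(configuration.get("comment.output.bad", "/tmp/myBadComment/1.txt"));
            FSDataOutputStream goodStream = fileSystem.create(goodComment);
            FSDataOutputStream badStream = fileSystem.create(badComment);
            return new MyRecordWriter(goodStream, badStream);
        }
    }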
    

    Custom RecordWriter

    package cn.itcast.demo4;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    import java.io.IOException;
    
    public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
        //The good-review and bad-review output streams, injected via the constructors below
        private FSDataOutputStream goodStream;
        private FSDataOutputStream badStream;
    
        public MyRecordWriter() {
        }
    
        public MyRecordWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
            this.goodStream = goodStream;
            this.badStream = badStream;
        }
    
        /**
         * The write method writes the data out.
         *
         * @param key   the record line; its review-status field decides which file it is written to
         * @param value
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            //Split the incoming record on the tab delimiter
            String[] split = key.toString().split("\t");
            //Review status: 0 = good, 1 = neutral, 2 = bad
            //If the status is at most 1, write the line to the good-review file, otherwise to the bad-review file
            if (Integer.parseInt(split[9]) <= 1) {
                goodStream.write(key.toString().getBytes());
                goodStream.write("\r\n".getBytes());
            } else {
                badStream.write(key.toString().getBytes());
                badStream.write("\r\n".getBytes());
            }
        }
        
        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            IOUtils.closeStream(goodStream);
            IOUtils.closeStream(badStream);
        }
    }
    

    Define a Mapper class

    package cn.itcast.demo4;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }
    

    Program main entry point

    package cn.itcast.demo4;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class MyOutputMain extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            //Create the job object
            Job job = Job.getInstance(super.getConf(), "OutputFormat");
            //Input: set the input format and path
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.setInputPaths(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/自定义outputformat/input/ordercomment.csv"));
    
            //Custom map logic
            job.setMapperClass(MyOutputMapper.class);
            //Set the k2/v2 output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            //Output: set the output format and path. The data itself goes to the paths hard-coded in
            //MyOutputFormat; this directory only receives the job's _SUCCESS marker.
            job.setOutputFormatClass(MyOutputFormat.class);
            MyOutputFormat.setOutputPath(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/自定义outputformat/output"));
    
            //Submit the job to the cluster
            boolean b = job.waitForCompletion(true);
            return b ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int run = ToolRunner.run(new Configuration(), new MyOutputMain(), args);
            System.exit(run);
        }
    }
    
  • Original article: https://www.cnblogs.com/zzzsw0412/p/12772487.html