zoukankan      html  css  js  c++  java
  • Hadoop(16)-MapReduce框架原理-自定义FileInputFormat

    1. 需求

    将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value

    三个小文件

    one.txt

    yongpeng weidong weinan
    sanfeng luozong xiaoming

    two.txt

    shuaige changmo zhenqiang 
    dongli lingu xuanxuan

    three.txt

    longlong fanfan
    mazong kailun yuhang yixin
    longlong fanfan
    mazong kailun yuhang yixin

    2. 需求分析

    3.案例代码

    1) 自定义RecordReader

    package com.nty.inputformat;
    
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    /**
     * author nty
     * date time 2018-12-11 9:10
     */
    public class CustomRecordReader extends RecordReader<Text, BytesWritable> {
    
        /**
         * 由于采用了FileInputFormat的输入方式,所以输入源3个文件,会分成三个切片,所以一个RecordReader只处理一个文件,一次读完
         */
    
        //标记文件是否被读过,true表示没被读过
        private boolean flag = true;
    
        private Text key = new Text();
        private BytesWritable value = new BytesWritable();
    
        //输入流
        FSDataInputStream fis;
    
        private FileSplit fs;
    
        /**
         * 初始化方法,只调用一次
         * @param split
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            //FileSplit是InputSplit的子类
            fs = (FileSplit) split;
    
            //获取文件路径
            Path path = fs.getPath();
    
            //获取文件系统
            FileSystem fileSystem = FileSystem.get(context.getConfiguration());
            //FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
    
            //开流
            fis = fileSystem.open(path);
        }
    
        /**
         * 读取下一组KV
         * @return 读到了返回true,反之返回false
         * @throws IOException
         * @throws InterruptedException
         */
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if(flag){
                //读取文件进入key和value
                String path = fs.getPath().toString();
                key.set(path);
    
                //文件是一次性读完,bytes的长度不能为普遍的1024,当然这么写会涉及到大文件的问题,不做讨论.
                byte[] bytes = new byte[(int) fs.getLength()];
                fis.read(bytes);
                value.set(bytes,0,bytes.length);
    
                //重新标记
                flag = false;
    
                return  true;
            }
            return false;
        }
    
        /**
         * 获取当前读到的key
         * @return
         * @throws IOException
         * @throws InterruptedException
         */
        public Text getCurrentKey() throws IOException, InterruptedException {
            return this.key;
        }
    
        /**
         * 获取当前读到的value
         * @return
         * @throws IOException
         * @throws InterruptedException
         */
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return this.value;
        }
    
        /**
         * 获取当前读取的进度
         * @return
         * @throws IOException
         * @throws InterruptedException
         */
        public float getProgress() throws IOException, InterruptedException {
            //文件一次读完,只有0和1的进度,根据flag来判断
            return flag ? 0f : 1f;
        }
    
        /**
         * 关闭资源
         * @throws IOException
         */
        public void close() throws IOException {
            IOUtils.closeStream(fis);
        }
    }

    2) 自定义Inputformat

    package com.nty.inputformat;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    
    /**
     * author nty
     * date time 2018-12-11 9:09
     */
    //需求中,key为文件路径+名称,所以key类型为Text,value为文件内容,用BytesWritable
    /**
     * InputFormat whose key is the file path + name (Text) and whose value is
     * the whole file content (BytesWritable), as required by the use case.
     */
    public class CustomInputFormat extends FileInputFormat<Text, BytesWritable> {

        // each output value must be one complete file, so files must never be split
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }

        // hand back the custom whole-file RecordReader
        @Override
        public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            return new CustomRecordReader();
        }
    }

    3) 编写Mapper类

    package com.nty.inputformat;
    
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * author nty
     * date time 2018-12-11 9:10
     */
    /**
     * Identity mapper: passes each (file path, file bytes) pair through untouched.
     */
    public class CustomMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

        /** Emits the incoming pair as-is; no transformation is required. */
        @Override
        protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    4) 编写Reducer类

    package com.nty.inputformat;
    
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * author nty
     * date time 2018-12-11 9:10
     */
    /**
     * Pass-through reducer: writes every value grouped under a key back out
     * with that same key, leaving the data unchanged.
     */
    public class CustomReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

        /** Re-emits each value with its key; keys are unique per file, so usually one value each. */
        @Override
        protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
            for (BytesWritable fileContent : values) {
                context.write(key, fileContent);
            }
        }
    }

    5) 编写Driver类

    package com.nty.inputformat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    
    /**
     * author nty
     * date time 2018-12-11 9:10
     */
    public class CustomDriver {
    
        public static void main(String[] args) throws  Exception{
            //获取job
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            //设置类
            job.setJarByClass(CustomDriver.class);
            //设置input和output
            job.setInputFormatClass(CustomInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
            //设置Mapper和Reducer
            job.setMapperClass(CustomMapper.class);
            job.setReducerClass(CustomReducer.class);
    
            //设置Mapper和Reducer的输入输出
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BytesWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
    
            //设置文件路径
            FileInputFormat.setInputPaths(job, new Path("d:\Hadoop_test"));
            FileOutputFormat.setOutputPath(job, new Path("d:\Hadoop_test_out"));
            //提交
            boolean b = job.waitForCompletion(true);
    
            System.exit(b ? 0 : 1);
    
        }
    }
  • 相关阅读:
    去掉百度地图左下角文字和图标。
    使用CMake,且在GCC编译时指定相对源代码路径选项BUG的问题
    一键杀死某些指定进程的脚本
    KMS使用CLion作为IDE来调试
    ubuntu 18.04下安装编译的KMS,依赖库
    ubuntu 18.04下编译最新版本的KMS
    configure.ac中AC_CHECK_LIB的问题
    C/C++下__FILE__参数过长的问题解决办法
    Linux 下 UltraEdit 版本 破解 30 天试用限制
    ubuntu下配置ProFtpd服务使用sqlite3作为后端用户认证
  • 原文地址:https://www.cnblogs.com/duoduotouhenying/p/10101817.html
Copyright © 2011-2022 走看看