zoukankan      html  css  js  c++  java
  • MapReduce输入输出类型、格式及实例

    输入格式

    1、输入分片与记录
    2、文件输入
    3、文本输入
    4、二进制输入
    5、多文件输入
    6、数据库格式输入

    1、输入分片与记录

    1、JobClient通过指定的输入文件的格式来生成数据分片InputSplit。
    2、一个分片不是数据本身,而是可分片数据的引用
    3、InputFormat接口负责生成分片。

    InputFormat 负责处理MR的输入部分,有三个作用:
    验证作业的输入是否规范。
    把输入文件切分成InputSplit。
    提供RecordReader 的实现类。把InputSplit读到Mapper中进行处理。

    2、文件输入

    抽象类:FilelnputFormat
    1、FilelnputFormat是全部使用文件作为数据源的InputFormat实现的基类。


    2、FilelnputFormat输入数据格式的分片大小由数据块大小决定

    FileInputFormat保存作为job输入的全部文件。并实现了对输入文件计算splits的方法。

    至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。

    package org.apache.hadoop.mapreduce.lib.input;
    public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
      protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
      }
    
      /*Generate the list of files and make them into FileSplits.*/
      public List<InputSplit> getSplits(JobContext job) throws IOException {
         long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
         long maxSize = getMaxSplitSize(job);
         ......
         long blockSize = file.getBlockSize();
         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
         ......
      }
      /*Get the minimum split size*/
      public static long getMinSplitSize(JobContext job) {
        return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
      }
    
      /*Get the maximum split size.*/
      public static long getMaxSplitSize(JobContext context) {
        return context.getConfiguration().getLong(SPLIT_MAXSIZE,Long.MAX_VALUE);
      }
    
        //是否分片
        /*
        Is the given filename splitable? Usually, true, but if the file is stream compressed, it will not be.
        <code>FileInputFormat</code> implementations can override this and return <code>false</code> to ensure that individual input files are never split-up so that {@link Mapper}s process entire files.
        */
        protected boolean isSplitable(JobContext context, Path filename) {
        return true;//默认须要分片
      }
    
    }
    自己定义输入格式

    假设我们不须要分片,那我们就须要对isSplitable方法进行重写
    1、继承FileInputFormat基类。
    2、重写里面的getSplits(JobContext context)方法。


    3、重写createRecordReader(InputSplit split,TaskAttemptContext context)方法。


    具体样例:
    http://blog.csdn.net/scgaliguodong123_/article/details/46492039

    InputSplit

    在运行mapreduce之前,原始数据被切割成若干split。每一个split作为一个map任务的输入,在map运行过程中split会被分解成一个个记录(key-value对), map会依次处理每一个记录。
    FileInputFormat仅仅划分比HDFS block大的文件,所以FileInputFormat划分
    的结果是这个文件或者是这个文件里的一部分。


    假设一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件
    的效率要比处理非常多小文件的效率高的原因。
    当Hadoop处理非常多小文件(文件大小小于hdfs block大小)的时候。因为
    FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率底下。


    比如:一个1G的文件。会被划分成16个64MB的split,并分配16个map任务处
    理,而10000个100kb的文件会被10000个map任务处理。

    Map任务的数量?

    一个InputSplit相应一个Map task。


    InputSplit的大小是由Math.max(minSize,Math.min(maxSize, blockSize))决定。
    单节点一般10-100个map task。map task运行时长不建议低于1 分钟,否
    则效率低。

    抽象类:CombineFilelnputFormat
    1、能够使用CombineFilelnputFormat来合并小文件。

    2、因为CombineFilelnputFormat是一个抽象类,使用的时候须要创建一个
    CombineFilelnputFormat的实体类,而且实现getRecordReader()的方法。

    3、避免文件分法的方法:
    A、数据块大小尽可能大。这样使文件的大小小于数据块的大小,就不用进行分片。(这样的方式不太友好)
    B、继承FilelnputFormat,而且重写isSplitable()方法。

    job.setInputFormatClass(CombineTextInputFormat.class);

    Hadoop2.6.0 CombineTextInputFormat源代码:

    package org.apache.hadoop.mapreduce.lib.input;
    /* Input format that is a <code>CombineFileInputFormat</code>-equivalent for <code>TextInputFormat</code>.*/
    public class CombineTextInputFormat
      extends CombineFileInputFormat<LongWritable,Text> {
    
      public RecordReader<LongWritable,Text> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<LongWritable,Text>(
          (CombineFileSplit)split, context, TextRecordReaderWrapper.class);
      }
    
      /*A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be used in a <code>CombineFileInputFormat</code>-equivalent for <code>TextInputFormat</code>.*/
      private static class TextRecordReaderWrapper
        extends CombineFileRecordReaderWrapper<LongWritable,Text> {
        // this constructor signature is required by CombineFileRecordReader
        public TextRecordReaderWrapper(CombineFileSplit split,
          TaskAttemptContext context, Integer idx)
          throws IOException, InterruptedException {
          super(new TextInputFormat(), split, context, idx);
        }
      }
    }
    

    3、文本输入

    类名:TextlnputFormat
    1、TextlnputFormat是默认的lnputFormat,每一行数据就是一条记录

    2、TextlnputFormat的key是LongWritable类型的。存储该行在整个文件的偏移量,value是每行的数据内容,Text类型。

    3、输入分片与HDFS数据块关系:TextlnputFormat每一条记录就是一行,非常有可能某一行跨数据块存放。默认以 或回车键作为一行记录。

    4、TextInputFormat继承了FileInputFormat。

    类名:KeyValueTextInputFormat
    能够通过设置key为行号的方式来知道记录的行号,而且能够通过key.value.separator.in.input设置key与value的切割符。
    当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这样的格式的文件非常适合。

    假设行中有分隔符,那么分隔符前面的作为key,后面的作为value。假设行中没有分隔符,那么整行作为key,value为空。

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    //默认分隔符就是制表符
    //conf.setStrings(KeyValueLineRecordReader.KEY_VALUE
    _SEPERATOR, "	")

    类名:NLineInputFormat
    能够设置每一个mapper处理的行数。能够通过mapred.line.input.format.lienspermap属性设置。
    NLineInputformat能够控制在每一个split中数据的行数

    //设置具体输入处理类
    job.setInputFormatClass(NLineInputFormat.class);
    //设置每一个split的行数
    NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt(args[2]));

    4、二进制输入

    输入类:

    SequenceFileInputFormat 将key和value以sequencefile格式输入。
    SequenceFileAsTextInputFormat 
    SequenceFileAsBinaryInputFormat 将key和value以原始二进制的格式输入。

    因为SequenceFile能够支持Splittable。所以能够作为mapreduce输入文件的格式,能够非常方便的得到己经含有<key,value>的分片。

    SequenceFile处理、压缩处理。

    5、多文件输入

    类名:MultipleInputs
    1、MultipleInputs能够提供多个输入数据类型。
    2、通过addInputPath()方法来设置多路径。

    6、数据库格式输入

    类名:DBInputFormat
    1、DBInputFormat是一个使用JDBC方式连接数据库,而且从关系型数据库中读取数据的一种输入格式。
    2、有多个map会去连接数据库。有可能造成数据库崩溃,因此,避免过多的数据库连接。


    3、HBase中的TablelnputFormat能够让MapReduce程序訪问HBase表里的数据。

    实例单输入路径

    [root@master liguodong]# hdfs dfs -cat /input.txt
    hello you
    hello everybody
    hello hadoop
    [root@master liguodong]# hdfs dfs -text /tmp.seq
    15/06/10 21:17:11 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native
    15/06/10 21:17:11 INFO compress.CodecPool: Got brand-new decompressor [.bz2]
    100     apache software
    99      chinese good
    98      james NBA
    97      index pass
    96      apache software
    95      chinese good
    94      james NBA
    93      index pass
    ......
    package mrinputformat;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class TestInputFormat {
    
        public static class TokenizerMapper
           extends Mapper<IntWritable, Text, Text, IntWritable>{
    
            private final static IntWritable one = new IntWritable(1);//1
            private Text word = new Text();
    
            public void map(IntWritable key, Text value, Context context
                            ) throws IOException, InterruptedException 
            {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    //k v
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
           extends Reducer<Text,IntWritable,Text,IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) 
                {
                    sum += val.get();
                }
                result.set(sum); 
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            //1、配置  
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word count"); 
    
            //2、打包运行必须运行的方法
            job.setJarByClass(TestInputFormat.class);
    
            //3、输入路径  
            //hdfs://master:8020/tmp.seq
            //hdfs://master:8020/output
            FileInputFormat.addInputPath(job, new Path(args[0]));  
            //默认是TextInputFormat
            job.setInputFormatClass(SequenceFileInputFormat.class);
    
            //4、Map
            job.setMapperClass(TokenizerMapper.class);
    
            //5、Combiner
            job.setCombinerClass(IntSumReducer.class);
    
            //6、Reducer
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
    
            //7、 输出路径
            FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    
            //8、提交作业
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    运行结果:

    多输入路径方式

    package mrinputformat;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class TestInputFormat {
    
        //採用TextInputFormat
        public static class Mapper1
           extends Mapper<LongWritable, Text, Text, IntWritable>{
    
            private final static IntWritable one = new IntWritable(1);//1
            private Text word = new Text();
    
            public void map(LongWritable key, Text value, Context context
                            ) throws IOException, InterruptedException 
            {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    //k v
                    context.write(word, one);
                }
            }
        }
    
        //SequenceFileInputFormat
        public static class Mapper2
           extends Mapper<IntWritable, Text, Text, IntWritable>{
    
            private final static IntWritable one = new IntWritable(1);//1
            private Text word = new Text();
    
            public void map(IntWritable key, Text value, Context context
                            ) throws IOException, InterruptedException 
            {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    //k v
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
           extends Reducer<Text,IntWritable,Text,IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) 
                {
                    sum += val.get();
                }
                result.set(sum); 
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            //1、配置  
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word count"); 
    
            //2、打包运行必须运行的方法
            job.setJarByClass(TestInputFormat.class);
    
            //3、输入路径  
            //hdfs://master:8020/tmp.seq
            //hdfs://master:8020/output
            //单个输入路径
            //FileInputFormat.addInputPath(job, new Path(args[0]));  
            //默认是TextInputFormat
            //job.setInputFormatClass(SequenceFileInputFormat.class);
            //4、Map
            //job.setMapperClass(TokenizerMapper.class);
    
            //多个输入路径
            Path path1 = new Path("hdfs://master:8020/input.txt");
            Path path2 = new Path("hdfs://master:8020/tmp.seq");
            MultipleInputs.addInputPath(job, path1, TextInputFormat.class,Mapper1.class);
            MultipleInputs.addInputPath(job, path2, SequenceFileInputFormat.class,Mapper2.class);
    
            //5、Combiner
            job.setCombinerClass(IntSumReducer.class);
    
            //6、Reducer
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //7、 输出路径
            FileOutputFormat.setOutputPath(job, new Path("hdfs://master:8020/output"));
    
            //8、提交作业
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    运行结果:

    输出格式

    文本输出

    TextOutputFormat

    默认的输出格式。key是LongWritable,value是Text类型, key和value中间值用tab隔开的。

    二进制输出

    SequenceFileOutputFormat
    将key和value以sequencefile格式输出。

    SequenceFileAsBinaryOutputFormat
    将key和value以原始二进制的格式输出。

    MapFileOutputFormat
    将key和value写入MapFile中。因为MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。

    多文件输出

    MultipleOutputFormat
    MultipleOutputs

    默认情况下一个reducer会产生一个输出,可是有些时候我们想一个reducer产生多个输出。 MultipleOutputFormat和MultipleOutputs能够实现这个功能。
    差别:MultipleOutputs能够产生不同类型的输出。

    数据库格式输出

    DBOutputFormat

  • 相关阅读:
    JAVA项目实战-设计模式-工厂模式的项目使用
    JAVA项目实战-设计模式——单例模式项目中运用
    JAVA项目实战-实现生成固定格式PDF文件和打包成zip压缩包并在浏览器中输出
    JAVA项目实战-阿里巴巴easyexcel导出导入工具使用
    JAVA项目实战-文件的上传和下载功能
    JAVA项目实战-生成二维码和验证码功能
    JAVA项目实战-高德地图API实现定位导航功能
    JAVA项目实战 -微信支付开发
    JAVA调用腾讯云API-实现语音合成(TTS)(三)
    MySQL高可用架构故障自动转移插件MHA
  • 原文地址:https://www.cnblogs.com/cxchanpin/p/7190503.html
Copyright © 2011-2022 走看看