  • MapReduce Types and Formats (Input and Output)

    I. Input Formats

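    (1) Input splits and records

    ① When a job is submitted, the JobClient uses the job's specified InputFormat to generate the input splits (InputSplit);

    ② A split is not the data itself but a reference to it: for a file, the path, start offset and length, plus the hosts that store that data;

    ③ The InputFormat is responsible for generating the splits and for creating the RecordReader that turns each split into records (it is an interface in the old API and an abstract class in the new API);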

    Source location: package org.apache.hadoop.mapreduce.lib.input (new API)

              packages org.apache.hadoop.mapred and org.apache.hadoop.mapred.lib (old API)

    Look at the getSplits() method of the FileInputFormat class;

    the computeSplitSize() method it calls determines the split size;
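    The split-size rule and the main loop of getSplits() can be sketched in plain Java as below. This is a simplified model written for illustration, not Hadoop's code: it assumes a single uncompressed file, ignores block locations, and reuses the constant 1.1 (SPLIT_SLOP in FileInputFormat) that allows the last split to be up to 10% larger than the others instead of producing a tiny extra split. The 128 MB block size is only an example value.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSizeSketch {
    // The last split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    // Same formula as FileInputFormat.computeSplitSize() in the new API.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Simplified per-file loop of getSplits(): returns the length of each split.
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long remaining = fileLength;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            lengths.add(remaining); // whatever is left becomes the last split
        }
        return lengths;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // With the defaults (minSize effectively 1, maxSize Long.MAX_VALUE) the split size equals the block size.
        long splitSize = computeSplitSize(128 * mb, 1, Long.MAX_VALUE);
        for (long len : splitLengths(260 * mb, splitSize)) {
            System.out.println(len / mb + " MB");
        }
    }
}
```

    With a 128 MB block, a 260 MB file yields two splits (128 MB and 132 MB) rather than three, because 132/128 is within the 1.1 slop. Raising minSize above the block size, or lowering maxSize below it, changes the split size without touching the block size.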

    [Figure: class hierarchy of the input format classes]

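    (2) File input

    Abstract class: FileInputFormat

    ① FileInputFormat is the base class of all InputFormat implementations that read their data from files;

    ② By default, FileInputFormat makes each split the size of one HDFS block; the minimum and maximum split size settings can move it away from the block size;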

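    Abstract class: CombineFileInputFormat

    ① CombineFileInputFormat can be used to pack many small files into fewer splits;

    ② Because CombineFileInputFormat is an abstract class, you must write a concrete subclass and implement getRecordReader() (old API) or createRecordReader() (new API);

    ③ Ways to keep a file from being split:

    A. Use a block size large enough that every file is smaller than one block, so no file needs to be split (setting the minimum split size above the size of the largest file has the same effect);

    B. Subclass FileInputFormat and override isSplitable() so that it returns false;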
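    The two ways of keeping files whole in ③, and the small-file packing idea behind CombineFileInputFormat in ①, can be modelled in plain Java as follows. This is an illustration only: the `splittable` flag stands in for the value Hadoop's isSplitable() hook returns (in the new API, `protected boolean isSplitable(JobContext context, Path filename)`), and the greedy packing ignores node and rack locality, which the real CombineFileInputFormat does take into account.

```java
import java.util.ArrayList;
import java.util.List;

class SplitPlanningSketch {
    // Cuts one file into chunks of splitSize; a non-splittable file becomes a single split.
    static List<Long> planFile(long fileLength, long splitSize, boolean splittable) {
        List<Long> splits = new ArrayList<>();
        if (!splittable) {
            splits.add(fileLength); // the whole file goes to one mapper
            return splits;
        }
        for (long start = 0; start < fileLength; start += splitSize) {
            splits.add(Math.min(splitSize, fileLength - start));
        }
        return splits;
    }

    // Greedily packs small files into combined splits of at most maxSplitSize bytes.
    // A file larger than maxSplitSize simply gets a split of its own in this sketch.
    static List<List<Long>> packSmallFiles(List<Long> fileSizes, long maxSplitSize) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0;
        for (long size : fileSizes) {
            if (!current.isEmpty() && currentSize + size > maxSplitSize) {
                splits.add(current); // current split is full, start a new one
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(size);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }
}
```

    For example, ten 10-byte files with a 35-byte limit end up in four combined splits (3 + 3 + 3 + 1 files) instead of ten one-file splits, which means four map tasks instead of ten.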

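    (3) Text input

    Class: TextInputFormat

    ① TextInputFormat is the default InputFormat; each line of the input is one record;

    ② The key is a LongWritable holding the byte offset of the line within the whole file, and the value is the content of the line, as a Text;

    ③ Input splits vs. HDFS blocks: since every TextInputFormat record is a line, a line may well span two blocks. The record reader therefore reads past the end of its split to finish its last line, and the reader of the next split skips that partial first line, so no line is lost or read twice;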
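    The boundary rule in ③ can be simulated in plain Java on an in-memory string. This is a simplified sketch of how the new-API LineRecordReader behaves as far as I understand it (a reader whose split does not start at offset 0 discards its first, possibly partial, line; every reader keeps reading while the next line starts at or before its split end). It assumes single-byte characters and '\n' line endings. Each record is printed as the byte offset (the LongWritable key) and the line (the Text value).

```java
import java.util.ArrayList;
import java.util.List;

class LineSplitSketch {
    // Returns the records ("offset<TAB>line") that the reader for split [start, end) would emit.
    static List<String> readSplit(String data, int start, int end) {
        List<String> records = new ArrayList<>();
        int pos = start;
        if (start != 0) {
            // Not the first split: skip up to and including the next '\n',
            // because the previous split's reader has already consumed that line.
            int nl = data.indexOf('\n', pos);
            pos = (nl < 0) ? data.length() : nl + 1;
        }
        // Keep reading while the next line starts at or before 'end';
        // the last line read may run past 'end' into the next split (or block).
        while (pos <= end && pos < data.length()) {
            int nl = data.indexOf('\n', pos);
            int lineEnd = (nl < 0) ? data.length() : nl;
            records.add(pos + "\t" + data.substring(pos, lineEnd));
            pos = (nl < 0) ? data.length() : nl + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        String data = "aaa bbb\nccc aaa\nddd eee\n"; // same content as a.txt in experiment ②, 24 bytes
        // A boundary at byte 10 falls in the middle of the second line.
        System.out.println(readSplit(data, 0, 10));
        System.out.println(readSplit(data, 10, data.length()));
    }
}
```

    The first reader finishes "ccc aaa" even though that line crosses byte 10, and the second reader skips the tail of that line and starts at "ddd eee" (offset 16), so each line is read exactly once.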

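    Class: KeyValueTextInputFormat

    Each line is split at the first occurrence of a separator (a tab by default) into a Text key and a Text value, which suits input that is already key/value text, such as the output of TextOutputFormat; the separator is set with the key.value.separator.in.input.line property (old API) or mapreduce.input.keyvaluelinerecordreader.key.value.separator (new API);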
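    How each line becomes a key and a value can be sketched in plain Java. This is a simplified model of the behaviour described above (split at the first separator; a line without a separator becomes the whole key with an empty value), not Hadoop's KeyValueLineRecordReader itself.

```java
class KeyValueSketch {
    // Splits a line at the first occurrence of the separator into {key, value}.
    static String[] splitLine(String line, char separator) {
        int idx = line.indexOf(separator);
        if (idx < 0) {
            return new String[] {line, ""}; // no separator: the whole line is the key
        }
        return new String[] {line.substring(0, idx), line.substring(idx + 1)};
    }

    public static void main(String[] args) {
        String[] kv = splitLine("k1\tv1\tv2", '\t');
        // Only the first tab separates; later tabs stay inside the value.
        System.out.println("key=" + kv[0] + ", value=" + kv[1]);
    }
}
```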

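    Class: NLineInputFormat

    Lets you set how many lines of input each mapper processes (N lines per split), via the mapred.line.input.format.linespermap property (old API) or mapreduce.input.lineinputformat.linespermap (new API);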
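    The grouping NLineInputFormat performs can be illustrated in plain Java: with N lines per split, an input of L lines gives ceil(L / N) splits, the last one possibly shorter. This sketch models only the grouping; the real class works on byte offsets within the files.

```java
import java.util.ArrayList;
import java.util.List;

class NLineSketch {
    // Groups lines into chunks of n; each chunk corresponds to one split, i.e. one map task.
    static List<List<String>> group(List<String> lines, int n) {
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < lines.size(); i += n) {
            splits.add(new ArrayList<>(lines.subList(i, Math.min(i + n, lines.size()))));
        }
        return splits;
    }
}
```

    In a new-API job, N can also be set in code with NLineInputFormat.setNumLinesPerSplit(job, N).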

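    (4) Binary input

    Classes: SequenceFileInputFormat

    SequenceFileAsTextInputFormat

    SequenceFileAsBinaryInputFormat

    Because SequenceFiles are splittable (their sync markers let a reader start at any split boundary), they work well as MapReduce input, and each split directly yields <key, value> pairs; SequenceFileAsTextInputFormat converts the keys and values to Text, and SequenceFileAsBinaryInputFormat returns them as raw bytes;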

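    (5) Multiple inputs

    Class: MultipleInputs

    ① MultipleInputs lets one job read several inputs, each with its own InputFormat and, optionally, its own Mapper;

    ② Each input path is registered with MultipleInputs.addInputPath();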
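    The idea behind MultipleInputs (a per-path choice of how records are read and mapped, feeding one shared reduce) can be modelled in plain Java. This is only a sketch of the dispatch: here a single function plays the role of the "InputFormat + Mapper" pair, whereas the real MultipleInputs.addInputPath() takes a Path, an InputFormat class and a Mapper class, and Hadoop routes each split through DelegatingInputFormat and DelegatingMapper. The path /c.csv and its comma-separated content are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

class MultipleInputsSketch {
    // Registry: input path -> function that turns that path's raw content into words.
    private final Map<String, Function<String, List<String>>> byPath = new LinkedHashMap<>();

    void addInputPath(String path, Function<String, List<String>> formatAndMapper) {
        byPath.put(path, formatAndMapper);
    }

    // Each path is processed by its own function; all words go to one shared word-count "reduce".
    Map<String, Integer> run(Map<String, String> contents) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Function<String, List<String>>> e : byPath.entrySet()) {
            for (String word : e.getValue().apply(contents.get(e.getKey()))) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        MultipleInputsSketch job = new MultipleInputsSketch();
        job.addInputPath("/a.txt", s -> Arrays.asList(s.trim().split("\\s+"))); // whitespace-separated
        job.addInputPath("/c.csv", s -> Arrays.asList(s.trim().split(",")));    // hypothetical CSV input
        Map<String, String> contents = new HashMap<>();
        contents.put("/a.txt", "aaa bbb\nccc aaa\nddd eee\n");
        contents.put("/c.csv", "aaa,fff");
        System.out.println(job.run(contents)); // {aaa=3, bbb=1, ccc=1, ddd=1, eee=1, fff=1}
    }
}
```

    The key point carried over from MultipleInputs is that the two inputs are parsed differently but must emit the same output types, because they share one reducer.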

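    (6) Database input

    Class: DBInputFormat

    ① DBInputFormat is an input format that reads data from a relational database over JDBC;

    ② Each map task opens its own database connection, so run only a few mappers to avoid overwhelming the database with connections;

    ③ For HBase, TableInputFormat lets a MapReduce program read the data in an HBase table;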
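    Why the number of mappers equals the number of concurrent database connections becomes clearer from how the rows are divided. The plain-Java sketch below is a simplified model of what I understand DBInputFormat to do: count the rows, cut them into equal ranges (the last range taking the remainder), and let each map task read its range with a LIMIT/OFFSET query over its own connection. The table orders and its columns are hypothetical, and other databases may use a different paging syntax.

```java
import java.util.ArrayList;
import java.util.List;

class DbSplitSketch {
    // Cuts rowCount rows into numSplits ranges [start, end); the last range takes the remainder.
    static List<long[]> ranges(long rowCount, int numSplits) {
        List<long[]> out = new ArrayList<>();
        long chunkSize = rowCount / numSplits;
        for (int i = 0; i < numSplits; i++) {
            long start = i * chunkSize;
            long end = (i + 1 == numSplits) ? rowCount : start + chunkSize;
            out.add(new long[] {start, end});
        }
        return out;
    }

    // Each range is read by one map task, over its own JDBC connection, with a paged query.
    static String pagedQuery(String baseQuery, long[] range) {
        return baseQuery + " LIMIT " + (range[1] - range[0]) + " OFFSET " + range[0];
    }

    public static void main(String[] args) {
        // Hypothetical table with 10 rows read by 3 mappers, i.e. 3 simultaneous connections.
        for (long[] r : ranges(10, 3)) {
            System.out.println(pagedQuery("SELECT id, amount FROM orders ORDER BY id", r));
        }
    }
}
```

    Doubling the number of mappers doubles the number of simultaneous queries against the database, which is why item ② recommends keeping it small.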

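    Experiments:

    Create a project TestMRInputFormat with a package com.mr, and add the required Hadoop dependency JARs.

    Experiment ①: use a SequenceFile as input, so first run SequenceFileWriter.java to generate the file b.seq;

    Create the class TestInputFormat1.java (adapted from WordCount.java):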

    package com.mr;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class TestInputFormat1 {

      // The mapper's input types must match the key/value classes stored in b.seq
      // (assumed here: IntWritable keys and Text values, as written by SequenceFileWriter.java)
      public static class TokenizerMapper
           extends Mapper<IntWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(IntWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {
          // Split the record's value into words and emit (word, 1) for each one
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
          }
        }
      }

      // Sums the counts for each word; also used as the combiner
      public static class IntSumReducer
           extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: TestInputFormat1 <in> <out>");
          System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(TestInputFormat1.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input format: read SequenceFiles instead of the default TextInputFormat
        job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

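    Run it in Eclipse with the program arguments configured as follows:

    [Figure: Eclipse run configuration]

    The word-count output is as follows:

    [Figure: job output]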

    Experiment ②: input from multiple sources:

    TestInputFormat2.java:

    package com.mr;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TestInputFormat2 {

      // First mapper: reads a.txt via TextInputFormat (key = byte offset of the line, value = the line)
      public static class Mapper1
           extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
          }
        }
      }

      // Second mapper: reads b.seq via SequenceFileInputFormat
      // (assumes IntWritable keys and Text values, as in experiment ①)
      public static class Mapper2
           extends Mapper<IntWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(IntWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
          }
        }
      }

      // Both mappers emit (Text, IntWritable), so one reducer sums the counts from both inputs
      public static class IntSumReducer
           extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(TestInputFormat2.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        Path path1 = new Path("/a.txt");
        Path path2 = new Path("/b.seq");
        // Multiple inputs: each path gets its own InputFormat and Mapper,
        // so no setMapperClass()/setInputFormatClass() call is needed
        MultipleInputs.addInputPath(job, path1, TextInputFormat.class, Mapper1.class);
        MultipleInputs.addInputPath(job, path2, SequenceFileInputFormat.class, Mapper2.class);
        FileOutputFormat.setOutputPath(job, new Path("/output2"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

    Create the input text file a.txt:

    aaa bbb

    ccc aaa

    ddd eee

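    Package the project as a JAR (for some reason it would not run inside Eclipse and I haven't found the cause yet, but it runs fine with the hadoop jar command):

    File -> Export -> Runnable JAR file, and name the JAR file testMR.jar.

    Run it from the command line:

    $ hadoop jar testMR.jar com.mr.TestInputFormat2

    [Figure: console output of the job run]

    The word-count output is as follows:

    [Figure: job output]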

    II. Output Formats

    [Figure: class hierarchy of the output format classes]

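    (1) Text output

    Class: TextOutputFormat

    ① TextOutputFormat is the default output format; keys and values may be of any type, since each is written by calling its toString() method;

    ② Each record is written as one line of the form "key<TAB>value" (the tab separator is configurable);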
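    The line layout described above can be sketched in plain Java. This models what TextOutputFormat's record writer does as far as I know: key and value are written with toString() and separated by a tab by default, and a null (or NullWritable) key or value is left out together with the separator. Here a Java null stands in for NullWritable.

```java
class TextOutputSketch {
    // Formats one record as a TextOutputFormat line; returns "" when both key and value are null.
    static String formatRecord(Object key, Object value, String separator) {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return ""; // nothing is written for this record
        }
        StringBuilder line = new StringBuilder();
        if (!nullKey) {
            line.append(key); // appends key.toString()
        }
        if (!nullKey && !nullValue) {
            line.append(separator);
        }
        if (!nullValue) {
            line.append(value); // appends value.toString()
        }
        return line.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(formatRecord("aaa", 2, "\t"));
        System.out.print(formatRecord(null, "only a value", "\t"));
    }
}
```

    The separator is controlled by mapreduce.output.textoutputformat.separator in the new API (mapred.textoutputformat.separator in the old one).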

    (2) Binary output

    Classes: SequenceFileOutputFormat

    SequenceFileAsTextOutputFormat

    SequenceFileAsBinaryOutputFormat

    MapFileOutputFormat

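    (3) Multiple outputs

    Classes: MultipleOutputFormat

          MultipleOutputs

    Difference: MultipleOutputs can produce outputs of different types (each named output has its own OutputFormat and key/value classes), whereas MultipleOutputFormat writes all of its files in the same format; in the new API only MultipleOutputs is available;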

    (4) Database output

    Class: DBOutputFormat

     http://blog.sina.com.cn/s/blog_4438ac090101qfuh.html

  • Original article: https://www.cnblogs.com/nucdy/p/5958514.html
Copyright © 2011-2022 走看看