zoukankan      html  css  js  c++  java
  • Mapreduce的Mapper和Reducer类函数(2)

     一、Mapper类

    /*
    ## Mapper基类的变量泛型
    ## 这里的K1为偏移量,所以为整型,在java中为Long,但是Mapreduce自己定义了一种泛型,效率会更高,即LongWritable。
    ## 这里的V1为String, Mapreduce对应的为Text。
    ## K2为Text
    ## V2为LongWritable
    */
    
    public class WordCount extends Mapper<LongWritable, Text, Text, LongWritable>{
    
        protected void map (LongWritable key, Text value, Context context) throws IOException, InterrupedException{
            // 1:对每一行数据进行字符串拆分
        String line = value.toString();
        String[] split = line.split(",");
        for (String word:split) { // word

    Mapper类,Reducer类中的函数(转载)

    一、Mapper类

    1、4个函数的解析

     Mapper有setup(),map(),cleanup()和run()四个方法。其中setup方法和cleanup方法默认是不做任何操作,且它们只被执行一次。其中setup()一般是用来进行一些map()前的准备工作,map()则一般承担主要的处理工作,cleanup()则是收尾工作如关闭文件或者执行map()后的K-V分发等。run()方法提供了setup->map->cleanup()的执行模板。

        在MapReduce中,Mapper从一个输入分片中读取数据,然后经过Shuffle and Sort阶段,分发数据给Reducer,在Map端和Reduce端我们可能使用设置的Combiner进行合并,这在Reduce前进行。Partitioner控制每个K-V对应该被分发到哪个reducer[我们的Job可能有多个reducer],Hadoop默认使用HashPartitioner,HashPartitioner使用key的hashCode对reducer的数量取模得来。

    protected void setup(Mapper.Context context) throws IOException,InterruptedException //Called once at the beginning of the task
    protected void cleanup(Mapper.Context context)throws IOException,InterruptedException //Called once at the end of the task. 
    protected void map(KEYIN key, VALUEIN value Mapper.Context context)throws IOException,InterruptedException
    {
       context.write((KEYOUT) key,(VALUEOUT) value);
    }
    
    //Called once for each key/value pair in the input split. Most applications should override this, but the default is the identity function. 
    public void run(Mapper.Context context)throws IOException,InterruptedException
    {
        setup(context);
        while(context.nextKeyValue())
        {
          map(context.getCurrentKey(),context.getCurrentValue(),context)
        }
        cleanup(context);
    }
    //Expert users can override this method for more complete control over the execution of the Mapper. 
    // 执行顺序:setup --->   map/run   ----> cleanup
    Mapper的三个子类,它们位于srcmapredorgapachehadoopmapreducelibmap中
    (详解http://blog.csdn.net/posa88/article/details/7901304)
    1、TokenCounterMapper
    2、InverseMapper3、MultithreadedMapper

    2、setPartitionerClass、setSortComparatorClass和setGroupingComparatorClass三者关系

    转自:https://blog.csdn.net/qq_30281559/article/details/88839403

    Map首先将输出写到环形缓存当中,开始spill过程:

    • job.setPartitionerClass(PartitionClass.class);

    【按key分区】map阶段最后调用。对key取hash值(或其它处理),指定进入哪一个reduce

    • job.setSortComparatorClass(SortComparator.class);

    【按key排序】每个分区内,对 键 或 键的部分 进行排序,保证分区内局部有序;

    • job.setGroupingComparatorClass(Grouptail.class);

    【按key分组】构造一个key对应的value迭代器。同一分区中满足同组条件(可以是不同的key)的进入同一个Interator,执行一次reduce方法;

    partiton是为了完成在shuffle阶段使用哪个reducetask。groupComparator是为了在一个reducetask下区分key的聚合。

    举个栗子,在map阶段输出结果为1,一,2,二。设置reducetask数量为2,名字为r1,r2。 此时想把1,一,交给r1处理。2,二,交给r2处理。就需要自定义partiton通过返回值来完成。 但是1,一虽然进入了r1。但是r1,并不认为两者是相同的,也就是在输出的结果上并没有放在一行,而是两行。 如果想让输出结果放到一行,就需要重新定义groupComparator组件。

    ps:一个reducetask会输出一个文件。一个reduce阶段的key对应文件中的一行 

     

    • 进入同一个reduce的key是按照顺序排好的,该类使得:
    • 如果连续(注意,一定连续)的两条或多条记录满足同组(即compare方法返回0)的条件,
    • 即使key不相同,他们的value也会进入同一个values,执行一个reduce方法。
    • 相反,如果原来key相同,但是并不满足同组的条件,他们的value也不会进入一个values。
    • 最后返回的key是:满足这些条件的一组key中排在最后的那个。

    二、Reducer类

    转自:Mapreduce不设置reduce,只执行map的输出结果

    在写MR程序时候,有时我们不需要reduce,比如对原始数据做Format等,这样我们在MR程序中就不需要写reduce函数,同样在main函数配置中也不需要reduce相关的配置信息,在MR执行的过程中,会为MR生成一个系统自带的reduce,这个reduce是系统为了保持框架的完整性自动调用的reduce函数,但这个函数并不做shuffle和数据拖取,生成的结果文件就是map的输出文件,也就是说,有多少个map,那么输出的结果就有多少个文件。so,总结如下:

    1. MR可以没有reduce

    2. 如果没有reduce,那么系统也会自动生成一个reduce,但是这个reduce不做任何操作,也不做shuffle拖取数据

    3. 最终文件的数量就是map的数量,根据数据的输入量和块大小和切片最大最小值有关

    4. 最简便的方法就是直接将reduce的数量设置成0

    1、4个函数

    protected void setup(Mapper.Context context) throws IOException,InterruptedException //Called once at the beginning of the task
    protected void cleanup(Mapper.Context context)throws IOException,InterruptedException //Called once at the end of the task. 
    protected void reduce(KEYIN key, VALUEIN value Reducer.Context context)throws IOException,InterruptedException
    {
    
        for(VALUEIN value: values) {
          context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }
    
    //This method is called once for each key. Most applications will define their reduce class by overriding this method. The default implementation is an identity function. 
    
    public void run(Reducer.Context context)throws IOException,InterruptedException
    {
           setup(context);
           while (context.nextKey()) {
          
           reduce(context.getCurrentKey(), context.getValues(), context);
          // If a back up store is used, reset it
          
    ((ReduceContext.ValueIterator)
              
    (context.getValues().iterator())).resetBackupStore();
        }
        
    cleanup(context);
      }
    }
    //Advanced application writers can use the run(org.apache.hadoop.mapreduce.Reducer.Context) method to control how the reduce task works
    // 执行顺序:setup --->   map/run   ----> cleanup

     

    代码疑点:

    1、Hadoop MultipleInputs.addInputPath 读取多个路径

    https://blog.csdn.net/t1dmzks/article/details/76473905

    MultipleInputs.addInputPath

    作用
    可以指定多个输入路径,每个路径都可以指定相应的map方法
    使用方法
    MultipleInputs.addInputPath
    (Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass)

    举例

    使用wordcount来举例
    F:hadooptestwordcountinput1下有个word.txt,单词用空格分割

    aa bb cc
    
    dd ee ff
    
    aa  bb  ff


    F:hadooptestwordcountinput2下有个word.txt。单词用 ## 分割

    aa##bb##cc
    ee##gg##kk


    代码

    package com.myhadoop.multiple;
    
    import com.myhadoop.mapreduce.test.WordCount;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    /**
     * Created by kaishun on 2017/7/31.
     */
    public class TestMultipleInputs {
        public static class MapA extends Mapper<LongWritable, Text, Text, IntWritable>
        {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
            public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
            {
                String lines = value.toString();
                String strs[] = lines.split("\s+");
                for (int i = 0; i <strs.length ; i++) {
                    word.set(strs[i]);
                    context.write(word, one);
                }
            }
        }
    
        public static class MapB extends Mapper<LongWritable, Text, Text, IntWritable>
        {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
            public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
            {
                String lines = value.toString();
                String strs[] = lines.split("##");
                for (int i = 0; i <strs.length ; i++) {
                    word.set(strs[i]);
                    context.write(word, one);
                }
            }
        }
    
    
    
        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
        {
            public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException
            {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJobName("MultipleWordCount");
            job.setJarByClass(WordCount.class);
            //多个输入,分别对应不同的map
            MultipleInputs.addInputPath(job,new Path("F:\hadooptest\wordcount\input1"),TextInputFormat.class,WordCount.MapA.class);
            MultipleInputs.addInputPath(job,new Path("F:\hadooptest\wordcount\input2"),TextInputFormat.class,WordCount.MapB.class);
    
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            //分到一个reduce
            job.setReducerClass(WordCount.Reduce.class);
    
            FileOutputFormat.setOutputPath(job, new Path(args[0]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
        }
    }

    输出

    aa  3
    bb  3
    cc  2
    dd  1
    ee  2
    ff  2
    gg  1
    kk  1

    2、hadoop中的job.setOutputKeyClass与job.setMapOutputKeyClass

    mr程序中一般都会有hadoop中的job.setOutputKeyClass(theClass)与job.setOutputValueClass(theClass),

    但是有的程序处理以上两个外还有job.setMapOutputKeyClass(theClass)与job.setMapOu

    tputValueClass(Text.class),一直没弄懂是怎么回事,网上查了下,原来当mapper与reducer

    的输出类型一致时可以用 job.setOutputKeyClass(theClass)与job.setOutputValueClass

    (theClass)这两个进行配置就行,但是当mapper用于reducer两个的输出类型不一致的时候就需

    要分别进行配置了。

    https://www.codelast.com/hadoop%E5%BC%80%E5%8F%91%E5%9F%BA%E7%A1%80%E7%9F%A5%E8%AF%86%E8%AE%B0%E5%BD%95/ 

    3、如果mapper的输出value为一个 ThriftWritable 类型的对象,在设置Hadoop job属性时,setMapOutputValueClass() 应该怎样写,

    假设你的mapper类是这样定义的:

    public class A extends Mapper<LongWritable, Text, Text, ThriftWritable<MyType>> {
      //TODO:
    }

    其中,MyType是一个实现了 org.apache.thrift.TBase 接口的类。那么,在设置Hadoop job的属性时,我们可以这样写:

    Job job = new Job(configuration, "My example job.");
    job.setMapOutputValueClass(ThriftWritable.class);

    这样写要注意,在reducer中取出同一个key的各value值时,需要用 setConverter() 方法来指定Thrift对象类型:

    protected void reduce(Text key, Iterable<ThriftWritable<MyType>> values, Context context)
        throws IOException, InterruptedException {
      for (ThriftWritable<MyType> value : values) {
        value.setConverter(MyType.class);    // must set the class
        MyType obj = value.get();
        //TODO:
      }
    }

    如果你不 setConverter() 的话,将抛出一个java.lang.IllegalStateException异常,提示你无法识别类型。

    4、要注意在reducer中对同一个key的多个value循环取值的方法

    假设在一个reduce()方法中对同一个key的多个value循环,做一些处理后取出想要的那个value,并输出:

    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      Text outputValue = null;
      for (Text value : values) {
        //TODO: some logic processing 
        outputValue = value;
      }
     
      if (outputValue != null) {
        context.write(key, outputValue);
      }
    }

    那么,输出的outputValue将永远是所有value里面,最后一个循环到的value,这是因为程序会复用value这个对象,当使用 outputValue = value 这种赋值方式时,outputValue得到的是value的引用,而value又被复用了,所以outputValue最后将被赋予最后一个value的值。这可能会导致你在“TODO”那里做的处理失效(例如取了一个含有最大数字值的value),所以,为了保持逻辑正确,可以把 outputValue = value 换成:

    outputValue = new Text(value);

    创建一个新的对象,这样就不会出现上面所说的问题了。

  • 相关阅读:
    Qt简介以及如何配置Qt使用VS2010进行开发
    QT里重定向另外一个控制台程序的输出
    windows下制作PHP扩展
    20款Notepad++插件下载和介绍
    音频编码协议介绍
    用 PHP 读取文件的正确方法
    QT进程间通信
    关于YUV色彩空间
    解析xml 四种
    System.getProperty
  • 原文地址:https://www.cnblogs.com/Lee-yl/p/10996374.html
Copyright © 2011-2022 走看看