zoukankan      html  css  js  c++  java
  • 【转】[Hadoop源码解读](二)MapReduce篇之Mapper类

    转自:http://www.cnblogs.com/lucius/p/3449912.html

    前面在讲InputFormat的时候,讲到了Mapper类是如何利用RecordReader来读取InputSplit中的K-V对的。

      这一篇里,开始对Mapper.class的子类进行解读。

      先回忆一下。Mapper有setup(),map(),cleanup()和run()四个方法。其中setup()一般是用来进行一些map()前的 准备工作,map()则一般承担主要的处理工作,cleanup()则是收尾工作如关闭文件或者执行map()后的K-V分发等。run()方法提供了 setup->map->cleanup()的执行模板。

      在MapReduce中,Mapper从一个输入分片中读取数据,然后经过Shuffle and Sort阶段,分发数据给Reducer,在Map端和Reduce端我们可能使用设置的Combiner进行合并,这在Reduce前进行。 Partitioner控制每个K-V对应该被分发到哪个reducer[我们的Job可能有多个reducer],Hadoop默认使用 HashPartitioner,HashPartitioner使用key的hashCode对reducer的数量取模得来。

    复制代码
    1   public void run(Context context) throws IOException, InterruptedException {
    2     setup(context);
    3     while (context.nextKeyValue()) {
    4       map(context.getCurrentKey(), context.getCurrentValue(), context);
    5     }
    6     cleanup(context);
    7   }
    复制代码

    从上面run方法可以看出,K/V对是从传入的Context获取的。我们也可以从下面的map方法看出,输出结果K/V对也是通过Context来完成的。至于Context暂且放着。

    1   @SuppressWarnings("unchecked")
    2   protected void map(KEYIN key, VALUEIN value, 
    3                      Context context) throws IOException, InterruptedException {
    4     context.write((KEYOUT) key, (VALUEOUT) value);
    5   } 

    我们先来看看三个Mapper的子类,它们位于srcmapredorgapachehadoopmapreducelibmap中。

      1、TokenCounterMapper

    复制代码
     1 public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{
     2     
     3   private final static IntWritable one = new IntWritable(1);
     4   private Text word = new Text();
     5   
     6   @Override
     7   public void map(Object key, Text value, Context context
     8                   ) throws IOException, InterruptedException {
     9     StringTokenizer itr = new StringTokenizer(value.toString());
    10     while (itr.hasMoreTokens()) {
    11       word.set(itr.nextToken());
    12       context.write(word, one);
    13     }
    14   }
    15 }
    复制代码

    我们看到,对于一个输入的K-V对,它使用StringTokenizer来获取value中的tokens,然后对每一个 token,分发出一个<token,one>对,这将在reduce端被收集,同一个token对应的K-V对都会被收集到同一个 reducer上,这样我们就可以计算出所有mapper分发出来的以某个token为key的<token,one>的数量,然后只要在 reduce函数中加起来,就得到了token的计数。这就是为什么这个类叫做TokenCounterMapper的原因。

       在MapReduce的“Hello world”:WordCount例子中,我们完全可以直接使用这个TokenCounterMapper作为MapperClass,仅需用 job.setMapperClass(TokenCounterMapper.class)进行设置即可。

      2、InverseMapper

    复制代码
     1   public class InverseMapper<K, V> extends Mapper<K,V,V,K> {
     2 
     3 
     4   /** The inverse function.  Input keys and values are swapped.*/
     5   @Override
     6   public void map(K key, V value, Context context
     7                   ) throws IOException, InterruptedException {
     8     context.write(value, key);
     9   }
    10   
    11 }
    复制代码

     这个类更加简单,它紧紧是调换Key和Value,然后直接分发出去。举个例子:数据格式是<某商家,某商品>,我们既 可能需要计算一个商家对应的所有商品种类,也可能需要计算某个商品的销售商家数量,后者的情形,就可以使用InverseMapper来达到目的,使得相 同商品被分发到相同reducer。

      3、MultithreadedMapper

      这个类稍微有点复杂,它是使用多线程来执行一个Mapper。我们可以从类图中看到,它有一个mapClass属性,这个属性指定另一个Mapper类 [暂称workMapper,由mapred.map.multithreadedrunner.class设置],实际干活的其实是这个Mapper类 而不是MultithreadedMapper。runnsers是运行的线程的列表。

      下面是MultithreadedMapper的run()方法,它重写了Mapper中的run()。

    复制代码
     1   public void run(Context context) throws IOException, InterruptedException {
     2     outer = context;
     3     int numberOfThreads = getNumberOfThreads(context);
     4     mapClass = getMapperClass(context);
     5     if (LOG.isDebugEnabled()) {
     6       LOG.debug("Configuring multithread runner to use " + numberOfThreads + 
     7                 " threads");
     8     }
     9     
    10     runners =  new ArrayList<MapRunner>(numberOfThreads);
    11     for(int i=0; i < numberOfThreads; ++i) {
    12       MapRunner thread = new MapRunner(context);
    13       thread.start();
    14       runners.add(i, thread);
    15     }
    16     for(int i=0; i < numberOfThreads; ++i) {
    17       MapRunner thread = runners.get(i);
    18       thread.join();
    19       Throwable th = thread.throwable;
    20       if (th != null) {
    21         if (th instanceof IOException) {
    22           throw (IOException) th;
    23         } else if (th instanceof InterruptedException) {
    24           throw (InterruptedException) th;
    25         } else {
    26           throw new RuntimeException(th);
    27         }
    28       }
    29     }
    30   }
    复制代码

    从上面的代码我们可以看到,首先它设置运行上下文context和workMapper,然后启动多个MapRunner子线程[由mapred.map.multithreadedrunner.threads设置],然后使用join()等待子线程都执行完毕。

      MapRunner继承了Thread,它包含了一个独享的Context:subcontext,以及用mapper指定了workMapper,然后throwable是在MultithreadMapper的run()中进行综合的异常处理的。

    复制代码
     1   private class MapRunner extends Thread {
     2     private Mapper<K1,V1,K2,V2> mapper;
     3     private Context subcontext;
     4     private Throwable throwable;
     5 
     6     MapRunner(Context context) throws IOException, InterruptedException {
     7       mapper = ReflectionUtils.newInstance(mapClass, 
     8                                            context.getConfiguration());
     9       subcontext = new Context(outer.getConfiguration(), 
    10                             outer.getTaskAttemptID(),
    11                             new SubMapRecordReader(),
    12                             new SubMapRecordWriter(), 
    13                             context.getOutputCommitter(),
    14                             new SubMapStatusReporter(),
    15                             outer.getInputSplit());
    16     }
    17 
    18     public Throwable getThrowable() {
    19       return throwable;
    20     }
    21 
    22     @Override
    23     public void run() {
    24       try {
    25         mapper.run(subcontext);
    26       } catch (Throwable ie) {
    27         throwable = ie;
    28       }
    29     }
    30   }
    复制代码

    在MapRunner的Constructor中我们看见,MapRunner所包含的subcontext中使用了独立的 RecordReader、RecordWriter和StatusReporter,它们分别是SubMapRecordReader、 SubMapRecordWriter和SubMapStatusReporter,我们就不分析了。值得注意的 是,SubMapRecordReader在读K-V对和SubMapRecordWriter在写K-V对的时候都要同步。这是通过互斥访问 MultithreadedMapper的上下文outer来实现的。

      MultithreadedMapper适用于CPU密集型的任务,采用多个线程处理后,一个线程可以在另外的线程在执行时读取数据并执行,这样就使用了 更多的CPU周期来执行任务,从而提高吞吐率。注意读写操作都是线程安全的,因此不难想象对于IO密集型的作业,采用 MultithreadedMapper会适得其反,因为会有多个线程等待IO,IO成为限制吞吐率的关键。对于IO密集型的任务,我们应该采用增多 task数量的方法来解决,因为这样在IO上就是并行的。

      除非map()的确是CPU密集型的,否则不推荐使用MultithreadedMapper,而建议采用更多的map task。

  • 相关阅读:
    js获取input file文件二进制码
    nginx新手入门
    代码神器Atom,最常用的几大插件,你值得拥有。
    css3 3d 与案例分析
    express搭建简易web的服务器
    hosts文件管理和nginx总结
    css3 3D
    问题大神
    面试题整理
    版本控制简介,git使用----使用GitHub托管代码
  • 原文地址:https://www.cnblogs.com/conie/p/3583586.html
Copyright © 2011-2022 走看看