zoukankan      html  css  js  c++  java
  • 【转】[Hadoop源码解读](二)MapReduce篇之Mapper类

    转自:http://www.cnblogs.com/lucius/p/3449912.html

    前面在讲InputFormat的时候,讲到了Mapper类是如何利用RecordReader来读取InputSplit中的K-V对的。

      这一篇里,开始对Mapper.class的子类进行解读。

      先回忆一下。Mapper有setup(),map(),cleanup()和run()四个方法。其中setup()一般是用来进行一些map()前的 准备工作,map()则一般承担主要的处理工作,cleanup()则是收尾工作如关闭文件或者执行map()后的K-V分发等。run()方法提供了 setup->map->cleanup()的执行模板。

      在MapReduce中,Mapper从一个输入分片中读取数据,然后经过Shuffle and Sort阶段,分发数据给Reducer,在Map端和Reduce端我们可能使用设置的Combiner进行合并,这在Reduce前进行。 Partitioner控制每个K-V对应该被分发到哪个reducer[我们的Job可能有多个reducer],Hadoop默认使用 HashPartitioner,HashPartitioner使用key的hashCode对reducer的数量取模得来。

    复制代码
    1   public void run(Context context) throws IOException, InterruptedException {
    2     setup(context);
    3     while (context.nextKeyValue()) {
    4       map(context.getCurrentKey(), context.getCurrentValue(), context);
    5     }
    6     cleanup(context);
    7   }
    复制代码

    从上面run方法可以看出,K/V对是从传入的Context获取的。我们也可以从下面的map方法看出,输出结果K/V对也是通过Context来完成的。至于Context暂且放着。

    1   @SuppressWarnings("unchecked")
    2   protected void map(KEYIN key, VALUEIN value, 
    3                      Context context) throws IOException, InterruptedException {
    4     context.write((KEYOUT) key, (VALUEOUT) value);
    5   } 

    我们先来看看三个Mapper的子类,它们位于srcmapredorgapachehadoopmapreducelibmap中。

      1、TokenCounterMapper

    复制代码
     1 public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{
     2     
     3   private final static IntWritable one = new IntWritable(1);
     4   private Text word = new Text();
     5   
     6   @Override
     7   public void map(Object key, Text value, Context context
     8                   ) throws IOException, InterruptedException {
     9     StringTokenizer itr = new StringTokenizer(value.toString());
    10     while (itr.hasMoreTokens()) {
    11       word.set(itr.nextToken());
    12       context.write(word, one);
    13     }
    14   }
    15 }
    复制代码

    我们看到,对于一个输入的K-V对,它使用StringTokenizer来获取value中的tokens,然后对每一个 token,分发出一个<token,one>对,这将在reduce端被收集,同一个token对应的K-V对都会被收集到同一个 reducer上,这样我们就可以计算出所有mapper分发出来的以某个token为key的<token,one>的数量,然后只要在 reduce函数中加起来,就得到了token的计数。这就是为什么这个类叫做TokenCounterMapper的原因。

       在MapReduce的“Hello world”:WordCount例子中,我们完全可以直接使用这个TokenCounterMapper作为MapperClass,仅需用 job.setMapperClass(TokenCounterMapper.class)进行设置即可。

      2、InverseMapper

    复制代码
     1   public class InverseMapper<K, V> extends Mapper<K,V,V,K> {
     2 
     3 
     4   /** The inverse function.  Input keys and values are swapped.*/
     5   @Override
     6   public void map(K key, V value, Context context
     7                   ) throws IOException, InterruptedException {
     8     context.write(value, key);
     9   }
    10   
    11 }
    复制代码

     这个类更加简单,它紧紧是调换Key和Value,然后直接分发出去。举个例子:数据格式是<某商家,某商品>,我们既 可能需要计算一个商家对应的所有商品种类,也可能需要计算某个商品的销售商家数量,后者的情形,就可以使用InverseMapper来达到目的,使得相 同商品被分发到相同reducer。

      3、MultithreadedMapper

      这个类稍微有点复杂,它是使用多线程来执行一个Mapper。我们可以从类图中看到,它有一个mapClass属性,这个属性指定另一个Mapper类 [暂称workMapper,由mapred.map.multithreadedrunner.class设置],实际干活的其实是这个Mapper类 而不是MultithreadedMapper。runnsers是运行的线程的列表。

      下面是MultithreadedMapper的run()方法,它重写了Mapper中的run()。

    复制代码
     1   public void run(Context context) throws IOException, InterruptedException {
     2     outer = context;
     3     int numberOfThreads = getNumberOfThreads(context);
     4     mapClass = getMapperClass(context);
     5     if (LOG.isDebugEnabled()) {
     6       LOG.debug("Configuring multithread runner to use " + numberOfThreads + 
     7                 " threads");
     8     }
     9     
    10     runners =  new ArrayList<MapRunner>(numberOfThreads);
    11     for(int i=0; i < numberOfThreads; ++i) {
    12       MapRunner thread = new MapRunner(context);
    13       thread.start();
    14       runners.add(i, thread);
    15     }
    16     for(int i=0; i < numberOfThreads; ++i) {
    17       MapRunner thread = runners.get(i);
    18       thread.join();
    19       Throwable th = thread.throwable;
    20       if (th != null) {
    21         if (th instanceof IOException) {
    22           throw (IOException) th;
    23         } else if (th instanceof InterruptedException) {
    24           throw (InterruptedException) th;
    25         } else {
    26           throw new RuntimeException(th);
    27         }
    28       }
    29     }
    30   }
    复制代码

    从上面的代码我们可以看到,首先它设置运行上下文context和workMapper,然后启动多个MapRunner子线程[由mapred.map.multithreadedrunner.threads设置],然后使用join()等待子线程都执行完毕。

      MapRunner继承了Thread,它包含了一个独享的Context:subcontext,以及用mapper指定了workMapper,然后throwable是在MultithreadMapper的run()中进行综合的异常处理的。

    复制代码
     1   private class MapRunner extends Thread {
     2     private Mapper<K1,V1,K2,V2> mapper;
     3     private Context subcontext;
     4     private Throwable throwable;
     5 
     6     MapRunner(Context context) throws IOException, InterruptedException {
     7       mapper = ReflectionUtils.newInstance(mapClass, 
     8                                            context.getConfiguration());
     9       subcontext = new Context(outer.getConfiguration(), 
    10                             outer.getTaskAttemptID(),
    11                             new SubMapRecordReader(),
    12                             new SubMapRecordWriter(), 
    13                             context.getOutputCommitter(),
    14                             new SubMapStatusReporter(),
    15                             outer.getInputSplit());
    16     }
    17 
    18     public Throwable getThrowable() {
    19       return throwable;
    20     }
    21 
    22     @Override
    23     public void run() {
    24       try {
    25         mapper.run(subcontext);
    26       } catch (Throwable ie) {
    27         throwable = ie;
    28       }
    29     }
    30   }
    复制代码

    在MapRunner的Constructor中我们看见,MapRunner所包含的subcontext中使用了独立的 RecordReader、RecordWriter和StatusReporter,它们分别是SubMapRecordReader、 SubMapRecordWriter和SubMapStatusReporter,我们就不分析了。值得注意的 是,SubMapRecordReader在读K-V对和SubMapRecordWriter在写K-V对的时候都要同步。这是通过互斥访问 MultithreadedMapper的上下文outer来实现的。

      MultithreadedMapper适用于CPU密集型的任务,采用多个线程处理后,一个线程可以在另外的线程在执行时读取数据并执行,这样就使用了 更多的CPU周期来执行任务,从而提高吞吐率。注意读写操作都是线程安全的,因此不难想象对于IO密集型的作业,采用 MultithreadedMapper会适得其反,因为会有多个线程等待IO,IO成为限制吞吐率的关键。对于IO密集型的任务,我们应该采用增多 task数量的方法来解决,因为这样在IO上就是并行的。

      除非map()的确是CPU密集型的,否则不推荐使用MultithreadedMapper,而建议采用更多的map task。

  • 相关阅读:
    Android 开发 深入理解Handler、Looper、Messagequeue 转载
    Android 开发 Handler的基本使用
    Java 学习 注解
    Android 开发 AlarmManager 定时器
    Android 开发 框架系列 百度语音合成
    Android 开发 框架系列 Google的ORM框架 Room
    Android 开发 VectorDrawable 矢量图 (三)矢量图动画
    Android 开发 VectorDrawable 矢量图 (二)了解矢量图属性与绘制
    Android 开发 VectorDrawable 矢量图 (一)了解Android矢量图与获取矢量图
    Android 开发 知晓各种id信息 获取线程ID、activityID、内核ID
  • 原文地址:https://www.cnblogs.com/conie/p/3583586.html
Copyright © 2011-2022 走看看