  • On Custom Combiner Classes in MapReduce (Part 1)

    MRJobConfig
         declares the constant:
         public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
         ———— sub-interface JobContext
               method getCombinerClass
                  ———— implementing class JobContextImpl
                      implements getCombinerClass:
                     public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
                              throws ClassNotFoundException {
                          return (Class<? extends Reducer<?,?,?,?>>)
                            conf.getClass(COMBINE_CLASS_ATTR, null);
                     }
                      Because JobContextImpl implements JobContext, which extends MRJobConfig,
                      it inherits the COMBINE_CLASS_ATTR constant from MRJobConfig.
                      ———— subclass Job
                         public void setCombinerClass(Class<? extends Reducer> cls
                                   ) throws IllegalStateException {
                            ensureState(JobState.DEFINE);
                            conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
                          }
                     Because JobContextImpl inherits from MRJobConfig (via JobContext),
                     and Job extends JobContextImpl,
                     Job too has access to COMBINE_CLASS_ATTR;
                     setCombinerClass writes the combiner class into that MRJobConfig property.
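
     Putting this together, user code only ever touches the Job facade. A minimal
     sketch of the wiring (WCReduce is the example combiner class used later in
     this post):

         Job job = Job.getInstance(new Configuration(), "wordcount");
         // internally calls conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class),
         // i.e. stores WCReduce under "mapreduce.job.combine.class"
         job.setCombinerClass(WCReduce.class);
         // at task time the framework reads the class back via getCombinerClass()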
     
     
    MRJobConfig
        ———— sub-interface JobContext
            method getCombinerClass
            ———— implementing class JobContextImpl
                ———— subclass Job
            ———— sub-interface TaskAttemptContext
                inherits the method getCombinerClass
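
    Schematically, the relevant declarations in org.apache.hadoop.mapreduce look
    roughly like this (a simplified sketch, not the literal source; only the
    combiner-related members are shown):

        public interface MRJobConfig {
          public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
        }

        public interface JobContext extends MRJobConfig {
          Class<? extends Reducer<?,?,?,?>> getCombinerClass() throws ClassNotFoundException;
        }

        public class JobContextImpl implements JobContext { /* implements getCombinerClass() */ }

        public class Job extends JobContextImpl { /* adds setCombinerClass() */ }

        public interface TaskAttemptContext extends JobContext { /* inherits getCombinerClass() */ }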
     
    Task   
       $CombinerRunner (an inner class of Task)
                 This inner class has a static factory method create:
                public static <K,V> CombinerRunner<K,V> create(JobConf job,
                                   TaskAttemptID taskId,
                                   Counters.Counter inputCounter,
                                   TaskReporter reporter,
                                   org.apache.hadoop.mapreduce.OutputCommitter committer
                                  ) throws ClassNotFoundException
                {
                      Class<? extends Reducer<K,V,K,V>> cls =
                        (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
                      if (cls != null) {
                        return new OldCombinerRunner(cls, job, inputCounter, reporter);
                      }
                      // make a task context so we can get the classes
                      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
                        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
                            reporter);
                      Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
                        (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
                           taskContext.getCombinerClass();
                      if (newcls != null) {
                        return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
                                                          inputCounter, reporter, committer);
                      }
                      return null;
                }
                      This first block appears to handle the old (mapred) API:
                      Class<? extends Reducer<K,V,K,V>> cls =
                              (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
                      if (cls != null) {
                              return new OldCombinerRunner(cls, job, inputCounter, reporter);
                      }
                      and this one the new (mapreduce) API:
                      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
                        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
                            reporter);
                      Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
                        (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
                           taskContext.getCombinerClass();
                      if (newcls != null) {
                        return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
                                                          inputCounter, reporter, committer);
                      }
                      return null;
                      (I'm not sure why everything is written with fully-qualified names; stripped of the package prefixes, the up/down casts, and the generics, the logic would read much more clearly, no?)
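                      Stripped down that way, create() is roughly this (a readability
                      paraphrase of the source above, with qualifiers, casts, and most
                      generics removed; raw types would compile with warnings):

                          static <K,V> CombinerRunner<K,V> create(JobConf job, TaskAttemptID taskId,
                              Counters.Counter inputCounter, TaskReporter reporter,
                              OutputCommitter committer) throws ClassNotFoundException {
                            Class oldCls = job.getCombinerClass();          // old mapred API
                            if (oldCls != null)
                              return new OldCombinerRunner(oldCls, job, inputCounter, reporter);
                            TaskAttemptContext taskContext =
                                new TaskAttemptContextImpl(job, taskId, reporter);
                            Class newCls = taskContext.getCombinerClass();  // new mapreduce API
                            if (newCls != null)
                              return new NewCombinerRunner(newCls, job, taskId, taskContext,
                                                           inputCounter, reporter, committer);
                            return null;                                    // no combiner configured
                          }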
                      TaskAttemptContext is a sub-interface of JobContext, so it inherits getCombinerClass.
                      Polymorphism is at work here: the call dispatches to the implementing class
                      TaskAttemptContextImpl's getCombinerClass
                      (TaskAttemptContextImpl extends JobContextImpl, which implements the method).
                      So create ultimately reads the COMBINE_CLASS_ATTR property, i.e. the class xxxC
                        we registered via job.setCombinerClass. That xxxC is assigned to newcls, which is
                          passed as the reducerClass parameter of NewCombinerRunner's constructor:
                          NewCombinerRunner(Class reducerClass,
                              JobConf job,
                              org.apache.hadoop.mapreduce.TaskAttemptID taskId,
                              org.apache.hadoop.mapreduce.TaskAttemptContext context,
                              Counters.Counter inputCounter,
                              TaskReporter reporter,
                              org.apache.hadoop.mapreduce.OutputCommitter committer)
                          {
                              super(inputCounter, job, reporter);
                              this.reducerClass = reducerClass;
                              this.taskId = taskId;
                              keyClass = (Class<K>) context.getMapOutputKeyClass();
                              valueClass = (Class<V>) context.getMapOutputValueClass();
                              comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
                              this.committer = committer;
                          }
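                          Note the comparator on the last line: the combiner can group keys with
                          its own comparator. In newer Hadoop 2.x releases a job may set it
                          explicitly; otherwise it falls back to the job's default key comparator.
                          A sketch, assuming a RawComparator implementation of your own named
                          MyGroupingComparator:

                              job.setCombinerKeyGroupingComparatorClass(MyGroupingComparator.class);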
    Task          
      MapTask
            $MapOutputBuffer
                private CombinerRunner<K,V> combinerRunner;
                $SpillThread ($ denotes an inner class)
                    combinerRunner = CombinerRunner.create(job, getTaskID(),
                                                 combineInputCounter,
                                                 reporter, null);
                    // at this point, combinerRunner holds the configured combiner class
                    if (combinerRunner == null) {
                          // spill directly
                          DataInputBuffer key = new DataInputBuffer();
                          while (spindex < mend &&
                              kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                            final int kvoff = offsetFor(spindex % maxRec);
                            int keystart = kvmeta.get(kvoff + KEYSTART);
                            int valstart = kvmeta.get(kvoff + VALSTART);
                            key.reset(kvbuffer, keystart, valstart - keystart);
                            getVBytesForOffset(kvoff, value);
                            writer.append(key, value);
                            ++spindex;
                          }
                    } else {
                          int spstart = spindex;
                          while (spindex < mend &&
                              kvmeta.get(offsetFor(spindex % maxRec)
                                        + PARTITION) == i) {
                            ++spindex;
                          }
                          // Note: we would like to avoid the combiner if we've fewer
                          // than some threshold of records for a partition
                          if (spstart != spindex) {
                            combineCollector.setWriter(writer);
                            RawKeyValueIterator kvIter =
                              new MRResultIterator(spstart, spindex);
                            combinerRunner.combine(kvIter, combineCollector);
                          }
                    }
                
                Next, look at the combine method,
                found in Task's inner class NewCombinerRunner:
                public void combine(RawKeyValueIterator iterator,
                                    OutputCollector<K,V> collector)
                    throws IOException, InterruptedException,ClassNotFoundException
                {
                  // make a reducer
                  org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
                    (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
                      ReflectionUtils.newInstance(reducerClass, job);
                  org.apache.hadoop.mapreduce.Reducer.Context
                       reducerContext = createReduceContext(reducer, job, taskId,
                                                            iterator, null, inputCounter,
                                                            new OutputConverter(collector),
                                                            committer,
                                                            reporter, comparator, keyClass,
                                                            valueClass);
                  reducer.run(reducerContext);
                }
                The reducerClass above is the xxxC we passed in.
                Reflection ultimately creates an xxxC instance and upcasts it to a Reducer,
                then calls run on the upcast object (our xxxC does not override run,
                so the parent class Reducer's run executes).
                In the Reducer class, run looks like this:
                /**
               * Advanced application writers can use the
               * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
               * control how the reduce task works.
               */
              public void run(Context context) throws IOException, InterruptedException {
                setup(context);
                try {
                  while (context.nextKey()) {
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<VALUEIN> iter = context.getValues().iterator();
                    if(iter instanceof ReduceContext.ValueIterator) {
                      ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();       
                    }
                  }
                } finally {
                  cleanup(context);
                }
              }
              Again because of polymorphism, the reduce invoked here is the one in our subclass xxxC
              (when a subclass overrides a method, the subclass's version is what actually runs).
              So a custom combine class should extend Reducer and override reduce.
              Its input has the following form (taking wordcount as the example):
           reduce(Text key, Iterable<IntWritable> values, Context context)
           where key is the word and values is the list of its counts, i.e. value1, value2, ...
           Note that by this point the input is already grouped into a list: <key, list<value1, value2, value3, ...>>
           (I reached this conclusion because the combiner class I was using was WCReduce,
            i.e. the same class for both reduce and combine. Working through the code: if the
            combiner's input were plain <key, value> pairs, combining would be impossible.
            Merging pairs with equal keys and accumulating their counts are two distinct steps:
            pairs with the same key are already merged once on the map side into <key, value list>,
            and only then does the combiner receive input in that form and process it. In our case
            the processing is summation, and the result is written to the context, after which it
            enters the reduce-side shuffle.
            I then iterated over the values in reduce and printed them with System.out.println,
            and the result was 0, because after the combiner's pass the iterator
            was no longer positioned where I expected.)
    Try the code below yourself; keep combining and commenting pieces in and out and testing, and you will arrive at the same conclusion.
    // reduce
    public static class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable ValueOut = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            for (IntWritable value : values) {
                System.out.println(value.get() + "--");
            }

//            int total = 0;
//            for (IntWritable value : values) {
//                total += value.get();
//            }
//            ValueOut.set(total);
//            context.write(key, ValueOut);
        }
    }

    job.setCombinerClass(WCReduce.class);
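
    For reference, enabling the commented-out summing logic yields the working combiner.
    Because wordcount summation is associative and commutative, the same class can serve
    as both combiner and reducer. A minimal sketch (assuming the usual org.apache.hadoop.io
    and org.apache.hadoop.mapreduce imports):

        public static class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
            private final IntWritable ValueOut = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int total = 0;
                for (IntWritable value : values) { // all counts grouped under this word
                    total += value.get();
                }
                ValueOut.set(total);
                context.write(key, ValueOut);      // emit <word, partial sum>
            }
        }

        // wire it into the job:
        job.setCombinerClass(WCReduce.class);
        job.setReducerClass(WCReduce.class);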
     
     




