zoukankan      html  css  js  c++  java
  • 关于MapReduce中自定义分组类(三)

    Job类
    1.  /**
    2.    * Define the comparator that controls which keys are grouped together
    3.    * for a single call to
    4.    * {@link Reducer#reduce(Object, Iterable,
    5.    *                       org.apache.hadoop.mapreduce.Reducer.Context)}
    6.    * @param cls the raw comparator to use
    7.    * @throws IllegalStateException if the job is submitted
    8.    * @see #setCombinerKeyGroupingComparatorClass(Class)
    9.    */
    10.   publicvoid setGroupingComparatorClass(Class<? extends RawComparator> cls
    11.                                          ) throws IllegalStateException{
    12.     ensureState(JobState.DEFINE);
    13.     conf.setOutputValueGroupingComparator(cls);
    14.   }
     
    JobConf类
    在JobConf类中的setOutputValueGroupingComparator方法:
    1.  /**
    2.    * Set the user defined {@link RawComparator} comparator for
    3.    * grouping keys in the input to the reduce.
    4.    *
    5.    * <p>This comparator should be provided if the equivalence rules for keys
    6.    * for sorting the intermediates are different from those for grouping keys
    7.    * before each call to
    8.    * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
    9.    * 
    10.    * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
    11.    * in a single call to the reduce function if K1 and K2 compare as equal.</p>
    12.    *
    13.    * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
    14.    * how keys are sorted, this can be used in conjunction to simulate
    15.    * <i>secondary sort on values</i>.</p>
    16.    * 
    17.    * <p><i>Note</i>: This is not a guarantee of the reduce sort being
    18.    * <i>stable</i> in any sense. (In any case, with the order of available
    19.    * map-outputs to the reduce being non-deterministic, it wouldn't make
    20.    * that much sense.)</p>
    21.    *
    22.    * @param theClass the comparator class to be used for grouping keys.
    23.    *                 It should implement <code>RawComparator</code>.
    24.    * @see #setOutputKeyComparatorClass(Class)
    25.    * @see #setCombinerKeyGroupingComparator(Class)
    26.    */
    27.   publicvoid setOutputValueGroupingComparator(
    28.       Class<? extends RawComparator> theClass){
    29.     setClass(JobContext.GROUP_COMPARATOR_CLASS,
    30.              theClass,RawComparator.class);
    31.   }
     
    ctrl+O
    找到getOutputValueGroupingComparator
    1. /**
    2.    * Get the user defined {@link WritableComparable} comparator for
    3.    * grouping keys of inputs to the reduce.
    4.    *
    5.    * @return comparator set by the user for grouping values.
    6.    * @see #setOutputValueGroupingComparator(Class) for details.
    7.    */
    8.   publicRawComparator getOutputValueGroupingComparator(){
    9.     Class<? extends RawComparator> theClass = getClass(
    10.       JobContext.GROUP_COMPARATOR_CLASS, null,RawComparator.class);
    11.     if(theClass == null){
    12.       return getOutputKeyComparator();
    13.     }
    14.     returnReflectionUtils.newInstance(theClass,this);
    15.   }
     
    那么谁调用了getOutputValueGroupingComparator方法
    ReduceTask类
    在ReduceTask类中:
    (这里没有定义属性comparator,因为直接作为返回值接受接好了啊)
    1.  RawComparator comparator = job.getOutputValueGroupingComparator();
    这里get到的comparator其实就是我们自定义的xxxG
    于是查找,哪里用到了comparator
    1. if(useNewApi){
    2.       runNewReducer(job, umbilical, reporter, rIter, comparator,
    3.                     keyClass, valueClass);
    4.     }else{
    5.       runOldReducer(job, umbilical, reporter, rIter, comparator,
    6.                     keyClass, valueClass);
    7.     }
    因为有新旧API之分啊
     
    所以找到该runNewReducer方法:
    1. private<INKEY,INVALUE,OUTKEY,OUTVALUE>void runNewReducer(JobConf job,
    2.                      final TaskUmbilicalProtocol umbilical,
    3.                      final TaskReporter reporter,
    4.                      RawKeyValueIterator rIter,
    5.                      RawComparator<INKEY> comparator,
    6.                      Class<INKEY> keyClass,
    7.                      Class<INVALUE> valueClass
    8.                      ) throws IOException,InterruptedException,
    9.                               ClassNotFoundException{
    10.     // wrap value iterator to report progress.
    11.     final RawKeyValueIterator rawIter = rIter;
    12.     rIter =newRawKeyValueIterator(){
    13.       publicvoid close() throws IOException{
    14.         rawIter.close();
    15.       }
    16.       publicDataInputBuffer getKey() throws IOException{
    17.         return rawIter.getKey();
    18.       }
    19.       publicProgress getProgress(){
    20.         return rawIter.getProgress();
    21.       }
    22.       publicDataInputBuffer getValue() throws IOException{
    23.         return rawIter.getValue();
    24.       }
    25.       public boolean next() throws IOException{
    26.         boolean ret = rawIter.next();
    27.         reporter.setProgress(rawIter.getProgress().getProgress());
    28.         return ret;
    29.       }
    30.     };
    31.     // make a task context so we can get the classes
    32.     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    33.       new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
    34.           getTaskID(), reporter);
    35.     // make a reducer
    36.     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
    37.       (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
    38.         ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    39.     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
    40.       newNewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    41.     job.setBoolean("mapred.skip.on", isSkipping());
    42.     job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    43.     org.apache.hadoop.mapreduce.Reducer.Context
    44.          reducerContext = createReduceContext(reducer, job, getTaskID(),
    45.                                                rIter, reduceInputKeyCounter,
    46.                                                reduceInputValueCounter,
    47.                                                trackedRW,
    48.                                                committer,
    49.                                                reporter, comparator, keyClass,
    50.                                                valueClass);
    51.     try{
    52.       reducer.run(reducerContext);
    53.     } finally {
    54.       trackedRW.close(reducerContext);
    55.     }
    56.   }
    runNewReducer方法接收该comparator参数后传递给了createReduceContext方法
     
    Task类
    在Task里面的createReduceContext方法:
    1.  @SuppressWarnings("unchecked")
    2.   protectedstatic<INKEY,INVALUE,OUTKEY,OUTVALUE>
    3.   org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
    4.   createReduceContext(org.apache.hadoop.mapreduce.Reducer
    5.                         <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
    6.                       Configuration job,
    7.                       org.apache.hadoop.mapreduce.TaskAttemptID taskId,
    8.                       RawKeyValueIterator rIter,
    9.                       org.apache.hadoop.mapreduce.Counter inputKeyCounter,
    10.                       org.apache.hadoop.mapreduce.Counter inputValueCounter,
    11.                       org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
    12.                       org.apache.hadoop.mapreduce.OutputCommitter committer,
    13.                       org.apache.hadoop.mapreduce.StatusReporter reporter,
    14.                       RawComparator<INKEY> comparator,
    15.                       Class<INKEY> keyClass,Class<INVALUE> valueClass
    16.   ) throws IOException,InterruptedException{
    17.     org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
    18.     reduceContext =
    19.       newReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId,
    20.                                                               rIter,
    21.                                                               inputKeyCounter,
    22.                                                               inputValueCounter,
    23.                                                               output,
    24.                                                               committer,
    25.                                                               reporter,
    26.                                                               comparator,
    27.                                                               keyClass,
    28.                                                               valueClass);
     
    ReduceContextImpl类
    找到ReduceContextImpl中找到:
    1.  publicReduceContextImpl(Configuration conf,TaskAttemptID taskid,
    2.                            RawKeyValueIterator input,
    3.                            Counter inputKeyCounter,
    4.                            Counter inputValueCounter,
    5.                            RecordWriter<KEYOUT,VALUEOUT> output,
    6.                            OutputCommitter committer,
    7.                            StatusReporter reporter,
    8.                            RawComparator<KEYIN> comparator,
    9.                            Class<KEYIN> keyClass,
    10.                            Class<VALUEIN> valueClass
    11.                           ) throws InterruptedException,IOException{
    12.     super(conf, taskid, output, committer, reporter);
    13.     this.input = input;
    14.     this.inputKeyCounter = inputKeyCounter;
    15.     this.inputValueCounter = inputValueCounter;
    16.     this.comparator = comparator;
    17.     this.serializationFactory =newSerializationFactory(conf);
    18.     this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    19.     this.keyDeserializer.open(buffer);
    20.     this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    21.     this.valueDeserializer.open(buffer);
    22.     hasMore = input.next();
    23.     this.keyClass = keyClass;
    24.     this.valueClass = valueClass;
    25.     this.conf = conf;
    26.     this.taskid = taskid;
    27.   }
     
    ReduceContextImpl类内查找comparator
    1. /**
    2.    * Advance to the next key/value pair.
    3.    */
    4.   @Override
    5.   public boolean nextKeyValue() throws IOException,InterruptedException{
    6.     if(!hasMore){
    7.       key = null;
    8.       value = null;
    9.       returnfalse;
    10.     }
    11.     firstValue =!nextKeyIsSame;
    12.     DataInputBuffer nextKey = input.getKey();
    13.     currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
    14.                       nextKey.getLength()- nextKey.getPosition());
    15.     buffer.reset(currentRawKey.getBytes(),0, currentRawKey.getLength());
    16.     key = keyDeserializer.deserialize(key);
    17.     DataInputBuffer nextVal = input.getValue();
    18.     buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
    19.         - nextVal.getPosition());
    20.     value = valueDeserializer.deserialize(value);
    21.  
    22.     currentKeyLength = nextKey.getLength()- nextKey.getPosition();
    23.     currentValueLength = nextVal.getLength()- nextVal.getPosition();
    24.  
    25.     if(isMarked){
    26.       backupStore.write(nextKey, nextVal);
    27.     }
    28.  
    29.     hasMore = input.next();
    30.    if(hasMore){
    31.       nextKey = input.getKey();
    32.       nextKeyIsSame = comparator.compare(currentRawKey.getBytes(),0,
    33.                                      currentRawKey.getLength(),
    34.                                      nextKey.getData(),
    35.                                      nextKey.getPosition(),
    36.                                      nextKey.getLength()- nextKey.getPosition()
    37.                                          )==0;
    38.     }else{
    39.       nextKeyIsSame =false;
    40.     }
    41.     inputValueCounter.increment(1);
    42.     returntrue;
    43.   }
    这个compare方法,调用的是接口RawComparator中的
    public int compare(byte[] b1int s1int l1byte[] b2int s2int l2); 
    而一般如Text、IntWritable这些都实现了该方法
     
    (一)未设置
    1. if(theClass == null){
    2.       return getOutputKeyComparator();
    3.     }
    1. /**
    2.    * Get the {@link RawComparator} comparator used to compare keys.
    3.    *
    4.    * @return the {@link RawComparator} comparator used to compare keys.
    5.    */
    6.   publicRawComparator getOutputKeyComparator(){
    7.     Class<? extends RawComparator> theClass = getClass(
    8.       JobContext.KEY_COMPARATOR, null,RawComparator.class);
    9.     if(theClass != null)
    10.       returnReflectionUtils.newInstance(theClass,this);
    11.     returnWritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class),this);
    12.   }
     
    没有job.setGroupingComparatorClass(xxxG.class);的时候,即使用默认的,调用Map输出的时候的key所属的类中的comparae,比如Text中的
    原来默认情况下,调用的是比较器啊(更准确说是那个比较方法)
    (这里比较器又分两种:
               1    key的类类型中的compareTo方法
               2    自定义比较器类中的compare方法
    无论我们使用1还是2哪种方式,显然,分组和比较要么都用1 ,要么都用2,这样都是同一套规则,显然也不怎么合适。
    所以我们一般是在自定义比较器类的同时又自定义分组类
     
    (二)设置了
     
    1.     returnReflectionUtils.newInstance(theClass,this);
    如果我们job.setGroupingComparatorClass(xxxG.class),则是创建我们自定义的这个分组类的这个xxxG
    这个xxxG得继承WritableComparator类,复写compare方法
    如:
    public static class SelfGroupComparator extends WritableComparator{
    复写compare方法即可
    这样,调用逻辑和compare的一样。
     
     
    我更推荐方法2
     
     
    alt+左箭头,返回上一次查看源码的地方





  • 相关阅读:
    Java实现第八届蓝桥杯9算数式
    Java实现第八届蓝桥杯9算数式
    java实现第七届蓝桥杯寒假作业
    java实现第七届蓝桥杯寒假作业
    java实现第六届蓝桥杯隔行变色
    java实现第六届蓝桥杯无穷分数
    mysql-5.7.19-winx64服务无法启动解决方案
    MySQL集群搭建详解
    Windows下多个Mysql实例配置主从
    在一台机子上,安装,运行两mysql数据库实例
  • 原文地址:https://www.cnblogs.com/xuanlvshu/p/5748428.html
Copyright © 2011-2022 走看看