  • On Custom Partitioner Classes in MapReduce (Part 4)

    The MapTask class

    In the MapTask class, find the run method:
    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    }
    Then find runNewMapper:
    @SuppressWarnings("unchecked")
    private <INKEY,INVALUE,OUTKEY,OUTVALUE>
    void runNewMapper(final JobConf job,
                      final TaskSplitIndex splitIndex,
                      final TaskUmbilicalProtocol umbilical,
                      TaskReporter reporter
                      ) throws IOException, ClassNotFoundException,
                               InterruptedException {
      // make a task context so we can get the classes
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
                                                                    getTaskID(),
                                                                    reporter);
      // make a mapper
      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
        (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
      // make the input format
      org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
        (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
          ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
      // rebuild the input split
      org.apache.hadoop.mapreduce.InputSplit split = null;
      split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
          splitIndex.getStartOffset());
      LOG.info("Processing split: " + split);

      org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
        new NewTrackingRecordReader<INKEY,INVALUE>
          (split, inputFormat, reporter, taskContext);

      job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
      org.apache.hadoop.mapreduce.RecordWriter output = null;

      // get an output object
      if (job.getNumReduceTasks() == 0) {
        // if the number of reduce tasks is 0, this branch is taken
        output =
          new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
      } else {
        // if the number of reduce tasks is greater than 0, this branch is taken
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
      }

      org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
      mapContext =
        new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
            input, output,
            committer,
            reporter, split);

      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
          mapperContext =
            new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
                mapContext);

      try {
        input.initialize(split, mapperContext);
        mapper.run(mapperContext);
        mapPhase.complete();
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        input.close();
        input = null;
        output.close(mapperContext);
        output = null;
      } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
      }
    }
    We know that partitioning happens when the map function emits its output, so the part that matters here is where the output object is obtained:
    // get an output object
    if (job.getNumReduceTasks() == 0) {
      // if the number of reduce tasks is 0, this branch is taken
      output =
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      // if the number of reduce tasks is greater than 0, this branch is taken
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }
    If there are no reduce tasks, a new NewDirectOutputCollector() is created
    (I have not explored that collection path yet).
    If there are reduce tasks, a new NewOutputCollector() is created.
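    Which branch is taken is decided entirely by the number of reduce tasks set in the driver. A minimal driver sketch of that (the class name and job name below are made up for illustration, not from the original post):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ReduceCountDemo {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "partition-demo");

        // With 0 reduce tasks, runNewMapper() uses NewDirectOutputCollector:
        // map output is written out directly, with no partitioning or sorting.
        job.setNumReduceTasks(0);

        // With one or more reduce tasks it uses NewOutputCollector instead:
        // job.setNumReduceTasks(3);
      }
    }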
     
    The inner class NewOutputCollector
    In the inner class NewOutputCollector, find its constructor:
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      collector = createSortingCollector(job, reporter);

      partitions = jobContext.getNumReduceTasks();

      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }
    The statement partitions = jobContext.getNumReduceTasks(); obtains the number of reduce tasks.
    If the number of reduce tasks is at most 1, a new anonymous Partitioner is created whose overridden getPartition method always returns partitions - 1 (that is, 0 when there is exactly one reduce task), so every record ends up in the same partition.
    If the number of reduce tasks is greater than 1, the class returned by jobContext.getPartitionerClass() is instantiated via reflection.
    So let's look at:
    The JobContext interface
    In the JobContext interface:
    /**
     * Get the {@link Partitioner} class for the job.
     *
     * @return the {@link Partitioner} class for the job.
     */
    public Class<? extends Partitioner<?,?>> getPartitionerClass()
       throws ClassNotFoundException;
    Let's look at its implementation class, JobContextImpl.
    The JobContextImpl class
    Note that this is the one in the mapreduce package, not the mapred package.
    /**
     * Get the {@link Partitioner} class for the job.
     *
     * @return the {@link Partitioner} class for the job.
     */
    @SuppressWarnings("unchecked")
    public Class<? extends Partitioner<?,?>> getPartitionerClass()
       throws ClassNotFoundException {
      return (Class<? extends Partitioner<?,?>>)
        conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
    }
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
    means: read the value of the PARTITIONER_CLASS_ATTR property and return it as a class; if the property is not set, fall back to the default, HashPartitioner.class.
    In other words, when the number of reduce tasks is greater than 1 and no partitioner has been configured, HashPartitioner.class is used by default.
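    The PARTITIONER_CLASS_ATTR property is what the Job API fills in when a custom partitioner is registered in the driver. A minimal sketch (DemoPartitioner and the job name are placeholders, defined only so the snippet is self-contained):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class PartitionerConfigDemo {

      // Placeholder partitioner, only here so the sketch compiles on its own.
      public static class DemoPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
          return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "partitioner-config-demo");

        // setPartitionerClass() stores the class under PARTITIONER_CLASS_ATTR,
        // which getPartitionerClass() later reads back and instantiates via reflection.
        job.setPartitionerClass(DemoPartitioner.class);

        // A custom partitioner only takes effect when partitions > 1.
        job.setNumReduceTasks(3);
      }
    }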
    public class HashPartitioner<K, V> extends Partitioner<K, V> {

      /** Use {@link Object#hashCode()} to partition. */
      public int getPartition(K key, V value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }
    So HashPartitioner's getPartition method ultimately relies on the key object's hashCode method.
    The hashCode we generated with Eclipse (Alt+Shift+S, then H) takes both fields into account (the account and the amount).
    Sure enough, modifying hashCode in the custom key class and testing it works, as long as the hash code is computed from the account field only:
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((account == null) ? 0 : account.hashCode());
        // result = prime * result + ((amount == null) ? 0 : amount.hashCode());
        return result;
    }
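    A quick sanity check of that idea as a standalone sketch (the stripped-down SelfKey below is an assumption for illustration, not the full key class from the post): two keys with the same account but different amounts now produce the same hash, so HashPartitioner sends them to the same partition.

    public class SelfKeyHashDemo {

      // Stripped-down stand-in for the custom key class, with the account-only hashCode.
      static class SelfKey {
        String account;
        Double amount;

        SelfKey(String account, Double amount) {
          this.account = account;
          this.amount = amount;
        }

        @Override
        public int hashCode() {
          final int prime = 31;
          int result = 1;
          result = prime * result + ((account == null) ? 0 : account.hashCode());
          // amount is deliberately left out, exactly as in the modified key class
          return result;
        }
      }

      public static void main(String[] args) {
        int numReduceTasks = 3;
        SelfKey a = new SelfKey("S1", 100.0);
        SelfKey b = new SelfKey("S1", 999.0);

        // Same formula HashPartitioner uses; both lines print the same partition index.
        System.out.println((a.hashCode() & Integer.MAX_VALUE) % numReduceTasks);
        System.out.println((b.hashCode() & Integer.MAX_VALUE) % numReduceTasks);
      }
    }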
     
    Another, more mainstream approach:
    Why does the custom Partitioner class have to be an inner class of Group? I changed it to a top-level class and tested it, and it works just as well.
    The concrete form:
    public static class KeyPartitioner extends Partitioner<SelfKey, DoubleWritable> {

        @Override
        public int getPartition(SelfKey key, DoubleWritable value, int numPartitions) {
            /**
             * To keep the overall output ordered we need our own business logic here,
             * and we have to know the number of reduce tasks in advance.
             * \w matches word characters: [a-zA-Z_0-9]
             */
            String account = key.getAccount();
            // accounts look like 0xxaaabbb, last digit 0-9
            // split into [0-2], [3-6], [7-9]
            if (account.matches("\\w*[0-2]")) {
                return 0;
            } else if (account.matches("\\w*[3-6]")) {
                return 1;
            } else if (account.matches("\\w*[7-9]")) {
                return 2;
            }
            return 0;
        }
    }
    This is done so that all of S1's records (and all of S2's) land in the same partition, rather than some of S1's records going to one partition and the rest to another.
    Because the key at this point is account plus amount, records that all belong to account S1 could otherwise be assigned to different partitions, which would leave the final ordering scrambled.
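    For completeness, a minimal driver sketch of how this KeyPartitioner could be wired up (the driver class and job name are made up, the mapper/reducer and input/output settings a real job needs are omitted, and it assumes the KeyPartitioner and SelfKey classes above are on the classpath): with three reduce tasks, accounts ending in 0-2, 3-6 and 7-9 go to reducers 0, 1 and 2 respectively.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class GroupDriverSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "group-by-account");

        // Three partitions, matching the three return values in KeyPartitioner.getPartition().
        job.setNumReduceTasks(3);
        job.setPartitionerClass(KeyPartitioner.class); // or Group.KeyPartitioner.class if it stays nested

        // Mapper, reducer, key/value types and input/output paths are omitted from this sketch.
      }
    }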
     
     
     




