zoukankan      html  css  js  c++  java
  • Hadoop中Partition解析

    1.解析Partition

    Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类。

    Mapper的结果,可能送到Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。Mapper最终处理的键值对,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一个方法,

     

    [java] view plaincopy
    1. getPartition(Text key, Text value, int numPartitions)  

     

    输入是Map的结果对和Reducer的数目,输出则是分配的Reducer(整数编号)。就是指定Mappr输出的键值对到哪一个reducer上去。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的key值,肯定被分配到同一个reducre上。如果有N个reducer,编号就为0,1,2,3……(N-1)

    Reducer是所有用户定制Reducer类的基类,和Mapper类似,它也有setup,reduce,cleanup和run方法,其中setup和cleanup含义和Mapper相同,reduce是真正合并Mapper结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括Reducer的上下文。系统中定义了两个非常简单的Reducer,IntSumReducer和LongSumReducer,分别用于对整形/长整型的value求和。

    Reduce的结果,通过Reducer.Context的方法collect输出到文件中,和输入类似,Hadoop引入了OutputFormat。OutputFormat依赖两个辅助接口:RecordWriter和OutputCommitter,来处理输出。RecordWriter提供了write方法,用于输出和close方法,用于关闭对应的输出。OutputCommitter提供了一系列方法,用户通过实现这些方法,可以定制OutputFormat生存期某些阶段需要的特殊操作。我们在TaskInputOutputContext中讨论过这些方法(明显,TaskInputOutputContext是OutputFormat和Reducer间的桥梁)。OutputFormat和RecordWriter分别对应着InputFormat和RecordReader,系统提供了空输出NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件FileOutputFormat的SequenceFileOutputFormat和TextOutputFormat输出。

    基于文件的输出FileOutputFormat利用了一些配置项配合工作,包括:
    mapred.output.compress:是否压缩;
    mapred.output.compression.codec:压缩方法;
    mapred.output.dir:输出路径;
    mapred.work.output.dir:输出工作路径。
    FileOutputFormat还依赖于FileOutputCommitter,通过FileOutputCommitter提供一些和Job,Task相关的临时文件管理功能。如FileOutputCommitter的setupJob,会在输出路径下创建一个名为_temporary的临时目录,cleanupJob则会删除这个目录。
    SequenceFileOutputFormat输出和TextOutputFormat输出分别对应输入的SequenceFileInputFormat和TextInputFormat。

    2.代码实例

    [java] view plaincopy
    1. package org.apache.hadoop.examples;  
    2.   
    3. import java.io.IOException;  
    4. import java.util.*;  
    5. import org.apache.hadoop.fs.Path;  
    6. import org.apache.hadoop.conf.*;  
    7. import org.apache.hadoop.io.*;  
    8. import org.apache.hadoop.mapred.*;  
    9. import org.apache.hadoop.util.*;  
    10.   
    11.   
    12.   
    13. //Partitioner函数的使用  
    14.   
    15. public class MyPartitioner  
    16.     // Map函数  
    17.     public static class MyMap extends MapReduceBase implements  
    18.             Mapper  
    19.         public void map(LongWritable key, Text value,  
    20.                 OutputCollector output, Reporter reporter)  
    21.                 throws IOException  
    22.             String[] arr_value value.toString().split(" ");  
    23.             //测试输出  
    24. //          for(int i=0;i  
    25. //          {  
    26. //              System.out.print(arr_value[i]+" ");  
    27. //          }  
    28. //          System.out.print(arr_value.length);  
    29. //          System.out.println();         
    30.             Text word1 new Text();  
    31.             Text word2 new Text();  
    32.             if (arr_value.length 3 
    33.                 word1.set("long");  
    34.                 word2.set(value);  
    35.             else if (arr_value.length 3 
    36.                 word1.set("short");  
    37.                 word2.set(value);  
    38.             else  
    39.                 word1.set("right");  
    40.                 word2.set(value);  
    41.              
    42.             output.collect(word1, word2);  
    43.          
    44.      
    45.       
    46.     public static class MyReduce extends MapReduceBase implements  
    47.             Reducer  
    48.         public void reduce(Text key, Iterator values,  
    49.                 OutputCollector output, Reporter reporter)  
    50.                 throws IOException  
    51.             int sum 0 
    52.             System.out.println(key);  
    53.             while (values.hasNext())  
    54.                 output.collect(key, new Text(values.next().getBytes()));      
    55.              
    56.          
    57.      
    58.   
    59.     // 接口Partitioner继承JobConfigurable,所以这里有两个override方法  
    60.     public static class MyPartitionerPar implements Partitioner  
    61.           
    62.         @Override  
    63.         public int getPartition(Text key, Text value, int numPartitions)  
    64.             // TODO Auto-generated method stub  
    65.             int result 0 
    66.             System.out.println("numPartitions--" numPartitions);  
    67.             if (key.toString().equals("long"))  
    68.                 result 0 numPartitions;  
    69.             else if (key.toString().equals("short"))  
    70.                 result 1 numPartitions;  
    71.             else if (key.toString().equals("right"))  
    72.                 result 2 numPartitions;  
    73.              
    74.             System.out.println("result--" result);  
    75.             return result;  
    76.          
    77.           
    78.         @Override  
    79.         public void configure(JobConf arg0)   
    80.          
    81.             // TODO Auto-generated method stub  
    82.          
    83.      
    84.   
    85.     //输入参数:/home/hadoop/input/PartitionerExample /home/hadoop/output/Partitioner  
    86.     public static void main(String[] args) throws Exception  
    87.         JobConf conf new JobConf(MyPartitioner.class);  
    88.         conf.setJobName("MyPartitioner");  
    89.           
    90.         //控制reducer数量,因为要分3个区,所以这里设定了3个reducer  
    91.         conf.setNumReduceTasks(3);  
    92.   
    93.         conf.setMapOutputKeyClass(Text.class);  
    94.         conf.setMapOutputValueClass(Text.class);  
    95.   
    96.         //设定分区类  
    97.         conf.setPartitionerClass(MyPartitionerPar.class);  
    98.   
    99.         conf.setOutputKeyClass(Text.class);  
    100.         conf.setOutputValueClass(Text.class);  
    101.   
    102.         //设定mapper和reducer类  
    103.         conf.setMapperClass(MyMap.class);  
    104.         conf.setReducerClass(MyReduce.class);  
    105.   
    106.         conf.setInputFormat(TextInputFormat.class);  
    107.         conf.setOutputFormat(TextOutputFormat.class);  
    108.   
    109.         FileInputFormat.setInputPaths(conf, new Path(args[0]));  
    110.         FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
    111.   
    112.         JobClient.runJob(conf);  
    113.      
    114. }  

    版权声明:本文为博主原创文章,未经博主允许不得转载。

  • 相关阅读:
    EFCore数据库迁移命令
    EF基本操作
    EF执行存储过程
    [vue]element-ui使用
    [vue]vue-router的使用
    [vue]使用webpack打包
    [vue]插槽与自定义事件
    [vue]计算属性
    [vue]axios异步通信
    [vue]组件
  • 原文地址:https://www.cnblogs.com/jamesf/p/4751613.html
Copyright © 2011-2022 走看看