zoukankan      html  css  js  c++  java
  • hadoop中的Partition

    解析Partition

    Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类。

    Mapper的结果,可能送到Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一个方法,

    输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)。就是指定Mappr输出的键值对到哪一个reducer上去。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的key值,肯定被分配到同一个reducre上。如果有N个reducer,编号就为0,1,2,3……(N-1)

    1、为何使用Partitioner,主要是想reduce的结果能够根据key再次分类输出到不同的文件夹中。

    2、结果能够直观,同时做到对数据结果的简单的统计分析。

    1、输入的数据文件内容如下(1条数据内容少,1条数据内容超长,3条数据内容正常):

    kaka    1       28
    hua 0 26
    chao 1
    tao 1 22
    mao 0 29 22

    2、目的是为了分别输出结果,正确的结果输出到一个文本,太短的数据输出到一个文本,太长的输出到一个文本,共三个文本输出。

    代码


        package com.partition;

        import java.io.IOException;
        import java.util.*;

        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.conf.*;
        import org.apache.hadoop.io.*;
        import org.apache.hadoop.mapred.*;
        import org.apache.hadoop.util.*;

        import com.hadoop.mapred.WordCount.Map;
    import com.hadoop.mapred.WordCount.Reduce;

        //Partitioner函数的使用

        public class MyPartitioner {
            //Map函数
            public static class MyMap extends MapReduceBase implements Mapper<LongWritable,Text,Text,Text>{
                
                
                public void map(LongWritable key, Text value,
                        OutputCollector<Text, Text> output, Reporter reporter)
                        throws IOException {
                    
                    String [] arr_value = value.toString().split("\t");
                    Text word1 = new Text();
                    Text word2 = new Text();
                    
                    if(arr_value.length > 3){
                        word1.set("long");
                        word2.set(value);
                    }else if(arr_value.length < 3){
                        word1.set("short");
                        word2.set(value);
                    }else {
                        word1.set("right");
                        word2.set(value);
                    }
                    
                    output.collect(word1, word2);
                }        
            }        
            
            public static class MyPartitionerPar implements Partitioner<Text, Text> {

                @Override
                public int getPartition(Text key, Text value, int numPartitions) {
                    int result = 0;
                    System.out.println("numPartitions--"+numPartitions);
                    if (key.toString().equals("long")) {
                        result = 0 % numPartitions;
                    } else if (key.toString().equals("short")) {
                        result = 1 % numPartitions;
                    } else if (key.toString().equals("right")) {
                        result = 2 % numPartitions;
                    }
                    return result;
                }
        //通过上面的函数可以知道,想把哪个key分到对用的partition中,只需要在这个key对应的partitiob中,返回特定的值。
                @Override
                public void configure(JobConf arg0) {
                    // TODO Auto-generated method stub
                    
                }

            }
            
            public static class MyReduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
                 public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
                   int sum = 0;
                   while (values.hasNext()) {
                       output.collect(key, new Text(values.next().getBytes()));
                   }               
                 }
               }
            
            public static void main(String[] args) throws Exception {
                 JobConf conf = new JobConf(MyPartitioner.class);
                 conf.setJobName("MyPartitioner");
                 conf.setNumReduceTasks(3);
                 
                 conf.setMapOutputKeyClass(Text.class);
                 conf.setMapOutputValueClass(Text.class);
                 
                 conf.setPartitionerClass(MyPartitionerPar.class);

                 conf.setOutputKeyClass(Text.class);
                 conf.setOutputValueClass(Text.class);
            
                 conf.setMapperClass(MyMap.class);
                 conf.setReducerClass(MyReduce.class);
            
                 conf.setInputFormat(TextInputFormat.class);
                 conf.setOutputFormat(TextOutputFormat.class);
            
                 FileInputFormat.setInputPaths(conf, new Path(args[0]));
                  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
            
                 JobClient.runJob(conf);
            }    
        }
       

    Reducer是所有用户定制Reducer类的基类,和Mapper类似,它也有setup,reduce,cleanup和run方法,其中setup和cleanup含义和Mapper相同,reduce是真正合并Mapper结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括Reducer的上下文。系统中定义了两个非常简单的Reducer,IntSumReducer和LongSumReducer,分别用于对整形/长整型的value求和。

    Reduce的结果,通过Reducer.Context的方法collect输出到文件中,和输入类似,Hadoop引入了OutputFormat。OutputFormat依赖两个辅助接口:RecordWriter和OutputCommitter,来处理输出。RecordWriter提供了write方法,用于输出<key, value>和close方法,用于关闭对应的输出。OutputCommitter提供了一系列方法,用户通过实现这些方法,可以定制OutputFormat生存期某些阶段需要的特殊操作。我们在TaskInputOutputContext中讨论过这些方法(明显,TaskInputOutputContext是OutputFormat和Reducer间的桥梁)。OutputFormat和RecordWriter分别对应着InputFormat和RecordReader,系统提供了空输出NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件FileOutputFormat的SequenceFileOutputFormat和TextOutputFormat输出。

    基于文件的输出FileOutputFormat利用了一些配置项配合工作,包括:
    mapred.output.compress:是否压缩;
    mapred.output.compression.codec:压缩方法;
    mapred.output.dir:输出路径;
    mapred.work.output.dir:输出工作路径。
    FileOutputFormat还依赖于FileOutputCommitter,通过FileOutputCommitter提供一些和Job,Task相关的临时文件管理功能。如FileOutputCommitter的setupJob,会在输出路径下创建一个名为_temporary的临时目录,cleanupJob则会删除这个目录。
    SequenceFileOutputFormat输出和TextOutputFormat输出分别对应输入的SequenceFileInputFormat和TextInputFormat。

  • 相关阅读:
    torch 入门
    编译CDH Spark源代码
    Marsedit 破解版下载(3.5.6)
    程序员必备:技术面试准备手册
    360私有化详细资料曝光:抵押总部大楼(转)
    底层软件工程师的一次冒险经历
    这十种算法撑起了整个世界
    秒杀系统架构分析与实战(深度学习资料)
    北京程序员 VS 硅谷程序员(转)
    Timestamp 使用
  • 原文地址:https://www.cnblogs.com/limingluzhu/p/3014024.html
Copyright © 2011-2022 走看看