zoukankan      html  css  js  c++  java
  • MapReduce Job详解

        Job job = new Job(conf, "MaxTemperature");
        job.setJarByClass(MaxTemperature.class);
        job.setMapperClass(MaxTemperatureMapper.class);   

    Combiner类:   

         通常,每个Map会产生大量的输出,combiner类的作用就是在Map端对输出先做一次合并,以减少传输到reducer的数据量。
         另外,需要注意的是,根据框架要求,combiner的输入输出类型必须和mapper的输出和reducer的输入类型一致。
        
         Combiner函数的示例如下:

          我们以计算特定的key对应的平均值为例,展示一下combiner的用法:

    Mapper:

    Class Mapper
      method Map(Long, String)      
      Emit(String, Integer)

    Combiner:

    class Combiner
        method Combine(String t, Integers [r1, r2, . . .])
        sum ← 0
        cnt ← 0
        for all integer r ∈ integers [r1, r2, . . .] do
            sum ← sum + r
            cnt ← cnt + 1
        Emit(String t, pair (sum, cnt))  // 分离sum和cnt

    Reducer:

    class Reducer
        method Reduce(String t, pairs [(s1, c1), (s2, c2) . . .])
        sum ← 0
        cnt ← 0
        for all pair (s, c) ∈ pairs [(s1, c1), (s2, c2) . . .] do
            sum ← sum + s
            cnt ← cnt + c
        ravg ← sum/cnt
        Emit(String t, Integer ravg)

    乍一看应该没有问题,但是不幸的是,这个combiner是不正确的。因为框架要求,combiner的输入输出类型必须和mapper的输出以及reducer的输入类型一致,更应是和Reducer的输入输出类型一致。即,当Combiner类正确的情况下,可以完全删除而对程序的运行结果没有任何影响,只是对于数据量大的job执行时间会变长。而上面将Combiner删除后,程序就会出错。这时候Mapper的输出类型和Reducer的输入类型不一致。所以会出现这个问题

    改变的方式非常简单,把Mapper的输出包装一下即可:

    class Mapper
        method Map(String t, Integer r)
            Emit(String t, pair (r, 1))
    
    class Combiner
        method Combine(String t, pairs [(s1, c1), (s2, c2) . . .])
        sum ← 0
        cnt ← 0
        for all pair (s, c) ∈ pairs [(s1, c1), (s2, c2) . . .] do
            sum ← sum + s
            cnt ← cnt + c
        Emit(String t, pair (sum, cnt))
    
    class Reducer
        method Reduce(String t, pairs [(s1, c1), (s2, c2) . . .])
        sum ← 0
        cnt ← 0
        for all pair (s, c) ∈ pairs [(s1, c1), (s2, c2) . . .] do
            sum ← sum + s
            cnt ← cnt + c
        ravg ← sum/cnt
        Emit(String t, integer ravg)

      //job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);  

    Partitioner类:   

    Partitioner负责对map中间输出键值对按键进行分区,用键或者键的子集来获得分区,典型的是用Hash函数来完成。
         分区的总数目和当然工作的reduce任务的总个数相同(reduce任务个数可以通过job.setNumReduceTasks(tasks)进行设置),
         因此,这种分组使得发送到reduce任务的中间键值对(记录条数)减少。
         PartitionerClass需要继承自Partitioner类,并重载它的getPartition方法

    /**
    * @param key 将要被分组的键
       * @param value 实体的值
       * @param numPartitions 总的分组数目
       * @return 返回给定key的分组个数
       */
      int getPartition(K2 key, V2 value, int numPartitions);  

       PartionerClass的示例代码如下:

    public static class CatPartiotner extends Partitioner<Text,Text>{
     @Override
     public int getPartition(Text key, Text value, int numPartitions){
     String[] parts=key.toString().split("-");
    if(parts.length==2){
               return Math.abs(parts[0],hashCode())%numPartitions;
    }
         return Math.abs(key.toString().hashCode())%numPartitions; } }

    然后在job中设置Partitioner Class:

    //job.setPartitionerClass(cls);

      // job.setInputFormatClass(Class<? extends InputFormat> cls);    //设置job的输入格式

      // job.setOutputFormatClass(Class<? extends InputFormat> cls); //设置job的输出格式

      Grouping类:

        定义一个比较器,对于Reducer的一个单一的调用reduce(Obect,Iterable,Context),该比较器控制哪些些键被聚合在一起。按照键值关系进行分组。 

        job.setGroupingComparatorClass(Class<? extends RawComparator> cls)

         Sorting类:

    定义一个比较器,确定在这些键被传递给Reducer之前是以何种方式排序的。主要是对键进行排序。

        job.setSortComparatorClass(Class<? extends RawComparator> cls )
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //job.setNumReduceTasks(tasks);
        for(int i=0;i<otherArgs.length-1;i++)
        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }

  • 相关阅读:
    json数据读取后自动进行模板生成
    json数据排序
    fastjson json数据处理
    xml数据转Json
    jackson-dataformat-xml xml转json
    jar工具打包
    工具操作
    IBM MQ
    RabbitMq
    Excel4J
  • 原文地址:https://www.cnblogs.com/shudonghe/p/3114184.html
Copyright © 2011-2022 走看看