
    What factors determine the number of reduces?

     

     

    1. We know that the number of maps depends on the number of input files, the file sizes, the block size, and the split size. What, then, determines the number of reduces?

     Setting mapred.tasktracker.reduce.tasks.maximum controls how many reduce tasks a single tasktracker may run at once, but it does not determine the total number of reduces for a job.
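     The total is requested per job instead. For context, here is a minimal sketch of an old-API (org.apache.hadoop.mapred) driver making that request; the class name and the input/output paths are hypothetical, and the identity mapper/reducer defaults are left in place:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    
    public class ReduceCountDemo {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(ReduceCountDemo.class);
            conf.setJobName("reduce-count-demo");
            // Request 4 reduce tasks in total; unlike the per-tasktracker
            // maximum above, this sets the job-wide count.
            conf.setNumReduceTasks(4);
            conf.setOutputKeyClass(LongWritable.class);
            conf.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));
            JobClient.runJob(conf);
        }
    }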



      conf.setNumReduceTasks(4); this method on JobConf sets the total number of reduces for the job. Look at the Job Counters statistics:

     

     

    	Job Counters 
    		Data-local map tasks=2
    		Total time spent by all maps waiting after reserving slots (ms)=0
    		Total time spent by all reduces waiting after reserving slots (ms)=0
    		SLOTS_MILLIS_MAPS=10695
    		SLOTS_MILLIS_REDUCES=29502
    		Launched map tasks=2
    		Launched reduce tasks=4
    

     

     4 reduce tasks were indeed launched. Now look at the output:

     

    diegoball@diegoball:~/IdeaProjects/test/build/classes$ hadoop fs -ls  /user/diegoball/join_ou1123
    11/03/25 15:28:45 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
    11/03/25 15:28:45 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
    Found 5 items
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/_SUCCESS
    -rw-r--r--   1 diegoball supergroup        124 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00000
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00001
    -rw-r--r--   1 diegoball supergroup        214 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00002
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00003
    

     Yet only 2 of the reduces actually did any work. Why?

    During the shuffle, each <K,V> pair emitted by a map has to be routed to one of the reduces according to its key. By default the routing is decided by the getPartition() method of org.apache.hadoop.mapred.lib.HashPartitioner.
    The HashPartitioner class:

    package org.apache.hadoop.mapred.lib;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.mapred.Partitioner;
    import org.apache.hadoop.mapred.JobConf;
    
    /** Partition keys by their {@link Object#hashCode()}. 
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
    
      public void configure(JobConf job) {}
    
      /** Use {@link Object#hashCode()} to partition. */
      public int getPartition(K2 key, V2 value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }

     The value of numReduceTasks can be set on the JobConf. The default is 1, which is clearly too small; this is why, under the default configuration, only one reduce is ever launched.

     getPartition() masks the key's hashCode() with Integer.MAX_VALUE (forcing the result non-negative) and returns the remainder modulo numReduceTasks. MapReduce uses this return value to decide which reduce each <K,V> pair is sent to.



    Here the key passed in is a LongWritable; look at LongWritable's hashCode() method:

     

     public int hashCode() {
        return (int)value;
      }

     It simply returns the long value cast to an int.

     Because getPartition(K2 key, V2 value, int numReduceTasks) returned only 2 distinct values over all the keys, in the end only 2 reduces had any work to do.
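     A quick sanity check makes this concrete (my sketch; the sample keys 20051210 and 20110116 are borrowed from elsewhere in this post, not taken from the job's real input). Both values are even, so modulo 4 they can only land in partitions 0 and 2, which lines up with part-00000 and part-00002 being the only non-empty outputs above:

    import org.apache.hadoop.io.LongWritable;
    
    public class PartitionCheck {
        public static void main(String[] args) {
            int numReduceTasks = 4;
            // Hypothetical date-like sample keys.
            long[] keys = {20051210L, 20110116L};
            for (long k : keys) {
                int hash = new LongWritable(k).hashCode();       // returns (int) value
                int part = (hash & Integer.MAX_VALUE) % numReduceTasks;
                System.out.println(k + " -> partition " + part); // prints 2, then 0
            }
        }
    }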
     

     HashPartitioner is the default partitioner class. We can also define our own partitioner:

     package com.alipay.dw.test;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;
    
    /**
     * Created by IntelliJ IDEA.
     * User: diegoball
     * Date: 11-3-10
     * Time: 5:26 PM
     * To change this template use File | Settings | File Templates.
     */
    public class MyPartitioner implements Partitioner<IntWritable, IntWritable> {
        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            /* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
            int nbOccurences = key.get();
            if (nbOccurences > 20051210)
                return 0;
            else
                return 1;
        }
    
        public void configure(JobConf arg0) {
    
        }
    }

     You only need to override the getPartition() method.

    Register the custom partitioner with:
    conf.setPartitionerClass(MyPartitioner.class);
    Because this partitioner likewise returns only 2 distinct values (0 and 1), no matter how many reduces conf.setNumReduceTasks(4) requests, again only 2 reduces will do any work.

    Because each reduce's output is sorted by key, the custom Partitioner above can also be used to order the result set:

     

    11/03/25 15:24:49 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
    Found 5 items
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/_SUCCESS
    -rw-r--r--   1 diegoball supergroup      24546 2011-03-25 15:23 /user/diegoball/opt.del/part-00000
    -rw-r--r--   1 diegoball supergroup      10241 2011-03-25 15:23 /user/diegoball/opt.del/part-00001
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00002
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00003
    

     part-00000 and part-00001 are the outputs of those 2 reduces. With the custom MyPartitioner, every <K,V> whose key is greater than 20051210 goes to the first reduce (partition 0, part-00000), and every key less than or equal to 20051210 goes to the second (partition 1, part-00001).


    Each reduce's output is itself sorted by key, and since part-00000 receives the larger key range, the output files form a descending sequence of key ranges; read part-00001 before part-00000 and you get a fully sorted result set.


    But what happens if we use the custom partitioner above and call conf.setNumReduceTasks(1)? Look at the Job Counters:

    	Job Counters 
    		Data-local map tasks=2
    		Total time spent by all maps waiting after reserving slots (ms)=0
    		Total time spent by all reduces waiting after reserving slots (ms)=0
    		SLOTS_MILLIS_MAPS=16395
    		SLOTS_MILLIS_REDUCES=3512
    		Launched map tasks=2
    		Launched reduce tasks=1
    

      Only one reduce was launched.
      (1) When setNumReduceTasks(int a) is given a=1 (the default), only 1 reduce is launched regardless of how many distinct values b the Partitioner returns; in that case the custom Partitioner class has no effect at all.


      (2) When a != 1:
       (a) If a is set smaller than b (the number of distinct values the Partitioner returns), for example with this four-way getPartition():

        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            /* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
            int nbOccurences = key.get();
            if (nbOccurences < 20051210)
                return 0;
            if (nbOccurences >= 20051210 && nbOccurences < 20061210)
                return 1;
            if (nbOccurences >= 20061210 && nbOccurences < 20081210)
                return 2;
            else
                return 3;
        }
     

      while at the same time calling setNumReduceTasks(2).

     

     Then an exception is thrown:

      11/03/25 17:03:41 INFO mapreduce.Job: Task Id : attempt_201103241018_0023_m_000000_1, Status : FAILED
    java.io.IOException: Illegal partition for 20110116 (3)
    	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:900)
    	at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:508)
    	at com.alipay.dw.test.KpiMapper.map(Unknown Source)
    	at com.alipay.dw.test.KpiMapper.map(Unknown Source)
    	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:397)
    	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    	at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:396)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
    	at org.apache.hadoop.mapred.Child.main(Child.java:211) 
    

     Some keys were assigned a partition index with no corresponding reduce to receive them.

    The reason is that only a (= 2) reduces were launched, so the partition index 3 returned for key 20110116 is illegal.
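     One defensive pattern (my sketch, not part of the original job) is to fold the computed bucket back into the valid range [0, numPartitions), so a mismatched reduce count degrades into merged buckets instead of a failed task:

    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int nbOccurences = key.get();
        int bucket;  // the intended 4-way split, as above
        if (nbOccurences < 20051210)
            bucket = 0;
        else if (nbOccurences < 20061210)
            bucket = 1;
        else if (nbOccurences < 20081210)
            bucket = 2;
        else
            bucket = 3;
        // Fold into [0, numPartitions) so no "Illegal partition" exception
        // is thrown, at the price of merging buckets when numPartitions < 4.
        return bucket % numPartitions;
    }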
     
       (b) If a is set larger than b, a reduces are still launched, but only b of them receive any data; the other a-b reduces are wasted and merely write empty output files.

     

       (c) The ideal case is a = b: resources are used sensibly and the load is better balanced.
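       For the four-way getPartition() above, the matched configuration is simply (a sketch, assuming the method lives in MyPartitioner):

    // a = b = 4: every reduce receives exactly one key range, none sits idle
    conf.setPartitionerClass(MyPartitioner.class);
    conf.setNumReduceTasks(4);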

  • Original article: https://www.cnblogs.com/mfrbuaa/p/5144262.html