  • The reduce-count problem

    What factors actually determine the number of reduces?

    1. We know that the number of maps depends on the number of input files, their sizes, the block size, and the split size. But what determines the number of reduces?
     Setting mapred.tasktracker.reduce.tasks.maximum controls how many reduce tasks a single tasktracker may run at once, but it does not determine the total number of reduces for a job.
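
    For a concrete picture, here is a minimal old-API driver sketch (the class name, paths, and the elided mapper/reducer setup are assumptions for illustration, not the original post's code) showing where the total reduce count is actually set:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class ReduceCountDemo {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(ReduceCountDemo.class);
            conf.setJobName("reduce-count-demo");
            // ... mapper, reducer, and key/value classes configured here ...
            // This is the knob that fixes the total number of reduces
            // (and hence the number of part-NNNNN output files).
            conf.setNumReduceTasks(4);
            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));
            JobClient.runJob(conf);
        }
    }

     With the reduce count set to 4, the job counters for one such run looked like this: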

    Job Counters 
    		Data-local map tasks=2
    		Total time spent by all maps waiting after reserving slots (ms)=0
    		Total time spent by all reduces waiting after reserving slots (ms)=0
    		SLOTS_MILLIS_MAPS=10695
    		SLOTS_MILLIS_REDUCES=29502
    		Launched map tasks=2
    		Launched reduce tasks=4

     Four reduce tasks were indeed launched. Now look at the output:

    diegoball@diegoball:~/IdeaProjects/test/build/classes$ hadoop fs -ls  /user/diegoball/join_ou1123
    11/03/25 15:28:45 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
    11/03/25 15:28:45 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
    Found 5 items
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/_SUCCESS
    -rw-r--r--   1 diegoball supergroup        124 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00000
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00001
    -rw-r--r--   1 diegoball supergroup        214 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00002
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00003

     Yet only 2 of the reduces did any work. Why?
    During the shuffle, each <K,V> pair emitted by a map has to be routed to one of the reduces based on its key. By default, that routing is done by the getPartition() method of org.apache.hadoop.mapred.lib.HashPartitioner.
    The HashPartitioner class:

    package org.apache.hadoop.mapred.lib;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.mapred.Partitioner;
    import org.apache.hadoop.mapred.JobConf;
    
    /** Partition keys by their {@link Object#hashCode()}. 
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
    
      public void configure(JobConf job) {}
    
      /** Use {@link Object#hashCode()} to partition. */
      public int getPartition(K2 key, V2 value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }

     The value of numReduceTasks can be set on the JobConf; the default is 1, which is usually far too small.
       That is why a job with default settings always launches exactly one reduce.
       getPartition() masks off the sign bit of the key's hashCode (the & with Integer.MAX_VALUE) and returns the remainder modulo numReduceTasks.
       MapReduce uses that return value to decide which reduce each <K,V> pair goes to. In this job, the key class's hashCode() looks like this:

    public int hashCode() {
        // Simply returns the raw value as an int (LongWritable, for example,
        // does exactly this), so the partition index ends up being
        // (value & Integer.MAX_VALUE) % numReduceTasks.
        return (int) value;
    }

     It simply returns the key's raw numeric value as an int.
     On this particular data set, getPartition(K2 key, V2 value, int numReduceTasks) therefore produced only 2 distinct values, which is why only 2 of the 4 reduces did any work.
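
    A quick way to see how that can happen is to replay the HashPartitioner arithmetic by hand. The key values below are invented for illustration (date-like integers of the sort seen later in this post); the point is that a skewed key set can hit only a few of the 4 partitions:

    public class PartitionCheck {
        public static void main(String[] args) {
            int numReduceTasks = 4;
            // Hypothetical date-like keys.
            int[] keys = {20051210, 20061210, 20081210, 20110116};
            for (int key : keys) {
                // Same arithmetic as HashPartitioner.getPartition().
                int partition = (key & Integer.MAX_VALUE) % numReduceTasks;
                System.out.println(key + " -> partition " + partition);
            }
            // Prints partitions 2, 2, 2, 0: only partitions 0 and 2 ever
            // receive data, so only 2 of the 4 reduces have work to do.
        }
    }

     To take control of the routing ourselves, we can write a custom Partitioner: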
      

    package com.alipay.dw.test;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;
    
    /**
     * Created by IntelliJ IDEA.
     * User: diegoball
     * Date: 11-3-10
     * Time: 5:26 PM
     * To change this template use File | Settings | File Templates.
     */
    public class MyPartitioner implements Partitioner<IntWritable, IntWritable> {
    	public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
    		/* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
    		int nbOccurences = key.get();
    		if (nbOccurences > 20051210)
    			return 0;
    		else
    			return 1;
    	}
    
    	public void configure(JobConf arg0) {
    
    	}
    }

     All it takes is overriding getPartition(). Calling
    conf.setPartitionerClass(MyPartitioner.class);
    installs the custom partitioner class.
    And again, because it only ever returns the 2 distinct values 0 and 1, it does not matter how many reduces conf.setNumReduceTasks(4) asks for: only 2 of them will do any work.
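
    A quick local sanity check (no cluster needed; the key values are invented) confirms the two-way split:

    import org.apache.hadoop.io.IntWritable;

    public class MyPartitionerCheck {
        public static void main(String[] args) {
            MyPartitioner p = new MyPartitioner();
            // 20110116 > 20051210, so it routes to partition 0 (part-00000).
            System.out.println(p.getPartition(new IntWritable(20110116), new IntWritable(1), 4));
            // 20010101 <= 20051210, so it routes to partition 1 (part-00001).
            System.out.println(p.getPartition(new IntWritable(20010101), new IntWritable(1), 4));
        }
    }

     And indeed, the output of the 4-reduce run shows data in only two of the four part files: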

    11/03/25 15:24:49 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
    Found 5 items
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/_SUCCESS
    -rw-r--r--   1 diegoball supergroup      24546 2011-03-25 15:23 /user/diegoball/opt.del/part-00000
    -rw-r--r--   1 diegoball supergroup      10241 2011-03-25 15:23 /user/diegoball/opt.del/part-00001
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00002
    -rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00003

     part-00000 and part-00001 are the outputs of those 2 active reduces. With the custom MyPartitioner, every <K,V> whose key is greater than 20051210 goes to the first reduce (partition 0, part-00000), and every key less than or equal to 20051210 goes to the second (partition 1, part-00001).
    Each reduce's output is itself sorted by key, so part-00000 covers the high end of the key range and part-00001 the low end, while the keys inside each file remain in ascending order.

    Counters from another run, this time with the reduce count at its default of 1:

    Job Counters 
    		Data-local map tasks=2
    		Total time spent by all maps waiting after reserving slots (ms)=0
    		Total time spent by all reduces waiting after reserving slots (ms)=0
    		SLOTS_MILLIS_MAPS=16395
    		SLOTS_MILLIS_REDUCES=3512
    		Launched map tasks=2
    		Launched reduce tasks=1

      Only one reduce was launched.
      (1) When setNumReduceTasks(int a) is called with a = 1 (the default), only 1 reduce is launched, no matter how many distinct values b the Partitioner returns; in that case a custom Partitioner class has no effect at all.
      (2) When a != 1:
       a. If a is set smaller than the number b of distinct values the Partitioner can return, for example:

    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
    		/* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
    		int nbOccurences = key.get();
    		if (nbOccurences < 20051210)
    			return 0;
    		if (nbOccurences >= 20051210 && nbOccurences < 20061210)
    			return 1;
    		if (nbOccurences >= 20061210 && nbOccurences < 20081210)
    			return 2;
    		else
    			return 3;
    	}

      while at the same time setting setNumReduceTasks(2).

     This throws an exception:

    11/03/25 17:03:41 INFO mapreduce.Job: Task Id : attempt_201103241018_0023_m_000000_1, Status : FAILED
    java.io.IOException: Illegal partition for 20110116 (3)
    	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:900)
    	at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:508)
    	at com.alipay.dw.test.KpiMapper.map(Unknown Source)
    	at com.alipay.dw.test.KpiMapper.map(Unknown Source)
    	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:397)
    	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    	at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:396)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
    	at org.apache.hadoop.mapred.Child.main(Child.java:211)

     Some keys are assigned a partition number for which no reduce exists (here getPartition() returned 3 for key 20110116, but only a = 2 reduces were launched).
      
       b. If a is set larger than the number b of distinct values the Partitioner returns, a reduces are still launched, but only b of them ever receive data; the other a - b reduces are wasted.

       c. The ideal case is a = b: resources are used efficiently and the load is spread most evenly; a defensive fallback for when this cannot be guaranteed is sketched below.
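
    When a = b cannot be guaranteed up front, one defensive pattern (my addition, not from the original post) is to fold the computed bucket back into the valid range, so that case (a) degrades into shared buckets instead of failing. A drop-in replacement for getPartition() in the 4-bucket example above:

    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int nbOccurences = key.get();
        int bucket;
        if (nbOccurences < 20051210)
            bucket = 0;
        else if (nbOccurences < 20061210)
            bucket = 1;
        else if (nbOccurences < 20081210)
            bucket = 2;
        else
            bucket = 3;
        // Clamp into [0, numPartitions) so the "Illegal partition" IOException
        // cannot occur even when fewer reduces are configured than buckets.
        return bucket % numPartitions;
    }

    The trade-off: with numPartitions = 2, buckets 0 and 2 share one reduce and buckets 1 and 3 the other, so the clean key-range split is lost.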

    In summary: the number of reduces that actually do work is determined by the partition function together with the numReduceTasks setting.

  • Original article: https://www.cnblogs.com/yaohaitao/p/6696894.html