zoukankan      html  css  js  c++  java
  • 关于Hadoop中reducer端combiner的一些思考

    什么是Combiner Functions

    “Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. Hadoop allows the user to specify a combiner function to be run on the map output—the combiner function’s output forms the input to the reduce function. Since the combiner function is an optimization, Hadoop does not provide a guarantee of how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer.” -- 《Hadoop: The Definitive Guide》
    

    简单的说,combiner是一个在mapper之后运行的function,非常类似reducer的功能,所以在《Hadoop In Action》又叫作“local reduce”。它的好处是减少网络的数据传输,从而提高性能。但因为是一个优化功能,所以Hadoop并不保证会运行它。

    其实这个的有一个更深入的设计问题,这里有一个假设就是大家倾向于fat mapper和slim reducer。就是一般情况下,大家会尽可能的在mapper里实现复杂的逻辑和运算,在reducer只是做简单的汇聚。这就是为什么有mapper端的combiner而没有reducer端的combiner。

    reducer端的combiner

    是不是试想这样的情景,一个项目需要多个mapper和reducer才能完成,而且必须在reducer端实现业务逻辑。举一个例子,输入日志包含如下字段:用户ID,国家,timestamp。需要统计不同国家的用户的访问时间(定义为最后一个访问的时间戳去第一次访问的时间戳)。日志为~10G/小时,但是分析以一天为单位。

    在这里,同一个用户会出现在一天的任何小时,所以必须将同一个用户汇聚到一起来计算访问时间。显然无法在mapper端实现这样的功能。相同的用户以“用户ID”作为partition key排序后汇聚到reducer端。采用如下的标准模板(Perl语言为例):

    while ( my $line = <STDIN> ) {
    	chomp($line);
    	( $user_id, $country, $timestamp ) = split( /\t/, $line );
    
    	# set base key
    	$key = $base_key;
    
    	if ($cur_key) {
    		if ( $key ne $cur_key ) {
    			&onEndKey();
    			&onBeginKey();
    		}
    		&onSameKey();
    	}
    	else {
    		&onBeginKey();
    		&onSameKey();
    	}
    }
    
    if ($cur_key) {
    	&onEndKey();
    }

    这里需要另一个mapper/reducer来汇聚不同国家的时间。如果在第一个reducer端能够有一个combiner,那么将极大的减少网络传输,甚至避免out of memory问题(会单独写一个文章)。因为现有的Hadoop并没有这个功能,只能自己来了。最简单的实现就是提供一个全局的hash来汇聚。因为从本质上来说,combiner或者reducer其实就是一个hash。

    具体的实现就略去了,这里只是提供一个设计思路,欢迎大家一起讨论。

  • 相关阅读:
    Java数据持久层
    一张图解决ThreadLocal
    类加载器及其加载原理
    手写LRU缓存淘汰算法
    使用归并排序思想解决逆序对数量问题
    Same Origin Policy 浏览器同源策略详解
    如何估算线程池的线程数?
    分布式锁为什么要选择Zookeeper而不是Redis?
    SpringBoot的SpringMVC使用FastJson依赖时LocalDateTime全局配置序列化格式
    数据库中的枚举值如何存储
  • 原文地址:https://www.cnblogs.com/ainima/p/6331295.html
Copyright © 2011-2022 走看看