zoukankan      html  css  js  c++  java
  • MapReduce算法

    MapReduce算法

    MapReduce算法讲大规模计算的过程分成了两个阶段:

    1. Map阶段:在这个阶段,通过Map过程,将原始数据列表,处理成中间数据,用于Reduce过程的处理
    2. Reduce阶段:将Map阶段产生的中间数据综合归纳成输出结果

    这样说起来似乎比较抽象,我们用一个实例(好像是mr论文里面的例子,otz)来说明这个过程:
    任务:我们现在有200篇文章,我们需要统计这200篇文章中,每一个英文单词都出现了几次。
    Map阶段:这个阶段是分别针对每一篇文章的,统计出这一篇文章中,每个单词出现了几次。它的运算结果类似这样:
    在第1篇文章中:找到了hello * 1, world * 1
    在第2篇文章种:找到了hello * 1, tony * 1, huang * 1
    ...
    Reduce阶段:这个阶段就是将上面的中间结果进行综合,它的运算结果类似这样:
    在所有文章中,一共有:hello * 2, tony * 1, world * 1, huang * 1
    所以,我们就会发现,我们很容易将Map过程分配到不同的计算机上执行(最简单的,每台机器计算一篇文章),而对于Reduce阶段也可以并行化(比如第一台机器Reduce1~4篇文章的数据,第二台机器Reduce5~8篇文章的数据,最后通过递归的reduce过程就可以把所有文章的数据整合在一起了)。
    所以,这个算法非常有利于对巨大的数据的并行化处理(paper的副标题里就这么写的嘛)

    Erlang实现 - 原型1

    罗唆了那么久,终于讲到该如何实现这个算法了。
    好了,首先,我们直接根据MapReduce的思想,利用erlang内置的lists库的函数来实现这个功能,代码如下:

    1 map_reduce(MapReduceSource) ->
    2     MapResult=lists:map(MapSource),
    3     lists:foldl(Reduce, [], MapResult).

    哇,这也太简单了吧?!首先调用lists:map函数将原结果通过Map函数生成中间结果(MapResult),然后又通过foldl进行Reduce过程。
    (电视购物的口气)没错,用Erlang就是那么简单!
    观众:这样你不是在串行执行嘛?!MapReduce的优势一点也没有发挥出来嘛。
    别着急嘛,这个是第一个原型嘛,下面我们就对它进行并行化的改造!

    Erlang实现 - 原型2

    在Erlang中实现并行化的最简单的方式(也是唯一的方式)当然就是进程(process)啦。所有的erlang大大们都教导我们,开erlang的进程的开销是很小的,所以,我们的思路就是针对源数据中的每一个元素创建一个map的进程,并发的执行map操作。同时呢,创建一个monitor进程去进行Reduce操作,最后再把最终结果返回给主进程。
    ok,直接上代码:

    -module(emr).
    -export([map_reduce/3]).


    % the monitor waiting for the map result, and then call the reduce to generate the final result
    monitor(ProcessPid, Result, Reduce, Count) ->
        receive
            MapResult ->
                ReducedResult = Reduce(MapResult, Result),
                case Count of
                    1 -> 
                        ProcessPid ! ReducedResult;
                    _ -> 
                        monitor(ProcessPid, ReducedResult, Reduce, Count - 1)
                end
        end.


    % a delegate to send the map result to the monitor
    map(MonitorPid, Map, Element) -> 
        MonitorPid ! Map(Element).


    % the map-reduce main function
    map_reduce(_Map, _Reduce, []) -> 
        [];
    map_reduce(Map, Reduce, List) ->
        Self = self(),
        Length = length(List),
        MonitorPid = spawn(fun() -> monitor(Self, [], Reduce, Length) end), 
        F = fun(Element) ->
            spawn(fun()->map(MonitorPid, Map, Element) end)
        end,
        lists:foreach(F,List),
        receive
            Result -> Result
        end.

    这里的map_reduce函数首先创建一个monitor进程,去处理计算结果,然后针对源数据中的没一个元素创建一个map函数的进程,最后再等待monitor进程把最终的计算结果发送回来。

    这里的map方法不是原始的Map函数,而是Map函数的一个马甲,map函数会把Map函数的计算结果发送给monitor进程。

    测试1

    写了这两个map_reduce函数,总得找点东西来测试一下吧?!,erlang的例子里面不是必然会出现阶乘函数嘛?!我们也就不要免俗了:

    -module(emr_test).
    -export([factorial/1,
     test/3, exec_test/4]).
     
    % an algorithm function for test
    factorial(1)
        -> 1;
    factorial(N)
        -> N * factorial(N - 1).
    % test a method on (Size) data for (Times) times, and give the {TotalTimeCost, AverageTimeCost}
    test(Method, Size, Times)
        ->
        Map = fun(X) -> 
            factorial(X) 
        end,
        Reduce = fun(MapResult, FinalResult) -> 
            FinalResult ++ [MapResult]
        end,
        Source = lists:seq(1, Size),
        {TimeCost, _Result} = timer:tc(?MODULE, exec_test, [Map, Reduce,Method, Source]),
        case Times of
            1 -> 
                {TimeCost, TimeCost};
            N -> 
                {OtherTimeCost, _OtherAvgTimeCost} = test(Method, Size, N - 1),
                {TimeCost + OtherTimeCost, (TimeCost + OtherTimeCost) / N}
        end.
     
    % execute the real test progress
    exec_test(Map, Reduce, Method, Source) ->
        case Method of
            map_reduce -> 
                emr:map_reduce(Map, Reduce, Source);
            sequence -> 
                AllMapResult = lists:map(Map, Source),
                lists:foldl(Reduce, [], AllMapResult) 
        end.

    这里的factorial就是标准的阶乘函数,这里的test是为了方便测试运算速度的一个代理。第一个参数表示了用什么方法来进行计算(map_reduce表示并行计算,sequence表示串行计算,也就是原型1的方法),第2个参数表示要计算到几的阶乘,第3个参数表示要进行几次测试计算平均值。而exec_test就是具体进行计算的函数了。

    这里放上我的测试环境和结果:
    测试环境:
    CPU:Intel Core 2 Quad Q9400S 2.66GHz (4 cores)
    内存:Kingston 2GB DDR3 1333MHz * 2
    操作系统:Apple Mac OS X Snow Leopard (10.6.2)
    (没错拉,是黑苹果。。。)
    计算1~10000的所有数的阶乘

    测试结果:

    1 emr_test:test(sequence, 10000, 2).

    总时间:361.98s,每次时间:180.99s

    1 emr_test:test(map_reduce, 10000, 2).

    总时间:107.22s,每次时间:53.61s

    =====================================================================================================

    -module(pmap).  

    -export([pmap/2]).  
      
    pmap(F, L) ->   
      S = self(),  
      Pids = lists:map(fun(I) ->   
        spawn(fun() -> do_fun(S, F, I) end)  
      end, L),  
      gather(Pids).  
      
    gather([H|T]) ->  
      receive  
        {H, Result} -> [Result|gather(T)]  
      end;  
    gather([]) ->  
      [].  
      
    do_fun(Parent, F, I) ->                        
        Parent ! {self(), (catch F(I))}.  


    pmap的原理也很简单,对List的每项元素的Fun调用都spawn一个process来实际处理,然后再调用gather来收集结果。 


    如此简洁的代码就实现了基本的MapReduce

  • 相关阅读:
    Cookie、Session详解
    阿里云高速maven库
    java23种设计模式详解
    分布式和集群的区别
    2016 年 Java 优秀文章
    java任务调度
    解酒
    中医教你如何调理女性内分泌失调
    Oracle RedoLog-二进制格式分析,文件头,DML,DDL
    Oracle RedoLog-基本概念和组成
  • 原文地址:https://www.cnblogs.com/xiao0913/p/3652676.html
Copyright © 2011-2022 走看看