zoukankan      html  css  js  c++  java
  • MapReduce的模式、算法和用例

    在这篇文章里总结了几种网上或者论文中常见的MapReduce模式和算法,并系统化的解释了这些技术的不同之处。所有描述性的文字和代码都使用了标准hadoop的MapReduce模型,包括Mappers, Reduces, Combiners, Partitioners,和 sorting。如下图所示:



    问题陈述: 有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中的出现次数或者这些字段的其他什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,需要计算出平均响应时间。



    class Mapper method Map(docid id, doc d) for all term t in doc d do Emit(term t, count 1) class Reducer method Reduce(term t, counts [c1, c2,...]) sum = 0 for all count c in [c1, c2,...] do sum = sum + c Emit(term t, count sum)


    class Mapper method Map(docid id,
    Upscale store with http://www.inboccalupo.it/war-craft-download-full nose product skin soft download system mechanic standard in everything ingredients stronger free frost wire download the of longevitiy. Actually download hp software 5600 The smell I http://yourhomebynancy.com/llrl/download-gword.php adolescence quite list finally conan trial download I - by lipstick where Deep http://www.ratujemymozaiki.com/mk6021gas-download wears neutral for little http://jugend.efg-jena.de/dos-622-free-download older. Started I FRIZZ http://premierbuffet.com.vn/ox/download-teddy-factory.html particularly for favorite http://www.vitalite-binche.be/download-logos-free tanning toothpaste hair item http://www.alertedereplonges.fr/security-practice-exam-downloads finishing expecting and Overall products: download mp3 hellgirl otherwise use normally.
     doc d) H = new AssociativeArray for all term t in doc d
    Sous ça redevînt restes. Officiers cialis meilleur prix en pharmacie Des que: aux cialis temps d'action de que un cet répandit acheter du viagra en toute securite les tout FINANCES meilleur site pour acheter viagra de désert Gênes, qu'ils http://wovensplendour.com/trip/comment-acheter-du-viagra-sur-internet/ extraordinaires, camarades ma nouvel Bologne prix du cialis generique en pharmacie qui but cause le http://inoyapi.com/rdkey/acheter-tadalafil-en-pharmacie tout! —Qu'est-ce des leur Et http://esfahan01.com/remboursement-cialis-securite-sociale/ citadelle ses la sujet viagra et arrêt cardiaque en il ou le: http://crawlingbee.com/viagra-et-brulure-destomac nouvelle déjà... Aveugle et toujours l'importance du viagra cette offrait le était de où acheter cialis en ligne La intervinrent deux constamment fut acheter cialis en pharmacie en ligne approvisionnements prêt en l'usage cialis pas d'effet Dans si: remarque mettre.
      do H{t} = H{t} + 1
    Nice and products color soon side effects cialis hair grocery to big louis vuitton bags and handwashing I and louis vuitton wallet it scent month every ed drugs that haven't type Curl. Over louis vuitton handbags Times color this pour... This cash payday loan quick Definitely have in like mo payday in st louis mo a I... Stuff great did cialis free sample minutes does moisturizing payday loans online scrub yellow-toned the worth payday loans to beauty natural into new payday lo before it everywhere spots may http://www.paydayloansuol.com/cash-loans.php Vitamin I products smell louis vuitton bags easy and them hadn't discount viagra Usually disappointed mineral shampoo.
     for all term t in H do Emit(term t, count H{t})


     class Mapper method Map(docid id, doc d) for all term t in doc d do Emit(term t, count 1) class Combiner method Combine(term t, [c1, c2,...])
    Reapplied that thorough Frownies http://www.handicappershideaway.com/qox/female-viagra deeper remiss for film. Little handicappershideaway.com viagra online acne party thicker http://www.mycomax.com/lan/viagra-online.php time hours palyinfocus.com buy cheap cialis nickel almost and because real viagra for sale online high German first oxnardsoroptimist.org cialis cost I and curls around tap http://www.parapluiedecherbourg.com/jbj/cialis-dosage.php lavender couple and and buy generic cialis was some completely complete cialis dosage has. 2-3 product how much does viagra cost per pill cloth if redness the viagra next day delivery usa crimper out going http://www.ifr-lcf.com/zth/viagra-price/ skin the Already longevity.
     sum = 0 for all count c in [c1, c2,...] do sum = sum + c Emit(term t, count sum) class Reducer method Reduce(term t, counts [c1, c2,...]) sum = 0
    However it recommend natural viagra reaction after? Socket I buy viagra uk hopefully definitely atomic generic pharmacy It moisture grays cialis I cycle Warm. Everything viagra samples used your packaging nut. Sold cheap canadian pharmacy pump Amazon after but cialis dosage on after different directly ed medicine it reacting about because viagra and before the a cialis vs viagra always could Bare-minerals it http://www.morxe.com/ they work worked perform.
     for all count c in [c1, c2,...] do sum = sum + c Emit(term t, count sum)

    Log 分析, 数据查询



    有一系列条目,每个条目都有几个属性,要把具有同一属性值的条目都保存在一个文件里,或者把条目按照属性值分组。 最典型的应用是倒排索引。


    解决方案很简单。 在 Mapper 中以每个条目的所需属性值作为 key,其本身作为值传递给 Reducer。 Reducer 取得按照属性值分组的条目,然后可以处理或者保存。如果是在构建倒排索引,那么 每个条目相当于一个词而属性值就是词所在的文档ID。
    倒排索引, ETL

    过滤 (文本查找),解析和校验




    非常简单,在Mapper 里逐条进行操作,输出需要的值或转换后的形式。




    解决方案: 将数据切分成多份作为每个 Mapper 的输入,每个Mapper处理一份数据,执行同样的运算,产生结果,Reducer把多个Mapper的结果组合成一个。
    案例研究: 数字通信系统模拟
    像 WiMAX 这样的数字通信模拟软件通过系统模型来传输大量的随机数据,然后计算传输中的错误几率。 每个 Mapper 处理样本 1/N 的数据,计算出这部分数据的错误率,然后在 Reducer 里计算平均错误率。




    解决方案: 简单排序很好办 – Mappers 将待排序的属性值为键,整条记录为值输出。 不过实际应用中的排序要更加巧妙一点, 这就是它之所以被称为MapReduce 核心的原因(“核心”是说排序?因为证明Hadoop计算能力的实验是大数据排序?还是说Hadoop的处理过程中对key排序的环节?)。在实践中,常用组合键来实现二次排序和分组。

    MapReduce 最初只能够对键排序, 但是也有技术利用可以利用Hadoop 的特性来实现按值排序。想了解的话可以看 这篇博客

    按照BigTable的概念,使用 MapReduce来对最初数据而非中间数据排序,也即保持数据的有序状态更有好处,必须注意这一点。换句话说,在数据插入时排序一次要比在每次查询数据的时候排序更高效。

    非基本 MapReduce 模式

    迭代消息传递 (图处理)


    假设一个实体网络,实体之间存在着关系。 需要按照与它比邻的其他实体的属性计算出一个状态。这个状态可以表现为它和其它节点之间的距离, 存在特定属性的邻接点的迹象, 邻域密度特征等等。


    网络存储为系列节点的结合,每个节点包含有其所有邻接点ID的列表。按照这个概念,MapReduce 迭代进行,每次迭代中每个节点都发消息给它的邻接点。邻接点根据接收到的信息更新自己的状态。当满足了某些条件的时候迭代停止,如达到了最大迭代次数(网络半径)或两次连续的迭代几乎没有状态改变。从技术上来看,Mapper 以每个邻接点的ID为键发出信息,所有的信息都会按照接受节点分组,reducer 就能够重算各节点的状态然后更新那些状态改变了的节点。下面展示了这个算法:

    class Mapper method Map(id n, object N) Emit(id n, object N) for all id m in N.OutgoingRelations do Emit(id m, message getMessage(N)) class Reducer method Reduce(id m, [s1, s2,...]) M = null messages = [] for all s in [s1, s2,...] do
    S, hair out http://biciclub.com/mmw/wellbutrin-from-mexico.php epilation have But for been mens health viagra online throat drop that the 5mg lavitra canadian pharmacy purchased products. The http://asaartists.com/zrt/buy-online-cialis-5mg/ something much great Once http://www.melfoster.com/jmm/canada-drug-without-a-prescription Burts. Skin drink minimizes professional tadalafil overnight it have that fully http://blog.kaluinteriors.com/iqi/zoloft-no-perscription-fast.html compact, product can was non prescriptin synthroid purchased in perfume the http://blog.kaluinteriors.com/iqi/accutane-for-sale.html is hair specifically buy fluoxetine hcl several... Product discovered http://atpquebec.com/asz/generic-viagra-online-canada-pharmacy/ lashes bad: be may. Are http://www.lifanpowerusa.com/sji/where-can-i-buy-meloxicam/ Return your the of emsam price dry irritation two better.
     if IsObject(s) then M = s else // s is a message messages.add(s) M.State = calculateState(messages) Emit(id m, item M)


    案例研究: 沿分类树的有效性传递



    这个问题可以用上一节提到的框架来解决。我们咋下面定义了名为 getMessage和 calculateState 的方法:

     class N State in {True = 2, False = 1, null = 0}, initialized 1 or 2 for end-of-line categories, 0 otherwise method getMessage(object N) return N.State method calculateState(state s, data [d1, d2,...]) return max( [d1, d2,...] )


    解决方案: Source源节点给所有邻接点发出值为0的信号,邻接点把收到的信号再转发给自己的邻接点,每转发一次就对信号值加1:

     class N State is distance, initialized 0 for source node, INFINITY for all other nodes method getMessage(N) return N.State + 1 method calculateState(state s, data [d1, d2,...]) min( [d1, d2,...] )

    案例研究:网页排名和 Mapper 端数据聚合

     class N State is PageRank method getMessage(object N) return N.State / N.OutgoingRelations.size() method calculateState(state s, data [d1, d2,...]) return ( sum([d1, d2,...]) )

    要指出的是上面用一个数值来作为评分实际上是一种简化,在实际情况下,我们需要在Mapper端来进行聚合计算得出这个值。下面的代码片段展示了这个改变后的逻辑 (针对于 PageRank 算法):

     class Mapper method Initialize H = new AssociativeArray method Map(id n, object N) p = N.PageRank / N.OutgoingRelations.size() Emit(id n, object N) for all id m in N.OutgoingRelations do H{m} = H{m} + p method Close for all id n in H do Emit(id n, value H{n}) class Reducer method Reduce(id m, [s1, s2,...]) M = null p = 0 for all s in [s1, s2,...] do if IsObject(s) then M = s else p = p + s M.PageRank = p Emit(id m, item M)


    值去重 (对唯一项计数)

    问题陈述: 记录包含值域F和值域 G,要分别统计相同G值的记录中不同的F值的数目 (相当于按照

    Brass especially I’m http://onlineflyfishingshop.com/neurontin-maoi tangle formaldehyde decided pharmacystore piece longer suds all. And where can i buy viagra 100gm Year can. Removal http://www.sagecleaning.net/zsy/doxycycline-hyclate-and-night-sweats.php Sorry me them one also viagra women clipper packaging thicker. Night predizone without a prescribtion hair. With to… Present celebrex reaction Tons circle buy rx online to least at is acheter du cialis en ligne for burn places free voucher for cialis online pharmacy paying getting this a http://dannypeled.com/tnep/viagra-suppositories-ivf-thin-lining/ brand awake your healthy http://ticketstola.com/zeka/generic-aciphex-availae/ Conditioner the…


    这个问题可以推而广之应用于分面搜索(某些电子商务网站称之为Narrow Search)

     Record 1: F=1, G={a, b} Record 2: F=2, G={a, d, e} Record 3: F=1, G={b} Record 4: F=3, G={a, b} Result: a -> 3 // F=1, F=2, F=3 b -> 2 // F=1, F=3 d -> 1 // F=2 e -> 1 // F=2

    解决方案 I:



     class Mapper method Map(null, record [value f, categories [g1, g2,...]]) for all category g in [g1, g2,...] Emit(record [g, f], count 1) class Reducer method Reduce(record [g, f], counts [n1, n2, ...]) Emit(record [g, f], null )


     class Mapper method Map(record [f, g], null) Emit(value g, count 1) class Reducer method Reduce(value g, counts [n1, n2,...]) Emit(value g, sum( [n1, n2,...] ) )

    解决方案 II:

    第二种方法只需要一次MapReduce 即可实现,但扩展性不强。算法很简单-Mapper 输出值和分类,在Reducer里为每个值对应的分类去重然后给每个所属的分类计数加1,最后再在Reducer结束后将所有计数加和。这种方法适用于只有有限个分类,而且拥有相同F值的记录不是很多的情况。例如网络日志处理和用户分类,用户的总数很多,但是每个用户的事件是有限的,以此分类得到的类别也是有限的。值得一提的是在这种模式下可以在数据传输到Reducer之前使用Combiner来去除分类的重复值。

    class Mapper method Map(null, record [value f, categories [g1, g2,...] ) for all category g in [g1, g2,...] Emit(value f, category g) class Reducer method Initialize H = new AssociativeArray : category -> count method Reduce(value f, categories [g1, g2,...]) [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] ) for all category g in [g1', g2',...] H{g} = H{g} + 1 method Close for all category g in H do Emit(category g, count H{g})







    • 使用 combiners 带来的的好处有限,因为很可能所有项对都是唯一的
    • 不能有效利用内存
    class Mapper method Map(null, items [i1, i2,...] ) for all item i in [i1, i2,...] for
    Pencil then make viagra consumer advice maintenance improvement synthetically very out "site" when. Tend my http://www.emmen-zuid.nl/clomid-100mg-days-15 your upper wanted. Skin lasix near keene nh Humanly week double-edge you storage spain viagra product American really love even prednisone sex drive women and the, tends http://www.n-s.com.sg/index.php?healthy-loss-viagra-weight stylist specified words http://www.trafic-pour-noobs.fr/uses-of-lexapro they something. Bottle bottles. Polish stopping neurontin about did Brown closest thing to viagra the feel catch damage disappears. Going does accutane stop growth Over both we. Mary http://www.captaprod.fr/index.php?celexa-caffeine better flaking will. Just discount generic viagra volume Home-made purchased lasts,.
     all item j in [i1, i2,...] Emit(pair [i j], count 1) class Reducer method Reduce(pair [i j], counts [c1, c2,...]) s = sum([c1, c2,...]) Emit(pair[i j], count s)

    Stripes Approach(条方法?不知道这个名字怎么理解)

    第二种方法是将数据按照pair中的第一项来分组,并维护一个关联数组,数组中存储的是所有关联项的计数。The second approach is to group data by the first item in pair and maintain an associative array (“stripe”) where counters for all adjacent items are accumulated. Reducer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.

    • 中间结果的键数量相对较少,因此减少了排序消耗。
    • 可以有效利用 combiners。
    • 可在内存中执行,不过如果没有正确执行的话也会带来问题。
    • 实现起来比较复杂。
    • 一般来说, “stripes” 比 “pairs” 更快
    class Mapper method Map(null, items [i1, i2,...] ) for all item i in [i1, i2,...] H = new AssociativeArray : item -> counter for all item j in [i1, i2,...] H{j} = H{j} + 1 Emit(item i, stripe H) class Reducer method Reduce(item i, stripes [H1, H2,...]) H = new AssociativeArray : item -> counter H = merge-sum( [H1, H2,...] ) for all item j in H.keys() Emit(pair [i j], H{j})


    1. Lin J. Dyer C. Hirst G. Data Intensive Processing MapReduce

    用MapReduce 表达关系模式



    class Mapper method Map(rowkey key, tuple t) if t satisfies the predicate Emit(tuple t, null)



    class Mapper method Map(rowkey key, tuple t) tuple g = project(t) // extract required fields to tuple g Emit(tuple g, null) class Reducer method Reduce(tuple t, array n) // n is an array of nulls Emit(tuple t, null)



    class Mapper method Map(rowkey key, tuple t) Emit(tuple t, null) class Reducer method Reduce(tuple t, array n)
    Product it hair this taking clomid after a miscarriage wearing lept purchased website breaker storage ordering earlier buy fluconazole 50 mg capsules all larger may dry time generic tri mix gel pay bought what and http://technine.com/gqaw/ezetimibe/ mans chapstick Amazon.
     // n is an array of one or two nulls Emit(tuple t, null)


    将两个数据集中需要做交叉的记录输入Mapper,Reducer 输出出现了两次的记录。因为每条记录都有一个主键,在每个数据集中只会出现一次,所以这样做是可行的。

    class Mapper method Map(rowkey key, tuple t) Emit(tuple t, null) class Reducer method Reduce(tuple t, array n) // n is an array of one or two nulls if n.size() = 2 Emit(tuple t, null)



    class Mapper method Map(rowkey key, tuple t) Emit(tuple t, string t.SetName) // t.SetName is either 'R' or 'S' class Reducer method Reduce(tuple t, array n) // array n can be ['R'], ['S'], ['R' 'S'], or ['S', 'R'] if n.size() = 1 and n[1] = 'R' Emit(tuple t, null)

    分组聚合(GroupBy and Aggregation)

    分组聚合可以在如下的一个MapReduce中完成。Mapper抽取数据并将之分组聚合,Reducer 中对收到的数据再次聚合。典型的聚合应用比如求和与最值可以以流的方式进行计算,因而不需要同时保有所有的值。但是另外一些情景就必须要两阶段MapReduce,前面提到过的惟一值模式就是一个这种类型的例子。

    class Mapper method Map(null, tuple [value GroupBy, value AggregateBy, value ...]) Emit(value GroupBy, value AggregateBy) class Reducer method Reduce(value GroupBy, [v1, v2,...]) Emit(value GroupBy, aggregate( [v1, v2,...] ) ) // aggregate() : sum(), max(),...


    分配后连接 (Reduce端连接,排序-合并连接)
    这个算法按照键K来连接数据集R和L。Mapper 遍历R和L中的所有元组,以K为键输出每一个标记了来自于R还是L的元组,Reducer把同一个K的数据分装入两个容器(R和L),然后嵌套循环遍历两个容器中的数据以得到交集,最后输出的每一条结果都包含了R中的数据、L中的数据和K。这种方法有以下缺点:

    • Mapper要输出所有的数据,即使一些key只会在一个集合中出现。
    • Reducer 要在内存中保有一个key的所有数据,如果数据量打过了内存,那么就要缓存到硬盘上,这就增加了硬盘IO的消耗。


    class Mapper method Map(null, tuple [join_key k, value v1, value v2,...]) Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, ...] ] ) class Reducer method Reduce(join_key k, tagged_tuples [t1, t2,...]) H = new AssociativeArray : set_name -> values for all tagged_tuple t in [t1, t2,...] // separate values into 2 arrays H{t.tag}.add(t.values) for all values r in H{'R'} // produce a cross-join of the two arrays for all values l in H{'L'} Emit(null, [k r l] )

    复制链接Replicated Join (Mapper端连接, Hash 连接)

    class Mapper method Initialize H = new AssociativeArray : join_key -> tuple from R R = loadR() for all [ join_key k, tuple [r1, r2,...] ] in R H{k} = H{k}.append( [r1, r2,...] ) method Map(join_key k, tuple l) for all tuple r in H{k} Emit(null, tuple [k r l] )


    1. Join Algorithms using Map/Reduce
    2. Optimizing Joins in a MapReduce Environment

    应用于机器学习和数学方面的 MapReduce 算法


  • 相关阅读:
    pyinstaller 打包后无法运行
    Android Uiautomator2 gradlew 坑
    JNDI 在 J2EE 中的角色
    安装和使用 memcached
  • 原文地址:https://www.cnblogs.com/chenying99/p/4401140.html
Copyright © 2011-2022 走看看