zoukankan      html  css  js  c++  java
  • Spark 累加器使用

    1.使用foreach碰到了问题

    没看过累加器的时候,写了这么个代码,发现map里头foreach完了还是0啊?咋回事啊?

     1 def calNrOfEachDataMap(data:RDD[String],neededDataMap:Set[Map[Int,String]]): Map[Map[Int,String],Int] ={              
     2   var ans:Map[Map[Int,String],Int] = Map[Map[Int,String],Int]()                                                       
     3   neededDataMap.foreach(m=>{                                                                                          
     4     if(!ans.contains(m)) ans += (m->0)                                                                                
     5   })                                                                                                                  
     6   data.foreach(sampleStr=>{                                                                                           
     7     val sampleArray = sampleStr.split(",")                                                                            
     8     neededDataMap.foreach(m=>{                                                                                        
     9       if(dataFitMap(sampleArray,m)){                                                                                  
    10         ans(m) = ans(m)+1                                                                                             
    11       }                                                                                                               
    12     })                                                                                                                
    13   })                                                                                                                  
    14   ans                                                                                                                 
    15 }                                                                                                                     

    原因:

    在spark算子中引用的外部变量,其实是变量的副本,在算子中对其值进行修改,只是改变副本的值,外部的变量还是没有变。

    通俗易懂的讲就是foreach里的变量带不出来的,除非用map,将结果作为rdd返回

    解决方法:

    1.map代替foreach。2.broadcast广播变量。3.accumulater累加器

     我个人通过用mapPartition做了,但是借机会学点Accumulater的东西吧。

    Accumulator

    Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。

    import org.apache.spark.util.CollectionAccumulator;
    import org.apache.spark.util.DoubleAccumulator;
    import org.apache.spark.util.LongAccumulator;

    定义的时候如下定义,要定义名字,我们就能再sparkUI里头看到了(2.0以前的版本采用的accumulator,2.0之后被废除了,所以通过自定义AccumulatorV2来操作)(java scala混一块了见谅)

            val accum = sc.collectionAccumulator[mutable.Map[String, String]]("My Accumulator")
            // LongAccumulator: 数值型累加
            LongAccumulator longAccumulator = sc.longAccumulator("long-account");
            // DoubleAccumulator: 小数型累加
            DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account"); 

    用的时候就这么用

    longAccumulator.add(x);
    doubleAccumulator.add(x);
    accum.add(mutable.Map(input._1 -> str))

    监控的时候。。。

            System.out.println("longAccumulator: " + longAccumulator.value());
            System.out.println("doubleAccumulator: " + doubleAccumulator.value());
            // 注意,集合中元素的顺序是无法保证的,多运行几次发现每次元素的顺序都可能会变化
            System.out.println("collectionAccumulator: " + collectionAccumulator.value());

    自定义Accumulator

    1. 继承AccumulatorV2,实现相关方法

    2. 创建自定义Accumulator的实例,然后在SparkContext上注册它

    它们都有一个共同的特点,就是最终的结果不受累加数据顺序的影响(对于CollectionAccumulator来说,可以简单的将结果集看做是一个无序Set),看到网上有博主举例子StringAccumulator,这个就是一个错误的例子,就相当于开了一百个线程,每个线程随机sleep若干毫秒然后往StringBuffer中追加字符,最后追加出来的字符串是无法被预测的。总结一下就是累加器的最终结果应该不受累加顺序的影响,否则就要重新审视一下这个累加器的设计是否合理。

    Accumulator陷阱

    我们继续把wordcount拿出来鞭尸(假设accu放在map里头,看到一个句号就加一,来算有几句话)。由于各个rdd之间有依赖,比如在转换算子之中,如果不persist,reduceBykey一次,map就会被调用一次wordcount只调用一次reduce,所以没问题。但实际算法里头,可能多次调用map,这样,吊一次,加一次,得到的最终结果,就是错的。

    解决方法:通过Cache / persist等将数据固定下来,下次用到就不会再调用accu了

    举个栗子:

    tf.map(......).cache()

  • 相关阅读:
    BZOJ_1002_[FJOI2007]_轮状病毒_(递推+高精)
    BZOJ_1001_狼抓兔子_(平面图求最小割+对偶图求最短路)
    BZOJ_1588_&_Codevs_1296_[HNOI2002]_营业额统计(平衡树/set)
    hdu3873 有约束条件的最短路
    尺取法 poj3061 poj3320
    费马小定理与欧拉公式
    uva 571 素数的性质
    uva10791 uva10780(分解质因数)
    勾股数组及其应用uva106
    hdu3501
  • 原文地址:https://www.cnblogs.com/tillnight1996/p/12494670.html
Copyright © 2011-2022 走看看