1.累加器介绍
累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。
主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,只能累加,不能减少。累加器只能在Driver端构建,并只能从Driver端读取结果,在Task端只能进行累加。
Spark原生地只支持数字类型的累加器,编程者可以自定义累加器进行新类型的支持。
如果创建累加器时指定了名字,可以在Spark的UI界面看到。
在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。
spark的transformation操作累加器值不会改变,只有出发action操作累加器值才会改变。
2.为什么累加器只能在task端进行累加?
在Task节点,准确的就是说在executor上。每个Task都会有一个累加器的变量,被序列化传输到executor端运行之后再返回过来都是独立运行的。如果在Task端去获取值的话,只能获取到当前Task的,Task与Task之间不会有影响。
3.累加器类型
// LongAccumulator: 数值型累加
LongAccumulator longAccumulator = sc.longAccumulator("long-account");
// DoubleAccumulator: 小数型累加
DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");
// CollectionAccumulator:集合累加
CollectionAccumulator<Integer> collectionAccumulator = sc.collectionAccumulator("double-account");
4.累加器简单使用
我们在过滤掉RDD中奇数的同时进行计数,最后计算剩下整数的和。
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]") val sc = new SparkContext(sparkConf) val accum = sc.longAccumulator("longAccum") //统计奇数的个数 val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{ if(n%2!=0) accum.add(1L) n%2==0 }).reduce(_+_) println("sum: "+sum) println("accum: "+accum.value) sc.stop()
结果为:
sum: 20
accum: 5
这是结果正常的情况,但是在使用累加器的过程中如果对于spark的执行过程理解的不够深入就会遇到两类典型的错误:少加(或者没加)、多加。
少加的情况:
val accum = sc.longAccumulator("longAccum") val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{ accum.add(1L) n+1 }) println("accum: "+accum.value)
执行完毕,打印的值是多少呢?答案是0,因为累加器不会改变spark的lazy的计算模型,即在打印的时候像map这样的transformation还没有真正的执行,从而累加器的值也就不会更新。
多加的情况:
val accum = sc.longAccumulator("longAccum") val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{ accum.add(1L) n+1 }) numberRDD.count println("accum1:"+accum.value) numberRDD.reduce(_+_) println("accum2: "+accum.value)
结果我们得到了:
accum1:9
accum2: 18
我们虽然只在map里进行了累加器加1的操作,但是两次得到的累加器的值却不一样,这是由于count和reduce都是action类型的操作,触发了两次作业的提交,所以map算子实际上被执行了了两次,在reduce操作提交作业后累加器又完成了一轮计数,所以最终累加器的值为18。究其原因是因为count虽然促使numberRDD被计出来,但是由于没有对其进行缓存,所以下次再次需要使用numberRDD这个数据集是,还需要从并行化数据集的部分开始执行计算。解释到这里,这个问题的解决方法也就很清楚了,就是在count之前调用numberRDD的cache方法(或persist),这样在count后数据集就会被缓存下来,reduce操作就会读取缓存的数据集而无需从头开始计算了。改成如下代码即可:
val accum = sc.longAccumulator("longAccum") val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{ accum.add(1L) n+1 }) numberRDD.cache().count println("accum1:"+accum.value) numberRDD.reduce(_+_) println("accum2: "+accum.value)
这次两次打印的值就会保持一致了。
5.自定义累加器
字符串,对象进行累加使用自定义累加器
package com.test
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.util.AccumulatorV2 import org.junit.Test import scala.collection.mutable /** * 累加器 */ class Accumulator { @Test def acc(): Unit = { val conf = new SparkConf().setMaster("local[6]").setAppName("accumulator") val sc = new SparkContext(conf) val numAcc = new NumAccumulator sc.register(numAcc,"num") sc.parallelize(Seq("1", "2", "3")) .foreach(item => numAcc.add(item)) println(numAcc.value) sc.stop() } } class NumAccumulator extends AccumulatorV2[String, Set[String]] { private val nums: mutable.Set[String] = mutable.Set() /** * 告诉spark框架累加器对象是否为空 * * @return */ override def isZero: Boolean = { nums.isEmpty } /** * 提供spark框架一个拷贝的累加器 * @return */ override def copy(): AccumulatorV2[String, Set[String]] = { val newAccumulator = new NumAccumulator() nums.synchronized{ newAccumulator.nums ++= this.nums } newAccumulator } /** * 帮助 spark 框架清理累加器的内容 */ override def reset(): Unit = { nums.clear() } /** * 外部传入要累加的内容, 在这个方法中进行累加 * @param v */ override def add(v: String): Unit = { nums += v } /** * 累加器在进行累加的时候,可能每个分布式节点都有一个实例 * 在最后Driver进行一次合并, 把所有实例的内容合并起来, 会调用merge方法进行合并 * @param other */ override def merge(other: AccumulatorV2[String, Set[String]]): Unit = { nums ++= other.value } /** * 提供外部的累加结果 * 为什么一定要给不可变的,因为外部可能在进行修改,如果是可变的集合,其外部的修改会影响内部的值 * @return */ override def value: Set[String] = { nums.toSet } }
6.累加器的使用陷阱
我们都知道,spark中的一系列transform操作会构成一串长的任务链,此时需要通过一个action操作来触发,accumulator也是一样。因此在一个action操作之前,你调用value方法查看其数值,肯定是没有任何变化的。
如果程序中有两次 action操作,就会触发两次transform操作,相应地,累加器就会加两次。问题代码如下:
val accum= sc.accumulator(0, "Error Accumulator") val data = sc.parallelize(1 to 10) //用accumulator统计偶数出现的次数,同时偶数返回0,奇数返回1 val newData = data.map{x => { if(x%2 == 0){ accum += 1 0 }else 1 }} //使用action操作触发执行 newData.count //此时accum的值为5,是我们要的结果 accum.value //继续操作,查看刚才变动的数据,foreach也是action操作 newData.foreach(println) //上个步骤没有进行累计器操作,可是累加器此时的结果已经是10了 //这并不是我们想要的结果 accum.value
累加器陷阱解决办法:将任务之间的依赖关系切断,再次执行action操作就可以了
val accum= sc.accumulator(0, "Error Accumulator") val data = sc.parallelize(1 to 10) //代码和上方相同 val newData = data.map{x => {...}} //使用cache缓存数据,切断依赖。 newData.cache.count //有问题checkpoint //此时accum的值为5 accum.value newData.foreach(println) //此时的accum依旧是5 accum.value
调用cache,persist方法的时候会将之前的依赖切除,后续的累加器就不会再被之前的transfrom操作影响到了。
缓存不会切断依赖链,checkpoint才能切断依赖链