默认情况下,如果在一个算子函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。
Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量)。Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。
共享广播变量(只读)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("shared")
val sc = new SparkContext(conf)
val factor = 3
// 创建广播变量 factorBroadcast
val factorBroadcast = sc.broadcast(factor)
// Array(1,2,3,4,5)
val data = Array(1 to 5:_*)
val rdd = sc.parallelize(data,2)
// factorBroadcast.value 获取广播变量的值
rdd.map(num => num * factorBroadcast.value ).foreach(println)
sc.stop()
}
3
6
9
12
15
共享累加变量(只写)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("shared")
val sc = new SparkContext(conf)
// 创建共享累加变量
var accumulator = sc.accumulator(0)
// Array(1,2,3,4,5)
val data = Array(1 to 5: _*)
val rdd = sc.parallelize(data, 2)
// accumulator.add(num) 累加rdd的数据
rdd.foreach(num => accumulator += num)
// 报错! 算子在Excutor端执行,不可读累加广播变量
// rdd.foreach(num => println(accumulator.value))
// 在driver端可以获取共享累加变量的值
println("共享累加变量的值为:" + accumulator.value)
sc.stop()
}
共享累加变量的值为:15