copy的官方programming guide的代码,只有一个要说一下,Accumulators的更新是在action做的,transformation操作中对于Accumulators的增加不会即时更新的,只有action只会更新。
scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10 object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)