出现的原因：在 Spark 中，每个 task 会被分发到不同的节点上执行。task 闭包中使用的普通变量只是 driver 端变量的一份副本，在 executor 上修改它并不会反映回 driver；因此，如果需要把多个节点上的数据累加到同一个变量中，就需要使用累加器（Accumulator）。
数字累加器
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Numeric accumulator example.
 *
 * Counts the visitors whose age is greater than 20 across all partitions
 * using a `LongAccumulator`, then prints the total on the driver.
 *
 * @Author: 唐松怀
 * @Date: 2020/3/15 14:22
 */
object rdd01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val visitors = sc.parallelize(Array(("Bob", 13), ("joe", 18), ("Jack", 24), ("Billy", 89)))
      // Tasks only add to the accumulator; the merged total is read on the driver.
      val overTwentyCount = sc.longAccumulator("this is a always_plus tool!")
      // foreach is an action, so Spark applies each task's accumulator update once.
      visitors.foreach { case (_, age) =>
        if (age > 20) overTwentyCount.add(1)
      }
      println(overTwentyCount.value)
    } finally {
      // Always release the SparkContext, even if the job above throws.
      sc.stop()
    }
  }
}
集合累加器
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Collection accumulator example.
 *
 * Collects every user whose phone number ends in three identical characters
 * into a `CollectionAccumulator[User]` and prints the collected users on the driver.
 *
 * @Author: 唐松怀
 * @Date: 2020/3/15 15:59
 */
object rdd02 {

  /** A user record: display name and phone number. */
  final case class User(name: String, telephone: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val users = Array(
        User("Alice", "15837312345"),
        User("Bob", "13937312666"),
        User("Thomas", "13637312345"),
        User("Tom", "18537312777"),
        User("Boris", "13837312998")
      )
      val userRdd = sc.parallelize(users, 2)
      val matchedUsers = sc.collectionAccumulator[User]("用户累加器")
      userRdd.foreach { user =>
        // Last three characters of the number. Numbers shorter than three
        // characters never match, instead of failing the task with an
        // index-out-of-bounds error.
        val tail = user.telephone.takeRight(3)
        if (tail.length == 3 && tail.distinct.length == 1) matchedUsers.add(user)
      }
      // Print the collected users, not the accumulator object's own toString.
      println(matchedUsers.value)
    } finally {
      // Always release the SparkContext, even if the job above throws.
      sc.stop()
    }
  }
}
自定义累加器: