When you want to reference a driver-side variable inside a Spark operator (such as map or foreach) and have changes to it survive, the usual tool is an accumulator. Any external variable referenced in an operator's closure is serialized and shipped to the executors as a copy, so modifying it inside the operator only changes that copy; the original variable on the driver is left unchanged.
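A minimal sketch of the pitfall (assuming a SparkContext named sc, as in the snippets below): a plain var captured by the closure is serialized and sent to the executors, each of which increments its own copy, so the driver's value is never updated.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sketch: counter is captured by value in the foreach closure.
val sc = new SparkContext(new SparkConf().setAppName("closure-copy").setMaster("local[*]"))
var counter = 0
val rdd = sc.parallelize(1 to 100)
rdd.foreach(x => counter += x)  // each task increments its own deserialized copy
println(counter)                // typically still 0: the driver's counter was never touched
```

This is exactly why the accumulator below is needed when a result must flow back to the driver.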
Accumulator code:
// fileRdd is the RDD defined in the broadcast example below
val accum = sc.collectionAccumulator[mutable.Map[String, String]]("My Accumulator")
fileRdd.foreach(input => {
  val str = input._1 + "\t" + input._2 + "\t" + input._3
  accum.add(mutable.Map(input._1 -> str))
})
println(accum.value)
The accumulator's result:
[Map(imsi1 -> imsi1	2018-07-29 11:22:23	zd-A), Map(imsi2 -> imsi2	2018-07-29 11:22:24	zd-A), Map(imsi3 -> imsi3	2018-07-29 11:22:25	zd-A)]
Broadcast variable code:
val fileRdd = sc.parallelize(Array(
  ("imsi1", "2018-07-29 11:22:23", "zd-A"),
  ("imsi2", "2018-07-29 11:22:24", "zd-A"),
  ("imsi3", "2018-07-29 11:22:25", "zd-A")))
val result = mutable.Map.empty[String, String]
val resultBroadCast = sc.broadcast(result)
fileRdd.foreach(input => {
  val str = input._1 + " " + input._2 + " " + input._3
  resultBroadCast.value += (input._1 -> str)
})
resultBroadCast.value.foreach(println(_))  // prints 3 entries

Note that this only appears to work because driver and executors share one JVM in local mode, so resultBroadCast.value refers to the same map object. On a real cluster each executor mutates its own deserialized copy and the driver's map stays empty: broadcast variables are meant to be read-only, so to get data back to the driver use an accumulator or collect the results instead.
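Rather than mutating a broadcast value, the same key-to-line map can be built with a transformation and collected to the driver, which is safe in cluster mode as well. A sketch assuming the same fileRdd as above:

```scala
// Build (key, line) pairs on the executors, then bring the map to the driver.
val collected: Map[String, String] = fileRdd
  .map(input => (input._1, input._1 + " " + input._2 + " " + input._3))
  .collectAsMap()   // returns the pairs as a driver-side map
  .toMap
collected.foreach(println(_))
```

collectAsMap, like collect, pulls the full result into driver memory, so this pattern is only appropriate when the keyed output is known to be small.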