通过继承AccumulatorV2可以实现自定义累加器。
官方案例可参考:http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
下面是我自己写的一个统计卡种数量的案例。
package com.shuai7boy.myscalacode import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.util.AccumulatorV2 case class Card(var card1Count: Int, var card2Count: Int) class CalcCardCount extends AccumulatorV2[Card, Card] { var result = new Card(0, 0) /** * * 判断,这个要和reset设定值一致 * * @return */ override def isZero: Boolean = { result.card1Count == 0 && result.card2Count == 0 } /** * * 复制一个新的对象 * * @return */ override def copy(): AccumulatorV2[Card, Card] = { val newCalcCardCount = new CalcCardCount() newCalcCardCount.result = this.result newCalcCardCount } /** * * 重置每个分区的数值 */ override def reset(): Unit = { result.card1Count = 0 result.card2Count = 0 } /** * 每个分区累加自己的数值 * * @param v */ override def add(v: Card): Unit = { result.card1Count += v.card1Count result.card2Count += v.card2Count } /** * * 合并分区值,求得总值 * * @param other */ override def merge(other: AccumulatorV2[Card, Card]): Unit = other match { case o: CalcCardCount => { result.card1Count += o.result.card1Count result.card2Count += o.result.card2Count } } //返回结果 override def value: Card = result } object CardCount { def main(args: Array[String]) { val conf = new SparkConf().setAppName("calcCardCountDemo").setMaster("local") val sc = new SparkContext(conf) val cc = new CalcCardCount sc.register(cc) val cardList = sc.parallelize(List[String]("card1 1", "card1 3", "card1 7", "card2 5", "card2 2"), 2) val cardMapRDD = cardList.map(card => { var cardInfo = new Card(0, 0) card.split(" ")(0) match { case "card1" => cardInfo = Card(card.split(" ")(1).toInt, 0) case "card2" => cardInfo = Card(0, card.split(" ")(1).toInt) case _ => Card(0, 0) } cc.add(cardInfo) }) cardMapRDD.count() //执行action,触发上面的累加操作 println("card1总数量为:" + cc.result.card1Count + ",card2总数量为:" + cc.result.card2Count) } }
打印结果是:
card1总数量为:11,card2总数量为:7
通过上面代码,就可以同时统计两个变量的值了,当然如果需要更多,可以扩展。默认的累加器只实现了一个。