zoukankan      html  css  js  c++  java
  • Spark中自定义累加器

    通过继承AccumulatorV2可以实现自定义累加器。

    官方案例可参考:http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators

    下面是我自己写的一个统计卡种数量的案例。

    package com.shuai7boy.myscalacode
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.util.AccumulatorV2
    
    /**
     * Pair of running totals, one per card type.
     *
     * Both fields are `var` because `CalcCardCount` updates them in place.
     *
     * @param card1Count total accumulated for "card1"
     * @param card2Count total accumulated for "card2"
     */
    case class Card(var card1Count: Int, var card2Count: Int)
    
    /**
     * Custom accumulator that tracks two totals at once, one per card type.
     *
     * Input and output are both [[Card]]: each `add` folds the counts carried by the
     * given card into the internal totals, and `value` exposes those totals.
     */
    class CalcCardCount extends AccumulatorV2[Card, Card] {
      /** Running totals; mutated in place by `reset`, `add` and `merge`. */
      var result: Card = new Card(0, 0)

      /**
       * Whether the accumulator is in its zero state. Must agree with what `reset` sets.
       *
       * @return true when both totals are 0
       */
      override def isZero: Boolean =
        result.card1Count == 0 && result.card2Count == 0

      /**
       * Creates an independent accumulator holding the same totals.
       *
       * The totals are copied into a fresh [[Card]] instead of sharing `result`:
       * `Card` is mutable, so a shared instance would let a `reset()` or `add()` on the
       * copy (Spark's `copyAndReset` does exactly that) silently change this accumulator.
       *
       * @return a new accumulator with its own copy of the totals
       */
      override def copy(): AccumulatorV2[Card, Card] = {
        val duplicate = new CalcCardCount()
        duplicate.result = Card(result.card1Count, result.card2Count)
        duplicate
      }

      /** Resets both totals to zero, the state reported by `isZero`. */
      override def reset(): Unit = {
        result.card1Count = 0
        result.card2Count = 0
      }

      /**
       * Adds the counts carried by `v` to the totals held by this accumulator.
       *
       * @param v counts to add
       */
      override def add(v: Card): Unit = {
        result.card1Count += v.card1Count
        result.card2Count += v.card2Count
      }

      /**
       * Folds the totals of another accumulator into this one.
       *
       * @param other accumulator to merge; must be a `CalcCardCount`
       * @throws UnsupportedOperationException if `other` is a different accumulator type
       */
      override def merge(other: AccumulatorV2[Card, Card]): Unit = other match {
        case o: CalcCardCount =>
          result.card1Count += o.result.card1Count
          result.card2Count += o.result.card2Count
        case _ =>
          // Fail with a descriptive error instead of an opaque MatchError.
          throw new UnsupportedOperationException(
            s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
      }

      /** Current totals. */
      override def value: Card = result
    }
    
    
    /** Demo driver: sums the per-card counts of a small in-memory data set. */
    object CardCount {
      /**
       * Runs the demo on a local Spark master and prints both totals.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("calcCardCountDemo").setMaster("local")
        val sc = new SparkContext(conf)
        try {
          val cc = new CalcCardCount
          sc.register(cc)
          val cardList = sc.parallelize(List[String]("card1 1", "card1 3", "card1 7", "card2 5", "card2 2"), 2)
          // Update the accumulator inside an action (foreach), not a transformation:
          // updates made in a transformation may be applied more than once when a
          // task or stage is re-executed.
          cardList.foreach { line =>
            // Each line is "<cardType> <count>"; unknown or incomplete lines add nothing.
            val delta = line.split(" ") match {
              case Array("card1", count, _*) => Card(count.toInt, 0)
              case Array("card2", count, _*) => Card(0, count.toInt)
              case _ => Card(0, 0)
            }
            cc.add(delta)
          }
          val total = cc.value
          println(s"card1总数量为:${total.card1Count},card2总数量为:${total.card2Count}")
        } finally {
          // Release the SparkContext even when the job fails.
          sc.stop()
        }
      }
    }

    打印结果是:

    card1总数量为:11,card2总数量为:7 

    通过上面的代码,就可以用一个累加器同时统计两个变量的值了;如果需要统计更多的值,只需在Card中增加相应的字段即可。Spark内置的累加器(如LongAccumulator)每个只能累加一个值。

  • 相关阅读:
    hdu 5101 Select
    hdu 5100 Chessboard
    cf B. I.O.U.
    cf C. Inna and Dima
    cf B. Inna and Nine
    cf C. Counting Kangaroos is Fun
    Radar Installation 贪心
    spfa模板
    Sequence
    棋盘问题
  • 原文地址:https://www.cnblogs.com/shun7man/p/12764840.html
Copyright © 2011-2022 走看看