zoukankan      html  css  js  c++  java
  • Spark中自定义累加器Accumulator

    1. 自定义累加器

    自定义累加器需要继承AccumulatorParam,实现addInPlace和zero方法。（注：AccumulatorParam 是 Spark 1.x 的 API，自 Spark 2.0 起已被 AccumulatorV2 取代，此处示例基于旧版 API。）

    例1:实现Long类型的累加器

    // Custom accumulator parameter for Long values.
    // NOTE(review): AccumulatorParam is the Spark 1.x API, deprecated since
    // Spark 2.0 in favor of AccumulatorV2 — confirm the target Spark version.
    object LongAccumulatorParam extends AccumulatorParam[Long]{
      /** Merges two partial sums; called by Spark when combining per-task values. */
      override def addInPlace(r1: Long, r2: Long) = {
        println(s"$r1	$r2")   // debug trace of each merge step
        r1 + r2
      }

      /** Identity element for addition — 0 regardless of initialValue. */
      override def zero(initialValue: Long) = {
        println(initialValue)
        0L
      }

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("testLongAccumulator"))
        // BUGFIX: pass this object explicitly. Without it, implicit resolution
        // picks Spark's built-in AccumulatorParam.LongAccumulatorParam and this
        // custom implementation is never used at all.
        val acc = sc.accumulator(0L, "LongAccumulator")(LongAccumulatorParam)
        sc.parallelize(Array(1L, 2L, 3L, 4L, 5L)).foreach(acc.add)
        println(acc.value)   // expected: 15
        sc.stop()
      }
    }

    例2:定义Set[String],可用于记录错误日志

    // Accumulator parameter that merges Set[String] values by set union;
    // suitable for collecting distinct strings (e.g. error hosts) across tasks.
    object StringSetAccumulatorParam extends AccumulatorParam[Set[String]]{
      /** Combines two partial results — union is associative and commutative. */
      override def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 union r2

      /** The empty set is the identity element for union. */
      override def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
    }
    
    
    // Lazily-initialized, process-wide singleton accumulator for collecting
    // error-related strings (e.g. host names) from executors.
    object ErrorLogHostSet extends Serializable {
      // @volatile + double-checked locking: the accumulator is created exactly
      // once and its fully-constructed reference is visible to all threads.
      @volatile private var instanceErr: Accumulator[Set[String]] = null

      /** Returns the shared accumulator, creating it on first use (thread-safe). */
      def getInstance(sc: SparkContext): Accumulator[Set[String]] = {
        if(null == instanceErr){
          synchronized{
            if(null == instanceErr){
              instanceErr = sc.accumulator(Set[String]())(StringSetAccumulatorParam)
            }
          }
        }
        instanceErr
      }

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("testSetStringAccumulator"))

        val dataRdd = sc.parallelize(Array("a2","c4","6v","67s","3d","45s","2c6","35d","7c8d9","34dc5"))
        val errorHostSet = getInstance(sc)

        // BUGFIX: removed `val a = sc.accumulableCollection("a")` — String does
        // not satisfy the Growable bound of accumulableCollection, so the line
        // did not compile, and its result was never used.

        // BUGFIX: update the accumulator inside the foreach ACTION, not inside
        // the filter transformation. Transformations may be re-executed on task
        // retry or re-computation, which would add elements more than once.
        dataRdd.filter(_.contains("d")).foreach { ele =>
          errorHostSet += Set(ele)
          println(ele)
        }

        errorHostSet.value.foreach(println)

        sc.stop()
      }
    }

    2. AccumulableCollection使用

    // Demonstrates sc.accumulableCollection: wraps a growable mutable collection
    // so that per-task additions are merged back into the driver-side value.
    object AccumulableCollectionTest {

      case class Employee(id: String, name: String, dept: String)

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("AccumulableCollectionTest").setMaster("local[4]"))

        // mutable.HashMap satisfies the Growable bound required by accumulableCollection.
        val empAccu = sc.accumulableCollection(mutable.HashMap[String,Employee]())

        val employees = List(
          Employee("10001", "Tom", "Eng"),
          Employee("10002", "Roger", "Sales"),
          Employee("10003", "Rafael", "Sales"),
          Employee("10004", "David", "Sales"),
          Employee("10005", "Moore", "Sales"),
          Employee("10006", "Dawn", "Sales"),
          Employee("10007", "Stud", "Marketing"),
          Employee("10008", "Brown", "QA")
        )

        // Consistency: use Scala's println like the rest of the examples
        // (was System.out.println).
        println("employee count " + employees.size)

        // Each task adds its (id -> Employee) entries; Spark merges the maps
        // back on the driver when the action completes.
        sc.parallelize(employees).foreach(e => {
          empAccu += e.id -> e
        })

        println("empAccumulator size " + empAccu.value.size)
        empAccu.value.foreach(entry =>
          println("emp id = " + entry._1 + " name = " + entry._2.name))
        sc.stop()
      }

    }
  • 相关阅读:
    第 425 期 Python 周刊
    第 423 期 Python 周刊
    第423期 Python 周刊
    Python Weekly 422
    第421期 Python 周刊
    第420期 Python 周刊
    LeetCode 3: 无重复字符的最长子串 Longest Substring Without Repeating Characters
    Python Weekly 419
    LeetCode 771: 宝石与石头 Jewels and Stones
    LeetCode 652: 寻找重复的子树 Find Duplicate Subtrees
  • 原文地址:https://www.cnblogs.com/mengrennwpu/p/10460541.html
Copyright © 2011-2022 走看看