zoukankan      html  css  js  c++  java
  • Spark中自定义累加器Accumulator

    1. 自定义累加器

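    自定义累加器需要继承 AccumulatorParam，并实现 addInPlace 和 zero 两个方法。创建累加器时需把该参数对象显式传给 sc.accumulator，否则会使用 Spark 内置的隐式参数。（注意：AccumulatorParam 自 Spark 2.0 起已被弃用，新代码应改为继承 AccumulatorV2。）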

    例1：实现 Long 类型的累加器

    /**
     * Custom accumulator parameter for `Long` values.
     *
     * `addInPlace` merges two partial sums and `zero` supplies the identity value.
     * Both print their arguments so you can observe when Spark invokes them.
     */
    object LongAccumulatorParam extends AccumulatorParam[Long]{
      /** Merges two partial sums, printing both operands (tab-separated) first. */
      override def addInPlace(r1: Long, r2: Long): Long = {
        println(s"$r1\t$r2")
        r1 + r2
      }

      /** Returns the identity element `0L`; prints the initial value it receives. */
      override def zero(initialValue: Long): Long = {
        println(initialValue)
        0L
      }

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("testLongAccumulator"))
        // Pass this param explicitly: without it, implicit resolution picks Spark's
        // built-in AccumulatorParam[Long] and the custom param above is never used.
        val acc = sc.accumulator(0L, "LongAccumulator")(LongAccumulatorParam)
        sc.parallelize(Array(1L,2L,3L,4L,5L)).foreach(acc.add)
        println(acc.value)
        sc.stop()
      }
    }

    例2：定义 Set[String] 类型的累加器，可用于收集错误日志

    /**
     * Accumulator parameter that combines sets of strings by set union.
     * The empty set is the zero element, so partial results can be merged freely
     * (e.g. to collect distinct error-log entries).
     */
    object StringSetAccumulatorParam extends AccumulatorParam[Set[String]]{
      /** Union of two partial results. */
      override def addInPlace(r1: Set[String], r2: Set[String]): Set[String] =
        r1.union(r2)

      /** Identity element for union: always the empty set (`initialValue` is ignored). */
      override def zero(initialValue: Set[String]): Set[String] =
        Set.empty[String]
    }
    
    
    /**
     * Holds a lazily created, shared `Set[String]` accumulator and shows how to
     * record matching elements into it from inside an RDD transformation.
     */
    object ErrorLogHostSet extends Serializable {
      // The shared accumulator; stays null until the first getInstance call.
      @volatile private var instanceErr: Accumulator[Set[String]] = null

      /**
       * Returns the shared accumulator, creating it on the first call.
       *
       * Double-checked locking: the volatile read skips locking once the accumulator
       * exists, and the re-check inside `synchronized` ensures only one is created
       * even if several threads race on the first call. `sc` is only used then.
       */
      def getInstance(sc: SparkContext): Accumulator[Set[String]] = {
        if(null == instanceErr){
          synchronized{
            if(null == instanceErr){
              instanceErr = sc.accumulator(Set[String]())(StringSetAccumulatorParam)
            }
          }
        }
        instanceErr
      }

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("testSetStringAccumulator"))

        val dataRdd = sc.parallelize(Array("a2","c4","6v","67s","3d","45s","2c6","35d","7c8d9","34dc5"))
        val errorHostSet = getInstance(sc)

        // Keep elements containing "d" and record each one in the accumulator.
        // Updates made inside a transformation such as filter may be applied again if a
        // task is re-executed; because they merge into a Set, duplicates are harmless.
        dataRdd.filter(ele => {
          val res = ele.contains("d")
          if(res) errorHostSet += Set(ele)
          res
        }).foreach(println)

        errorHostSet.value.foreach(println)

        sc.stop()
      }
    }

    2. AccumulableCollection 的使用

    /**
     * Demonstrates `SparkContext.accumulableCollection`: tasks insert employees
     * into a shared mutable map keyed by employee id, and the driver then reads
     * the merged map.
     */
    object AccumulableCollectionTest {

      /** A simple employee record; `id` serves as the map key below. */
      case class Employee(id: String, name: String, dept: String)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("AccumulableCollectionTest")
          .setMaster("local[4]")
        val sc = new SparkContext(conf)

        // Accumulator backed by a mutable HashMap (employee id -> Employee).
        val empAccu = sc.accumulableCollection(mutable.HashMap[String, Employee]())

        val staff = List(
          Employee("10001", "Tom", "Eng"),
          Employee("10002", "Roger", "Sales"),
          Employee("10003", "Rafael", "Sales"),
          Employee("10004", "David", "Sales"),
          Employee("10005", "Moore", "Sales"),
          Employee("10006", "Dawn", "Sales"),
          Employee("10007", "Stud", "Marketing"),
          Employee("10008", "Brown", "QA")
        )

        System.out.println(s"employee count ${staff.size}")

        sc.parallelize(staff).foreach { emp =>
          empAccu += (emp.id -> emp)
        }

        val merged = empAccu.value
        println(s"empAccumulator size ${merged.size}")
        merged.foreach { case (empId, emp) =>
          println(s"emp id = $empId name = ${emp.name}")
        }
        sc.stop()
      }

    }
  • 相关阅读:
    Socket编程中的强制关闭与优雅关闭及相关socket选项
    怎样通过MSG_WAITALL设置阻塞时间,IO模式精细讲解: MSG_DONTWAIT 、 MSG_WAITALL
    RTSP、HTTP、HTTPS、SDP四种协议详解
    RTMP、RTSP、HTTP视频协议详解(附:直播流地址、播放软件)
    Idea连接服务器docker并部署代码到docker实现一键启动
    @Autowired注解和静态方法
    关于工具类静态方法调用@Autowired注入的service类问题
    @PostConstruct
    spring-boot-starter-mail技术总结
    使用SpringBoot发送mail邮件
  • 原文地址:https://www.cnblogs.com/mengrennwpu/p/10460541.html
Copyright © 2011-2022 走看看