zoukankan      html  css  js  c++  java
  • 【Spark学习笔记】广播变量和累加器

    一、广播变量(调优操作)

    使用广播变量是个调优操作,不使用广播变量可能会造成Executor端内存溢出。

    1.普通变量定义

    val rdd1: RDD[String] = sc.parallelize(Array[String]("Java", "C", "Python", "Hadoop", "Spark"),2)
    // driver每发送一个task到executor,就将此变量发送给executor一次,占用executor内存
    val blackList = List("C", "Python")
    
    val rdd2: RDD[String] = rdd1.filter(!blackList.contains(_))
    
    rdd2.foreach(println)
    

    RDD算子部分是在Executor端执行,其他的在Driver端执行。

    blackList端创建的,但是因为需要在Executor端使用,所以Driver会把blackList以task的形式发送到Executor端。如果有很多个task,Executor端就会有很多个blackList,可能会造成Executor内存溢出。

    不使用广播变量:

    img

    使用广播变量:

    img

    2.广播变量定义

    val sc = new SparkContext(conf)
    
    val rdd1: RDD[String] = sc.parallelize(Array[String]("Java", "C", "Python", "Hadoop", "Spark"),2)
    // driver每发送一个task到executor,就将此变量发送给executor一次,占用executor内存
    val blackList = List("C", "Python")
    // 使用广播变量,driver将只发送一次到executor,在executor上管理
    val broadCast = sc.broadcast(blackList)
    
    val rdd2: RDD[String] = rdd1.filter(!broadCast.value.contains(_))
    
    rdd2.foreach(println)
    

    注意:广播变量在Driver端定义赋初值,在Driver端可以修改广播变量的值,在Executor端无法修改。

    二、累加器(正确操作)

    累加器不是调优操作,而是正确操作,如果不这么做,就是错的。

    1.普通累加器

    val rdd1: RDD[String] = sc.parallelize(Array[String]("Java", "C", "Python", "Hadoop", "Spark"),2)
    // 结果为0
    var sum = 0
    // 必须得使用累加器
    val accumulator = sc.longAccumulator
    
    rdd1.foreach(_ => {
      sum += 1
      println(s"Executor -- i = $sum")
      accumulator.add(1)
      println(s"Executor -- accumulator = $accumulator")
    
    })
    println(s"sum:$sum")
    println(s"accumulator:${accumulator.value}")
    

    注意:累加器在Driver端定义赋初值,在Executor端更新。新版的Spark在Executor端也可以读取累加器的值

    2.自定义累加器

    自定义实体类,封装多个属性,实现累加

    case class PersonInfo(var personCount: Int, var ageCount: Int)
    

    自定义累加器需要继承AccumulatorV2

    class MyAcc extends AccumulatorV2[PersonInfo, PersonInfo] {
    
      private var resultInfo = new PersonInfo(0, 0)
    
      /** 判断reset 是否是初始值,一定和reset保持一致 */
      override def isZero: Boolean = {
        resultInfo.personCount == 0 && resultInfo.ageCount == 0
      }
    
      override def copy(): AccumulatorV2[PersonInfo, PersonInfo] = {
        val myAcc = new MyAcc
        myAcc.resultInfo = this.resultInfo
        myAcc
      }
    
      override def reset(): Unit = {
        resultInfo = new PersonInfo(0, 0)
      }
    
      /** 作用在Executor端 */
      override def add(v: PersonInfo): Unit = {
        resultInfo.personCount += v.personCount
        resultInfo.ageCount += v.ageCount
      }
    
      /** Driver端的result与Executor中的每个result进行聚合 */
      override def merge(other: AccumulatorV2[PersonInfo, PersonInfo]): Unit = {
        resultInfo.personCount += other.asInstanceOf[MyAcc].resultInfo.personCount
        resultInfo.ageCount += other.asInstanceOf[MyAcc].resultInfo.ageCount
      }
    
      override def value: PersonInfo = resultInfo
    }
    

    使用自定义累加器

    def main(args: Array[String]): Unit = {
    
      val conf = new SparkConf()
      conf.setMaster("local")
      conf.setAppName("自定义累加器")
    
      val sc = new SparkContext(conf)
    
      sc.setLogLevel("error")
    
      // 自定义累加器
      val myAcc = new MyAcc
      sc.register(myAcc)
    
      val rdd1: RDD[String] = sc.parallelize(Array[String](
        "张三 18", "李四 20", "王五 21", "马六 16"
      ), 3)
    
      val value: RDD[Unit] = rdd1.map(elem => {
        val age = elem.split(" ")(1).toInt
        myAcc.add(new PersonInfo(1, age))
      })
      // action算子触发
      value.count()
    
      println("accumlator value = " + myAcc.value)
    
    }
    
  • 相关阅读:
    程序员如何在百忙中更有效地利用时间,如何不走岔路,不白忙(忙得要有效率,要有收获)
    最坏的不是面试被拒,而是没面试机会,以面试官视角分析哪些简历至少能有面试机会
    最近面了不少java开发,据此来说下我的感受:哪怕事先只准备1小时,成功概率也能大大提升
    Ribbon整合Eureka组件,以实现负载均衡
    时间对于程序员的价值,以及如何高效地利用时间,同时划分下勤奋度的等级
    面试过程中,可以通过提问环节的发挥,提升面试的成功率
    以技术面试官的经验分享毕业生和初级程序员通过面试的技巧(Java后端方向)
    和小鲜肉相比,老程序员该由哪些优势?同时说下我看到的老程序员的三窟
    通过软引用和弱引用提升JVM内存使用性能的方法(面试时找机会说出,一定能提升成功率)
    Spring Clould负载均衡重要组件:Ribbon中重要类的用法
  • 原文地址:https://www.cnblogs.com/yangyh11/p/14065727.html
Copyright © 2011-2022 走看看