zoukankan      html  css  js  c++  java
  • Spark共享变量和累加器的基本原理与用途

    累加器:分布式共享只写变量

      1. 把Executor端的信息聚合到Driver端

      2. 在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的新的副本

        每个task更新副本的值之后,传回Driver端进行merge(合并)

      3.原理类似于mapreduce,分布式改变,然后聚合这些改变

    自定义累加器:

    package com.hch.acc
    import org.apache.spark.util.AccumulatorV2
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable
    /**
     * @author Joey413
     */
    object Spark03_Acc_WordCount {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc_WordCount")
        val sc: SparkContext = new SparkContext(sparkConf)
    
        val rdd = sc.makeRDD(List("judy", "joey", "rose"))
    
        // 累加器
        // 创建累加器对象
        val wcAcc = new MyAccumulator
        // 向spark进行注册
        sc.register(wcAcc, "wordCountAcc")
    
        rdd.foreach(
          word => {
            // 使用累加器
            wcAcc.add(word)
          }
        )
    
        println(wcAcc.value)
    
        sc.stop()
      }
    
      /**
       * 自定义数据累加器:WordCount
       * 1. 继承AccumulatorV2 定义泛型
       *      IN : 累加器输入的数据类型 String
       *      OUT : 累加器返回的数据类型 Map
       */
      class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    
        private var wcMap = mutable.Map[String, Long]()
    
        // 判断累加器是否为空
        override def isZero: Boolean = {
          wcMap.isEmpty
        }
    
        // 拷贝累加器
        override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
          new MyAccumulator()
        }
    
        // 重置累加器
        override def reset(): Unit = {
          wcMap.clear()
        }
    
        // 获取累加器需要计算的值
        override def add(word: String): Unit = {
          val newCnt = wcMap.getOrElse(word, 0L) + 1
    
          wcMap.update(word, newCnt)
        }
    
        // Driver端合并多个累加器
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    
          val map1 = this.wcMap
          val map2 = other.value
    
          map2.foreach{
            case (word, count) => {
              // map1数据累加
              val newCount = map1.getOrElse(word, 0L) + count
              // 更新map1数据更新
              map1.update(word, newCount)
            }
          }
        }
    
        // 累加器结果
        override def value: mutable.Map[String, Long] = {
          wcMap
        }
      }
    }
    

      

    广播变量: 分布式共享只读变量

      1. 用来分发较大的对象,以供一个或多个Spark操作使用

      2. 在多个并行操作中使用同一个变量,但是Spark会为每个任务分别发送

      3. 闭包数据以task为单位发送,每个任务包含闭包数据(冗余数据过多消耗内存,降低性能)

      4. Executor 即相当于一个JVM进程,启动时会自动分配内存,将任务中的闭包数据放置在Executor的内存中达到共享目的

      5. Spark中的广播变量可以将闭包的数保存到Executor的内存中(只读 = 不可修改)

    广播变量案例:

    package com.hch.acc
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable
    /**
     * @author Joey413
     */
    object Spark05_Bc {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("BC2")
        val sc: SparkContext = new SparkContext(sparkConf)
    
        val rdd1 = sc.makeRDD(List(
          ("a", 1), ("b", 2), ("c", 3)
        ))
    
        val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))
    
        // 封装广播变量
        val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
    
        rdd1.map {
          case (w, c) => {
            // 从广播变量中取出值 访问广播变量
            val i = bc.value.getOrElse(w, 0)
            (w, (c, i))
          }
        }.collect().foreach(println)
    
        sc.stop()
      }
    }
    

      

  • 相关阅读:
    .NET/C#程序开发中如何更优美地实现失败任务重试的逻辑?
    ava.security.cert.CertPathValidatorException: Path does not chain with any of the trust anchors
    mysql安装到这里不能下去了,怎么解决
    win10安装jdk8点击下一步没反应,点击下一步闪退,win10安装jdk8失败
    运行打包的jar,jar中没有主清单属性
    idea如何打包项目(java)
    关于idea(eclipse同样适用,都是一样的步骤)无法导入Javafx包的问题及解决方案:
    Intellij idea 报错:Error : java 不支持发行版本5
    Class.getResource("xxx.css")得到值为null
    jdk11安装没有jre文件夹
  • 原文地址:https://www.cnblogs.com/joey-413/p/14091124.html
Copyright © 2011-2022 走看看