zoukankan      html  css  js  c++  java
  • Spark共享变量和累加器的基本原理与用途

    累加器:分布式共享只写变量

      1. 把Executor端的信息聚合到Driver端

      2. 在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的新的副本

        每个task更新副本的值之后,传回Driver端进行merge(合并)

      3.原理类似于mapreduce,分布式改变,然后聚合这些改变

    自定义累加器:

    package com.hch.acc
    import org.apache.spark.util.AccumulatorV2
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable
    /**
     * @author Joey413
     */
    object Spark03_Acc_WordCount {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc_WordCount")
        val sc: SparkContext = new SparkContext(sparkConf)
    
        val rdd = sc.makeRDD(List("judy", "joey", "rose"))
    
        // 累加器
        // 创建累加器对象
        val wcAcc = new MyAccumulator
        // 向spark进行注册
        sc.register(wcAcc, "wordCountAcc")
    
        rdd.foreach(
          word => {
            // 使用累加器
            wcAcc.add(word)
          }
        )
    
        println(wcAcc.value)
    
        sc.stop()
      }
    
      /**
       * 自定义数据累加器:WordCount
       * 1. 继承AccumulatorV2 定义泛型
       *      IN : 累加器输入的数据类型 String
       *      OUT : 累加器返回的数据类型 Map
       */
      class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    
        private var wcMap = mutable.Map[String, Long]()
    
        // 判断累加器是否为空
        override def isZero: Boolean = {
          wcMap.isEmpty
        }
    
        // 拷贝累加器
        override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
          new MyAccumulator()
        }
    
        // 重置累加器
        override def reset(): Unit = {
          wcMap.clear()
        }
    
        // 获取累加器需要计算的值
        override def add(word: String): Unit = {
          val newCnt = wcMap.getOrElse(word, 0L) + 1
    
          wcMap.update(word, newCnt)
        }
    
        // Driver端合并多个累加器
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    
          val map1 = this.wcMap
          val map2 = other.value
    
          map2.foreach{
            case (word, count) => {
              // map1数据累加
              val newCount = map1.getOrElse(word, 0L) + count
              // 更新map1数据更新
              map1.update(word, newCount)
            }
          }
        }
    
        // 累加器结果
        override def value: mutable.Map[String, Long] = {
          wcMap
        }
      }
    }
    

      

    广播变量: 分布式共享只读变量

      1. 用来分发较大的对象,以供一个或多个Spark操作使用

      2. 在多个并行操作中使用同一个变量,但是Spark会为每个任务分别发送

      3. 闭包数据以task为单位发送,每个任务包含闭包数据(冗余数据过多消耗内存,降低性能)

      4. Executor 即相当于一个JVM进程,启动时会自动分配内存,将任务中的闭包数据放置在Executor的内存中达到共享目的

      5. Spark中的广播变量可以将闭包的数保存到Executor的内存中(只读 = 不可修改)

    广播变量案例:

    package com.hch.acc
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable
    /**
     * @author Joey413
     */
    object Spark05_Bc {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("BC2")
        val sc: SparkContext = new SparkContext(sparkConf)
    
        val rdd1 = sc.makeRDD(List(
          ("a", 1), ("b", 2), ("c", 3)
        ))
    
        val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))
    
        // 封装广播变量
        val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
    
        rdd1.map {
          case (w, c) => {
            // 从广播变量中取出值 访问广播变量
            val i = bc.value.getOrElse(w, 0)
            (w, (c, i))
          }
        }.collect().foreach(println)
    
        sc.stop()
      }
    }
    

      

  • 相关阅读:
    Visual Studio 2010使用Visual Assist X的方法
    SQL Server 2000 评估版 升级到 SQL Server 2000 零售版
    双网卡多网络单主机同时访问
    开发即过程!立此纪念一个IT新名词的诞生
    delphi dxBarManager1 目录遍历 转为RzCheckTree2树
    5320 软件集合
    delphi tree 从一个表复制到另一个表
    DELPHI 排课系统课表
    长沙金思维 出现在GOOGLE的 金思维 相关搜索里啦!!
    如何在DBGrid的每一行前加一个单选框?
  • 原文地址:https://www.cnblogs.com/joey-413/p/14091124.html
Copyright © 2011-2022 走看看