zoukankan      html  css  js  c++  java
  • 共享变量

    默认情况下,如果在一个算子函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。

    Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量)。Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。

    共享广播变量(只读)

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("shared")
        val sc = new SparkContext(conf)
    
        val factor = 3
    
        // 创建广播变量 factorBroadcast
        val factorBroadcast = sc.broadcast(factor)
    
        // Array(1,2,3,4,5)
        val data = Array(1 to 5:_*)
    
        val rdd = sc.parallelize(data,2)
    
        // factorBroadcast.value 获取广播变量的值
        rdd.map(num => num * factorBroadcast.value ).foreach(println)
        
        sc.stop()
    }
    
    3
    6
    
    9
    12
    15
    

    共享累加变量(只写)

    def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setMaster("local").setAppName("shared")
        val sc = new SparkContext(conf)
    
        // 创建共享累加变量
        var accumulator = sc.accumulator(0)
        // Array(1,2,3,4,5)
        val data = Array(1 to 5: _*)
    
        val rdd = sc.parallelize(data, 2)
    
        // accumulator.add(num) 累加rdd的数据
        rdd.foreach(num => accumulator += num)
        
        // 报错! 算子在Excutor端执行,不可读累加广播变量
        //    rdd.foreach(num => println(accumulator.value))
    
        // 在driver端可以获取共享累加变量的值
        println("共享累加变量的值为:" + accumulator.value)
    
        sc.stop()
    }
    
    共享累加变量的值为:15
    
  • 相关阅读:
    必须了解的经典排序算法整理
    浅谈Code Review
    NOIP2018提高组省一冲奖班模测训练(六)
    NOIP2018提高组省一冲奖班模测训练(五)
    NOIP2018提高组金牌训练营——动态规划专题
    poj 3074
    搜索中的剪枝
    bitset骚操作
    NOIP 2017 宝藏
    prim求最小生成树
  • 原文地址:https://www.cnblogs.com/studyNotesSL/p/11432902.html
Copyright © 2011-2022 走看看