zoukankan      html  css  js  c++  java
  • spark教程(14)-共享变量

    spark 使用的架构是无共享的,数据分布在不同节点,每个节点有独立的 CPU、内存,不存在全局的内存使得变量能够共享,驱动程序和任务之间通过消息共享数据

    举例来说,如果一个 RDD 操作使用了驱动程序中的变量,spark 会将这个变量的副本和 task 一起发送给 executor 中的执行者,对该变量的更新只存在于 task 的内部,并不会回传给驱动程序;

    如果这个任务分为多个阶段,每个阶段开始时,驱动程序会把 变量 发送给 worker;

    在实际场景中,驱动程序在 task 间共享一个巨大(如 100M)的查找表,并且该 task 有多个(如 10个)阶段,spark 会在每个阶段开始时,给每个 task 发送一份数据,那就是 1000M 的传输和存储

    显然浪费资源且效率低下;

    除此之外,有时我们需要不同节点的多个任务更新一个全局变量,然而这个变量并不回传给驱动程序,又如何同步更新呢?

    共享变量

    spark 提供了共享变量的概率满足上述需求

    spark 提供了两种共享变量:广播变量 和 累加器

    上图描述了共享变量的工作原理

    共享变量的作用在于 减少网络传输,减少内存消耗;

    其实是一种 spark 编程的优化方法

    spark 三大数据结构

    RDD:分布式数据集

    广播变量:分布式只读共享变量

    累加器:分布式只写共享变量

    广播变量

    spark 只会给 worker 发送一次广播变量(序列化的),并且将它反序列化后存储在 executor 的内存中;

    如果任务分为多个阶段,且每个阶段使用相同的变量,那么广播变量就无需每个阶段都传输数据,spark 会将传输过来的数据序列化后存储在 executor 的内存中,在任务开始前反序列化广播变量即可;

     

    使用场景

    1. 需要共享一个很大的数据集

    2. 任务分为多个阶段,每个阶段使用相同的数据

    使用方法

    SparkContext.broadcast 方法用于创建一个广播变量;

    broadcast 输入一个普通变量,返回一个 Broadcast 实例;

    调用 Broadcast 实例的 value 属性可以获取变量值;

     

    实例分析

    import time
    from pyspark import SparkContext
    factor1 = ['sssssssssssssssssss']*100000       # 待广播的变量
    factor2 = range(100000)
    
    if __name__ == '__main__':
        time.clock()
        sc = SparkContext()
    
        listRdd = sc.parallelize(factor2)
    
        ### 正确利用广播变量          # 测试:1.40s
        brodacastvalue = sc.broadcast(factor1)
        out = listRdd.map(lambda s: brodacastvalue.value[s-1]).collect()
        
        ### 不正确利用广播变量          # 测试:1.59s
        brodacastvalue = sc.broadcast(factor1).value        
        out = listRdd.map(lambda s: brodacastvalue[s-1]).collect()  # 这里注意了,并没有调用 广播变量,而是调用了它的 value,相当于直接调用了原来的数据
    
        ### 不利用广播变量         # 测试:1.59s
        out = listRdd.map(lambda s: factor1[s-1]).collect()
    
        print out
        print(time.clock())

    可以看到广播变量优化了效率,特别是替代 join 操作 

    累加器

    累加器使得多个任务能够操作同一个全局变量, 并且能够回传给驱动程序;

    它常用于计数、求和和聚合操作;

    spark 支持数值型的累加,也支持自定义类型的累加;

    使用方法

    SparkContext.accumulator 创建一个累加器变量;

    它接受两个参数:累加器的初值,累加器的名字,在 spark UI 中可看到,第二个参数可选

    在 task 中只能使用累加器变量的 add 属性或者 += 来更新该变量;

    在 驱动程序 中才能调用累加器变量的 value 属性,在 task 中不可以;

    实例

    from pyspark import SparkContext
    
    sc = SparkContext(appName='leijiaqi')
    accum = sc.accumulator(10)      # 初值,名字
    # sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))     # 调用 add 属性更新变量
    
    def myadd(x):       
        global accum
        accum+=x
        return accum
    sc.parallelize([1, 2, 3, 4]).foreach(myadd)     # 使用 += 方法更新变量
    print(accum.value)  # 20  = 10+1+2+3+4
    
    # sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.value)        # 在 task 中调用 value 报错:Exception: Accumulator.value cannot be accessed inside tasks

    总结

    累加器和广播变量使用非常复杂,本文仅介绍基本用法,二者结合使用可以满足很多复杂场景

    参考资料:

    https://blog.csdn.net/chongxin1/article/details/78048134

    https://www.jianshu.com/p/687db128ff2f  python 版

  • 相关阅读:
    Codeforces #250 (Div. 2) B. The Child and Set
    linux下又一次定位svn url方法
    查看hive版本号
    好947 Mybatis 配置resultMap 带參数查询Map 注意selectOne数据库返回结果一条数据库 否则会报错
    csdn加入暂时会话功能
    第二十五天 慵懒的投射在JDBC上的暖阳 —Hibernate的使用(四)
    lzma 知识点滴
    golang 登录
    docker入门
    创建和管理表(10)
  • 原文地址:https://www.cnblogs.com/yanshw/p/11702828.html
Copyright © 2011-2022 走看看