zoukankan      html  css  js  c++  java
  • Spark 广播变量BroadCast

     

    一、 广播变量

     

    广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。

    二、为什么使用广播变量

    假如我们要共享的变量map,1M
    在默认的,task执行的算子中,使用了外部的变量,每个task都会获取一份变量的副本
    在什么情况下,会出现性能上的恶劣的影响呢?
    1000个task。大量task的确都在并行运行。这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,通过网络传输到各个task中去,给task使用。总计有1G的数据,会通过网络传输。网络传输的开销,不容乐观啊!!!网络传输,也许就会消耗掉你的spark作业运行的总时间的一小部分。
    map副本,传输到了各个task上之后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一下子就耗费掉1G的内存。对性能会有什么影响呢?不必要的内存的消耗和占用,就导致了,你在进行RDD持久化到内存,也许就没法完全在内存中放下;就只能写入磁盘,最后导致后续的操作在磁盘IO上消耗性能;
    你的task在创建对象的时候,也许会发现堆内存放不下所有对象,也许就会导致频繁的垃圾回收器的回收,GC。GC的时候,一定是会导致工作线程停止,也就是导致Spark暂停工作那么一点时间。频繁GC的话,对Spark作业的运行的速度会有相当可观的影响。
     
    如果说,task使用大变量(1m~100m),明知道会导致性能出现恶劣的影响。那么我们怎么来解决呢?
    广播,Broadcast,将大变量广播出去。而不是直接使用。
     
    广播变量的好处,不是每个task一份变量副本,而是变成每个节点的executor才一份副本。这样的话,就可以让变量产生的副本大大减少。
    广播变量,初始的时候,就在Drvier上有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的
    BlockManager中,尝试获取变量副本;如果本地没有,BlockManager,也许会从远程的Driver上面去获取变量副本;也有可能从距离比较近的其他
    节点的Executor的BlockManager上去获取,并保存在本地的BlockManager中;BlockManager负责管理某个Executor对应的内存和磁盘上的数据,
    此后这个executor上的task,都会直接使用本地的BlockManager中的副本。

    优点:
        不是每个task一份副本,而是变成每个节点Executor上一个副本。

    1.举例来说:

    50个Executor 1000个task。 
    一个map10M 

    默认情况下,1000个task 1000个副本

    1000 * 10M = 10 000M = 10 G

    10G的数据,网络传输,在集群中,耗费10G的内存资源

    如果使用 广播变量,

    50个Executor ,50个副本,10M*50 = 500M的数据

    网络传输,而且不一定是从Drver传输到各个节点,还可能是从就近的节点 
    的Executor的BlockManager上获取变量副本,网络传输速度大大增加。

    之前 10000M 现在 500M

    20倍网络传输性能的消耗。20倍内存消耗的减少。

    三、如何使用

    开始使用broadcast变量,使用完后,程序结束记得释放

      sc = SparkContext(appName=AppNames.JOURNEY_AGGREGATOR_APP_NAME)
        broadCastForLog = None
        try:
            broadCastForLog = ELogForDistributedApp.setLogConf2BroadCast(sc)
            elogging.initLogFromDict(broadCastForLog.value)
        except StandardError:
            pass
    
    .......
        #执行完程序逻辑,记得释放该变量
    
        if broadCastForLog is not None:
            broadCastForLog.unpersist(False)

    #获取要被共享的大变量,这里是log配置

     

    class ELogForDistributedApp(object):
    
        LOGHDFSPATH = "/user/hdfs/test/logging/logging_hdfs.json"
        @staticmethod
        def setLogConf2BroadCast(sc):
            logFilePath = ELogForDistributedApp.LOGHDFSPATH
            if sc is not None:
                configDict = HDFSOperation.getConfigFromHDFS(logFilePath,sc)
                broadCast = sc.broadcast(configDict)
                #globals()['broadCast'] = broadCast
                #elogging.initLogFromDict(broadCast.value)
                return broadCast
                #print broadCast.value
            else:
                return None

     

        def initLogFromDict(self):
            elogging.initLogFromDict(self.eloggingConfig)

    从hdfs中找到相应配置文件

    class HDFSOperation(object):
    
        @staticmethod
        def getConfigFromHDFS(hdfsPath,sc):
            if sc is not None:
                filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
                hadoop_configuration = sc._jsc.hadoopConfiguration()
                fs =filesystem_class.get(hadoop_configuration)
                path_class = sc._gateway.jvm.org.apache.hadoop.fs.Path
                pathObj = path_class(hdfsPath)
                try:
                    hdfsInStream = fs.open(pathObj)
                    bufferedReader_class = sc._gateway.jvm.java.io.BufferedReader
                    inputStreamReader_class = sc._gateway.jvm.java.io.InputStreamReader
                    bufferedReader = bufferedReader_class(inputStreamReader_class(hdfsInStream))
                except IOError,msg:
                    print str(msg)
                    return None
    
            else:
                return None
            configStr = ''
            while True:
                tmpStr = bufferedReader.readLine()
                if tmpStr == None:
                    break
                configStr += tmpStr
            try:
                confDict = json.loads(configStr)
            except IOError,msg:
                print str(msg)
                return None
            return confDict

    参考文档

    1. Spark Programming Guide1.6.3
    2. How can I update a broadcast variable in spark streaming?
    3. Spark踩坑记——共享变量

  • 相关阅读:
    经典游戏--24点--c++代码实现和总体思路(简单暴力向)
    这么多TiDB负载均衡方案总有一款适合你
    思杰( Citrix)证书的正确处置方式
    从本质彻底精通Git——4个模型1个周期1个史观1个工作流
    HttpClient客户端网络编程——高可用、高并发
    Netty之数据编码
    维吉尼亚密码加解密原理及其实现
    无网环境安装docker之--rpm
    centos7无网环境安装docker
    CentOS8.3最小化安装后安装GNome桌面
  • 原文地址:https://www.cnblogs.com/ITtangtang/p/7967902.html
Copyright © 2011-2022 走看看