zoukankan      html  css  js  c++  java
  • Spark分布式编程之全局变量专题【共享变量】

     转载自:http://www.aboutyun.com/thread-19652-1-1.html

    问题导读

    1.spark共享变量的作用是什么?
    2.什么情况下使用共享变量?
    3.如何在程序中使用共享变量?
    4.广播变量源码包含哪些内容?








    spark编程中,我们经常会遇到使用全局变量,来累加或则使用全局变量。然而对于分布式编程这个却与传统编程有着很大的区别。不可能在程序中声明一个全局变量,在分布式编程中就可以直接使用。因为代码会分发到多台机器,导致我们认为的全局变量失效。那么spark,spark Streaming该如何实现全局变量。

    一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上 的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator)+


    1.概念

    1.1 广播变量:

    广播可以将变量发送到闭包中,被闭包使用。但是,广播还有一个作用是同步较大数据。比如你有一个IP库,可能有几G,在map操作中,依赖这个ip库。那么,可以通过广播将这个ip库传到闭包中,被并行的任务应用。广播通过两个方面提高数据共享效率:
    1,集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;
    2,广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大的提高数据传输速率。广播变量修改后,不会反馈到其他节点。



    1.2 累加器:

    累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于Python还不支持) 
    累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。



    2.如何使用全局变量


    2.1 Java版本:

     

    
    
    1. package com.Streaming;
    2.  
    3. import org.apache.spark.Accumulator;
    4. import org.apache.spark.SparkConf;
    5. import org.apache.spark.api.java.JavaPairRDD;
    6. import org.apache.spark.api.java.function.Function;
    7. import org.apache.spark.broadcast.Broadcast;
    8. import org.apache.spark.streaming.Durations;
    9. import org.apache.spark.streaming.Time;
    10. import org.apache.spark.streaming.api.java.JavaStreamingContext;
    11.  
    12.  
    13. import org.apache.spark.api.java.function.FlatMapFunction;
    14. import org.apache.spark.api.java.function.Function2;
    15. import org.apache.spark.api.java.function.PairFunction;
    16. import org.apache.spark.streaming.api.java.JavaDStream;
    17. import org.apache.spark.streaming.api.java.JavaPairDStream;
    18. import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    19. import scala.Tuple2;
    20.  
    21. import java.util.*;
    22.  
    23. /**
    24.  * 利用广播进行黑名单过滤!
    25.  *
    26.  * 无论是计数器还是广播!都不是想象的那么简单!
    27.  * 联合使用非常强大!!!绝对是高端应用!
    28.  *
    29.  * 如果 联合使用扩展的话,该怎么做!!!
    30.  *
    31.  * ?
    32.  */
    33. public class BroadcastAccumulator {
    34.  
    35.     /**
    36.      * 肯定要创建一个广播List
    37.      *
    38.      * 在上下文中实例化!
    39.      */
    40.     private static volatile Broadcast<List<String>> broadcastList = null;
    41.  
    42.     /**
    43.      * 计数器!
    44.      * 在上下文中实例化!
    45.      */
    46.     private static volatile Accumulator<Integer> accumulator = null;
    47.  
    48.     public static void main(String[] args) {
    49.  
    50.         SparkConf conf = new SparkConf().setMaster("local[2]").
    51.                 setAppName("WordCountOnlieBroadcast");
    52.  
    53.         JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
    54.  
    55.  
    56.         /**
    57.          * 没有action的话,广播并不会发出去!
    58.          *
    59.          * 使用broadcast广播黑名单到每个Executor中!
    60.          */
    61.         broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
    62.  
    63.         /**
    64.          * 全局计数器!用于统计在线过滤了多少个黑名单!
    65.          */
    66.         accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");
    67.  
    68.  
    69.         JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);
    70.  
    71.  
    72.         /**
    73.          * 这里省去flatmap因为名单是一个个的!
    74.          */
    75.         JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
    76.             @Override
    77.             public Tuple2<String, Integer> call(String word) {
    78.                 return new Tuple2<String, Integer>(word, 1);
    79.             }
    80.         });
    81.  
    82.         JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    83.             @Override
    84.             public Integer call(Integer v1, Integer v2) {
    85.                 return v1 + v2;
    86.             }
    87.         });
    88.  
    89.         /**
    90.          * Funtion里面 前几个参数是 入参。
    91.          * 后面的出参。
    92.          * 体现在call方法里面!
    93.          *
    94.          * 这里直接基于RDD进行操作了!
    95.          */
    96.         wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
    97.             @Override
    98.             public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
    99.                 rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
    100.                     @Override
    101.                     public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
    102.                         if (broadcastList.value().contains(wordPair._1)) {
    103.  
    104.                             /**
    105.                              * accumulator不应该仅仅用来计数。
    106.                              * 可以同时写进数据库或者redis中!
    107.                              */
    108.                             accumulator.add(wordPair._2);
    109.                             return false;
    110.                         }else {
    111.                             return true;
    112.                         }
    113.                     };
    114.                     /**
    115.                      * 这里真的希望 广播和计数器执行的话。要进行一个action操作!
    116.                      */
    117.                 }).collect();
    118.  
    119.                 System.out.println("广播器里面的值"+broadcastList.value());
    120.                 System.out.println("计时器里面的值"+accumulator.value());
    121.                 return null;
    122.             }
    123.         });
    124.  
    125.  
    126.         jsc.start();
    127.         jsc.awaitTermination();
    128.         jsc.close();
    129.  
    130.     }
    131.  
    132.     }
     

    2.2 Scala版本

     

    
    
    1. package com.Streaming
    2.  
    3. import java.util
    4.  
    5. import org.apache.spark.streaming.{Duration, StreamingContext}
    6. import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
    7. import org.apache.spark.broadcast.Broadcast
    8.  
    9. /**
    10.   * Created by lxh on 2016/6/30.
    11.   */
    12. object BroadcastAccumulatorStreaming {
    13.  
    14.   /**
    15.     * 声明一个广播和累加器!
    16.     */
    17.   private var broadcastList:Broadcast[List[String]]  = _
    18.   private var accumulator:Accumulator[Int] = _
    19.  
    20.   def main(args: Array[String]) {
    21.  
    22.     val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
    23.     val sc = new SparkContext(sparkConf)
    24.  
    25.     /**
    26.       * duration是ms
    27.       */
    28.     val ssc = new StreamingContext(sc,Duration(2000))
    29.    // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
    30.     broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
    31.     accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")
    32.  
    33.     /**
    34.       * 获取数据!
    35.       */
    36.     val lines = ssc.socketTextStream("localhost",9999)
    37.  
    38.     /**
    39.       * 拿到数据后 怎么处理!
    40.       *
    41.       * 1.flatmap把行分割成词。
    42.       * 2.map把词变成tuple(word,1)
    43.       * 3.reducebykey累加value
    44.       * (4.sortBykey排名)
    45.       * 4.进行过滤。 value是否在累加器中。
    46.       * 5.打印显示。
    47.       */
    48.     val words = lines.flatMap(line => line.split(" "))
    49.  
    50.     val wordpair = words.map(word => (word,1))
    51.  
    52.     wordpair.filter(record => {broadcastList.value.contains(record._1)})
    53.  
    54.  
    55.     val pair = wordpair.reduceByKey(_+_)
    56.  
    57.     /**
    58.       *这步为什么要先foreachRDD?
    59.       *
    60.       * 因为这个pair 是PairDStream<String, Integer>
    61.       *
    62.       *   进行foreachRDD是为了?
    63.       *
    64.       */
    65. /*    pair.foreachRDD(rdd => {
    66.       rdd.filter(record => {
    67.  
    68.         if (broadcastList.value.contains(record._1)) {
    69.           accumulator.add(1)
    70.           return true
    71.         } else {
    72.           return false
    73.         }
    74.  
    75.       })
    76.  
    77.     })*/
    78.  
    79.     val filtedpair = pair.filter(record => {
    80.         if (broadcastList.value.contains(record._1)) {
    81.           accumulator.add(record._2)
    82.           true
    83.         } else {
    84.           false
    85.         }
    86.  
    87.      }).print
    88.  
    89.     println("累加器的值"+accumulator.value)
    90.  
    91.    // pair.filter(record => {broadcastList.value.contains(record._1)})
    92.  
    93.    /* val keypair = pair.map(pair => (pair._2,pair._1))*/
    94.  
    95.     /**
    96.       * 如果DStream自己没有某个算子操作。就通过转化transform!
    97.       */
    98.    /* keypair.transform(rdd => {
    99.       rdd.sortByKey(false)//TODO
    100.     })*/
    101.     pair.print()
    102.     ssc.start()
    103.     ssc.awaitTermination()
    104.  
    105.   }
    106.  
    107. }

     

     
     


    补充:除了上面提到的两种外,还有一个闭包的概念,这里补充下
    闭包 与广播变量对比
    有两种方式将数据从driver节点发送到worker节点:通过 闭包 和通过 广播变量 。闭包是随着task的组装和分发自动进行的,而广播变量则是需要程序猿手动操作的,具体地可以通过如下方式操作广播变量(假设 sc 为 SparkContext 类型的对象, bc 为 Broadcast 类型的对象):

    可通过 sc.broadcast(xxx) 创建广播变量。
    可在各计算节点中(闭包代码中)通过 bc.value 来引用广播的数据。
    bc.unpersist() 可将各executor中缓存的广播变量删除,后续再使用时数据将被重新发送。
    bc.destroy() 可将广播变量的数据和元数据一同销毁,销毁之后就不能再使用了。
    任务闭包包含了任务所需要的代码和数据,如果一个executor数量小于RDD partition的数量,那么每个executor就会得到多个同样的任务闭包,这通常是低效的。而广播变量则只会将数据发送到每个executor一次,并且可以在多个计算操作中共享该广播变量,而且广播变量使用了类似于p2p形式的非常高效的广播算法,大大提高了效率。另外,广播变量由spark存储管理模块进行管理,并以MEMORY_AND_DISK级别进行持久化存储。

    什么时候用闭包自动分发数据?情况有几种:

    数据比较小的时候。
    数据已在driver程序中可用。典型用例是常量或者配置参数。
    什么时候用广播变量分发数据?情况有几种:

    数据比较大的时候(实际上,spark支持非常大的广播变量,甚至广播变量中的元素数超过java/scala中Array的最大长度限制(2G,约21.5亿)都是可以的)。
    数据是某种分布式计算结果。典型用例是训练模型等中间计算结果。
    当数据或者变量很小的时候,我们可以在Spark程序中直接使用它们,而无需使用广播变量。

    对于大的广播变量,序列化优化可以大大提高网络传输效率,参见本文序列化优化部分。



    3.广播变量(Broadcast)源码分析


    本文基于Spark 1.0源码分析,主要探讨广播变量的初始化、创建、读取以及清除。


    类关系
    BroadcastManager类中包含一个BroadcastFactory对象的引用。大部分操作通过调用BroadcastFactory中的方法来实现。

    BroadcastFactory是一个Trait,有两个直接子类TorrentBroadcastFactory、HttpBroadcastFactory。这两个子类实现了对HttpBroadcast、TorrentBroadcast的封装,而后面两个又同时集成了Broadcast抽象类。


    BroadcastManager的初始化
    SparkContext初始化时会创建SparkEnv对象env,这个过程中会调用BroadcastManager的构造方法返回一个对象作为env的成员变量存在:

    
    
    1. val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    构造BroadcastManager对象时会调用initialize方法,主要根据配置初始化broadcastFactory成员变量,并调用其initialize方法。
    
    
    1. val broadcastFactoryClass =
    2.          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
    3.  
    4.        broadcastFactory =
    5.          Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
    6.  
    7.        // Initialize appropriate BroadcastFactory and BroadcastObject
    8.        broadcastFactory.initialize(isDriver, conf, securityManager)
     
     

    两个工厂类的initialize方法都是对其相应实体类的initialize方法的调用,下面分开两个类来看。


    HttpBroadcast的initialize方法

    
    
    1. def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
    2.   synchronized {
    3.     if (!initialized) {
    4.       bufferSize = conf.getInt("spark.buffer.size", 65536)
    5.       compress = conf.getBoolean("spark.broadcast.compress", true)
    6.       securityManager = securityMgr
    7.       if (isDriver) {
    8.         createServer(conf)
    9.         conf.set("spark.httpBroadcast.uri",  serverUri)
    10.       }
    11.       serverUri = conf.get("spark.httpBroadcast.uri")
    12.       cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
    13.       compressionCodec = CompressionCodec.createCodec(conf)
    14.       initialized = true
    15.     }
    16.   }
    17. }
     
     

    除了一些变量的初始化外,主要做两件事情,一是createServer(只有在Driver端会做),其次是创建一个MetadataCleaner对象。

    createServer

     

    
    
    1. private def createServer(conf: SparkConf) {
    2.   broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
    3.   server = new HttpServer(broadcastDir, securityManager)
    4.   server.start()
    5.   serverUri = server.uri
    6.   logInfo("Broadcast server started at " + serverUri)
    7. }

     

     
     

    首先创建一个存放广播变量的目录,默认是

    
    
    1. conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")).split(',')(0) 

    然后初始化一个HttpServer对象并启动(封装了jetty),启动过程中包括加载资源文件,起端口和线程用来监控请求等。这部分的细节在org.apache.spark.HttpServer类中,此处不做展开。

    创建MetadataCleaner对象
    一个MetadataCleaner对象包装了一个定时计划Timer,每隔一段时间执行一个回调函数,此处传入的回调函数为cleanup:

    
    
    1. private def cleanup(cleanupTime: Long) {
    2.   val iterator = files.internalMap.entrySet().iterator()
    3.   while(iterator.hasNext) {
    4.     val entry = iterator.next()
    5.     val (file, time) = (entry.getKey, entry.getValue)
    6.     if (time < cleanupTime) {
    7.       iterator.remove()
    8.       deleteBroadcastFile(file)
    9.     }
    10.   }
    11. }
     
     

    即清楚存在吵过一定时长的broadcast文件。在时长未设定(默认情况)时,不清除:

    
    
    1. if (delaySeconds > 0) {
    2.    logDebug(
    3.      "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
    4.      "and period of " + periodSeconds + " secs")
    5.    timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
    6.  }
     
     


    TorrentBroadcast的initialize方法


    
    
    1. def initialize(_isDriver: Boolean, conf: SparkConf) {
    2.   TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
    3.   synchronized {
    4.     if (!initialized) {
    5.       initialized = true
    6.     }
    7.   }
    8. }
     
     

    Torrent在此处没做什么,这也可以看出和Http的区别,Torrent的处理方式就是p2p,去中心化。而Http是中心化服务,需要启动服务来接受请求。

    创建broadcast变量
    调用SparkContext中的 def broadcast[T: ClassTag](value: T): Broadcast[T]方法来初始化一个广播变量,实现如下:

    
    
    1. def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    2.     val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    3.     cleaner.foreach(_.registerBroadcastForCleanup(bc))
    4.     bc
    5.   }


    即调用broadcastManager的newBroadcast方法:
    
    
    1. def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
    2.   broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
    3. }

    再调用工厂类的newBroadcast方法,此处返回的是一个Broadcast对象。

    HttpBroadcastFactory的newBroadcast


    
    
    1. def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
    2.   new HttpBroadcast[T](value_, isLocal, id)

    即创建一个新的HttpBroadcast对象并返回。

    构造对象时主要做两件事情:

    
    
    1. HttpBroadcast.synchronized {
    2.    SparkEnv.get.blockManager.putSingle(
    3.      blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    4.  }
    5.  
    6.  if (!isLocal) {
    7.    HttpBroadcast.write(id, value_)
    8.  }
     
     


    1.将变量id和值放入blockManager,但并不通知master

    2.调用伴生对象的write方法

    
    
    1. def write(id: Long, value: Any) {
    2.     val file = getFile(id)
    3.     val out: OutputStream = {
    4.       if (compress) {
    5.         compressionCodec.compressedOutputStream(new FileOutputStream(file))
    6.       } else {
    7.         new BufferedOutputStream(new FileOutputStream(file), bufferSize)
    8.       }
    9.     }
    10.     val ser = SparkEnv.get.serializer.newInstance()
    11.     val serOut = ser.serializeStream(out)
    12.     serOut.writeObject(value)
    13.     serOut.close()
    14.     files += file
    15.   }
     
     


    write方法将对象值按照指定的压缩、序列化写入指定的文件。这个文件所在的目录即是HttpServer的资源目录,文件名和id的对应关系为:

    
    
    1. case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
    2.   def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
    3. }


    TorrentBroadcastFactory的newBroadcast方法


    
    
    1. def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
    2.   new TorrentBroadcast[T](value_, isLocal, id)

    同样是创建一个TorrentBroadcast对象,并返回。
    
    
    1. TorrentBroadcast.synchronized {
    2.   SparkEnv.get.blockManager.putSingle(
    3.     broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    4. }
    5.  
    6. if (!isLocal) {
    7.   sendBroadcast()
    8. }
     
     

    做两件事情,第一步和Http一样,第二步:

    
    
    1. def sendBroadcast() {
    2.   val tInfo = TorrentBroadcast.blockifyObject(value_)
    3.   totalBlocks = tInfo.totalBlocks
    4.   totalBytes = tInfo.totalBytes
    5.   hasBlocks = tInfo.totalBlocks
    6.  
    7.   // Store meta-info
    8.   val metaId = BroadcastBlockId(id, "meta")
    9.   val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
    10.   TorrentBroadcast.synchronized {
    11.     SparkEnv.get.blockManager.putSingle(
    12.       metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    13.   }
    14.  
    15.   // Store individual pieces
    16.   for (<- 0 until totalBlocks) {
    17.     val pieceId = BroadcastBlockId(id, "piece" + i)
    18.     TorrentBroadcast.synchronized {
    19.       SparkEnv.get.blockManager.putSingle(
    20.         pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    21.     }
    22.   }
    23. }
     
     


    可以看出,先将元数据信息缓存到blockManager,再将块信息缓存过去。开头可以看到有一个分块动作,是调用伴生对象的blockifyObject方法:

    
    
    1. def blockifyObject[T](obj: T): TorrentInfo


    此方法将对象obj分块(默认块大小为4M),返回一个TorrentInfo对象,第一个参数为一个TorrentBlock对象(包含blockID和block字节数组)、块数量以及obj的字节流总长度。

    元数据信息中的blockId为广播变量id+后缀,value为总块数和总字节数。

    数据信息是分块缓存,每块的id为广播变量id加后缀及块变好,数据位一个TorrentBlock对象

    读取广播变量的值
    通过调用bc.value来取得广播变量的值,其主要实现在反序列化方法readObject中

    HttpBroadcast的反序列化


    
    
    1. HttpBroadcast.synchronized {
    2.      SparkEnv.get.blockManager.getSingle(blockId) match {
    3.        case Some(x) => value_ = x.asInstanceOf[T]
    4.        case None => {
    5.          logInfo("Started reading broadcast variable " + id)
    6.          val start = System.nanoTime
    7.          value_ = HttpBroadcast.read[T](id)
    8.          /*
    9.           * We cache broadcast data in the BlockManager so that subsequent tasks using it
    10.           * do not need to re-fetch. This data is only used locally and no other node
    11.           * needs to fetch this block, so we don't notify the master.
    12.           */
    13.          SparkEnv.get.blockManager.putSingle(
    14.            blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    15.          val time = (System.nanoTime - start) / 1e9
    16.          logInfo("Reading broadcast variable " + id + " took " + time + " s")
    17.        }
    18.      }
    19.    }
     
     



    首先查看blockManager中是否已有,如有则直接取值,否则调用伴生对象的read方法进行读取:

    
    
    1. def read[T: ClassTag](id: Long): T = {
    2.     logDebug("broadcast read server: " +  serverUri + " id: broadcast-" + id)
    3.     val url = serverUri + "/" + BroadcastBlockId(id).name
    4.  
    5.     var uc: URLConnection = null
    6.     if (securityManager.isAuthenticationEnabled()) {
    7.       logDebug("broadcast security enabled")
    8.       val newuri = Utils.constructURIForAuthentication(new URI(url), securityManager)
    9.       uc = newuri.toURL.openConnection()
    10.       uc.setAllowUserInteraction(false)
    11.     } else {
    12.       logDebug("broadcast not using security")
    13.       uc = new URL(url).openConnection()
    14.     }
    15.  
    16.     val in = {
    17.       uc.setReadTimeout(httpReadTimeout)
    18.       val inputStream = uc.getInputStream
    19.       if (compress) {
    20.         compressionCodec.compressedInputStream(inputStream)
    21.       } else {
    22.         new BufferedInputStream(inputStream, bufferSize)
    23.       }
    24.     }
    25.     val ser = SparkEnv.get.serializer.newInstance()
    26.     val serIn = ser.deserializeStream(in)
    27.     val obj = serIn.readObject[T]()
    28.     serIn.close()
    29.     obj
    30.   }
     
     

    使用serverUri和block id对应的文件名直接开启一个HttpConnection将中心服务器上相应的数据取过来,使用配置的压缩和序列化机制进行解压和反序列化。

    这里可以看到,所有需要用到广播变量值的executor都需要去driver上pull广播变量的内容。

    取到值后,缓存到blockManager中,以便下次使用。

    TorrentBroadcast的反序列化

     

    
    
    1. private def readObject(in: ObjectInputStream) {
    2.     in.defaultReadObject()
    3.     TorrentBroadcast.synchronized {
    4.       SparkEnv.get.blockManager.getSingle(broadcastId) match {
    5.         case Some(x) =>
    6.           value_ = x.asInstanceOf[T]
    7.  
    8.         case None =>
    9.           val start = System.nanoTime
    10.           logInfo("Started reading broadcast variable " + id)
    11.  
    12.           // Initialize @transient variables that will receive garbage values from the master.
    13.           resetWorkerVariables()
    14.  
    15.           if (receiveBroadcast()) {
    16.             value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
    17.  
    18.             /* Store the merged copy in cache so that the next worker doesn't need to rebuild it.
    19.              * This creates a trade-off between memory usage and latency. Storing copy doubles
    20.              * the memory footprint; not storing doubles deserialization cost. Also,
    21.              * this does not need to be reported to BlockManagerMaster since other executors
    22.              * does not need to access this block (they only need to fetch the chunks,
    23.              * which are reported).
    24.              */
    25.             SparkEnv.get.blockManager.putSingle(
    26.               broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    27.  
    28.             // Remove arrayOfBlocks from memory once value_ is on local cache
    29.             resetWorkerVariables()
    30.           } else {
    31.             logError("Reading broadcast variable " + id + " failed")
    32.           }
    33.  
    34.           val time = (System.nanoTime - start) / 1e9
    35.           logInfo("Reading broadcast variable " + id + " took " + time + " s")
    36.       }
    37.     }
    38.   }

     



    和Http一样,都是先查看blockManager中是否已经缓存,若没有,则调用receiveBroadcast方法:
    
    
    1. def receiveBroadcast(): Boolean = {
    2.     // Receive meta-info about the size of broadcast data,
    3.     // the number of chunks it is divided into, etc.
    4.     val metaId = BroadcastBlockId(id, "meta")
    5.     var attemptId = 10
    6.     while (attemptId > 0 && totalBlocks == -1) {
    7.       TorrentBroadcast.synchronized {
    8.         SparkEnv.get.blockManager.getSingle(metaId) match {
    9.           case Some(x) =>
    10.             val tInfo = x.asInstanceOf[TorrentInfo]
    11.             totalBlocks = tInfo.totalBlocks
    12.             totalBytes = tInfo.totalBytes
    13.             arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
    14.             hasBlocks = 0
    15.  
    16.           case None =>
    17.             Thread.sleep(500)
    18.         }
    19.       }
    20.       attemptId -= 1
    21.     }
    22.     if (totalBlocks == -1) {
    23.       return false
    24.     }
    25.  
    26.     /*
    27.      * Fetch actual chunks of data. Note that all these chunks are stored in
    28.      * the BlockManager and reported to the master, so that other executors
    29.      * can find out and pull the chunks from this executor.
    30.      */
    31.     val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(+ 1).toList)
    32.     for (pid <- recvOrder) {
    33.       val pieceId = BroadcastBlockId(id, "piece" + pid)
    34.       TorrentBroadcast.synchronized {
    35.         SparkEnv.get.blockManager.getSingle(pieceId) match {
    36.           case Some(x) =>
    37.             arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
    38.             hasBlocks += 1
    39.             SparkEnv.get.blockManager.putSingle(
    40.               pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    41.  
    42.           case None =>
    43.             throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
    44.         }
    45.       }
    46.     }
    47.  
    48.     hasBlocks == totalBlocks
    49.   }
     
     

    和写数据一样,同样是分成两个部分,首先取元数据信息,再根据元数据信息读取实际的block信息。注意这里都是从blockManager中读取的,这里贴出blockManager.getSingle的分析。

    调用栈中最后到BlockManager.doGetRemote方法,中间有一条语句:

    
    
    1. val locations = Random.shuffle(master.getLocations(blockId))
     
     

    即将存有这个block的节点信息随机打乱,然后使用:

    
    
    1. val data = BlockManagerWorker.syncGetBlock(
    2.        GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))

    来获取。

    从这里可以看出,Torrent方法首先将广播变量数据分块,并存到BlockManager中;每个节点需要读取广播变量时,是分块读取,对每一块都读取其位置信息,然后随机选一个存有此块数据的节点进行get;每个节点读取后会将包含的快信息报告给BlockManagerMaster,这样本地节点也成为了这个广播网络中的一个peer。

    与Http方式形成鲜明对比,这是一个去中心化的网络,只需要保持一个tracker即可,这就是p2p的思想。



    广播变量的清除


    广播变量被创建时,紧接着有这样一句代码:
    
    
    1. cleaner.foreach(_.registerBroadcastForCleanup(bc)) 


    cleaner是一个ContextCleaner对象,会将刚刚创建的广播变量注册到其中,调用栈为:
    
    
    1. def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
    2.   registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
    3. }
    
    
    1. private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
    2.   referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
     

    等出现广播变量被弱引用时(关于弱引用,可以参考:http://blog.csdn.net/lyfi01/article/details/6415726),则会执行

    
    
    1. cleaner.foreach(_.start())

    start方法中会调用keepCleaning方法,会遍历注册的清理任务(包括RDD、shuffle和broadcast),依次进行清理:
    
    
    1. private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
    2.     while (!stopped) {
    3.       try {
    4.         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
    5.           .map(_.asInstanceOf[CleanupTaskWeakReference])
    6.         reference.map(_.task).foreach { task =>
    7.           logDebug("Got cleaning task " + task)
    8.           referenceBuffer -= reference.get
    9.           task match {
    10.             case CleanRDD(rddId) =>
    11.               doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
    12.             case CleanShuffle(shuffleId) =>
    13.               doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
    14.             case CleanBroadcast(broadcastId) =>
    15.               doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
    16.           }
    17.         }
    18.       } catch {
    19.         case e: Exception => logError("Error in cleaning thread", e)
    20.       }
    21.     }
    22.   }
     
     


    doCleanupBroadcast调用以下语句:

    
    
    1. broadcastManager.unbroadcast(broadcastId, true, blocking)


    然后是:
    
    
    1. def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    2.   broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
    3. }


    每个工厂类调用其对应实体类的伴生对象的unbroadcast方法。



    HttpBroadcast中的变量清除


    
    
    1. def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
    2.    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
    3.    if (removeFromDriver) {
    4.      val file = getFile(id)
    5.      files.remove(file)
    6.      deleteBroadcastFile(file)
    7.    }
    8.  }
     
     



    1是删除blockManager中的缓存,2是删除本地持久化的文件

    TorrentBroadcast中的变量清除

    
    
    1. def unpersist(id:Long, removeFromDriver:Boolean, blocking:Boolean)=synchronized{
    2.   SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
    3. }

    小结
    Broadcast可以使用在executor端多次使用某个数据的场景(比如说字典),Http和Torrent两种方式对应传统的CS访问方式和P2P访问方式,当广播变量较大或者使用较频繁时,采用后者可以减少driver端的压力。


    参考:
    http://blog.csdn.net/asongoficeandfire/article/details/37584643

    https://endymecy.gitbooks.io/spa ... ared-variables.html

    上一篇: 无

  • 相关阅读:
    [原创]二路归并排序针对数组的场景(C++版)
    [原创]装饰模式(java版)
    [原创]Java中Map根据值(value)进行排序实现
    [原创]适配器模式(java版)
    信了你的邪
    String和Date转换
    电商运营面试题
    springCloud发送请求多对象参数传递问题
    JS实现页面以年月日时分秒展示时间
    java三种注释以及参数涵义(转)
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723825.html
Copyright © 2011-2022 走看看