  • Spark 2.3 (43): Spark Broadcast Summary



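    • If executor-side code uses a variable defined on the driver and that variable is not broadcast, each executor holds one copy of the variable per task it runs.
    • If executor-side code uses a variable defined on the driver and that variable is broadcast, each executor holds only a single copy of it, shared by all of its tasks.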
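The difference between the two cases above can be sketched as follows. This is a minimal illustration, not code from the original post: the object name `BroadcastVsClosure`, the `countryNames` lookup table, and the `local[2]` master are all assumptions made for the example. Both variants return the same result; what differs is how the map reaches the executors.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastVsClosure {
  // Hypothetical lookup table that lives on the driver.
  val countryNames: Map[String, String] =
    Map("cn" -> "China", "us" -> "United States")

  def run(): (Seq[String], Seq[String]) = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local mode, for illustration only
      .appName("broadcast-vs-closure")
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      val codes = sc.parallelize(Seq("cn", "us", "cn", "fr"), numSlices = 4)

      // Without broadcast: the map is captured by the closure,
      // so it is serialized and shipped with every task.
      val lookup = countryNames
      val viaClosure =
        codes.map(c => lookup.getOrElse(c, "unknown")).collect().toSeq

      // With broadcast: tasks only carry a small handle; the map itself
      // is fetched once per executor and shared by its tasks.
      val bc = sc.broadcast(countryNames)
      val viaBroadcast =
        codes.map(c => bc.value.getOrElse(c, "unknown")).collect().toSeq

      (viaClosure, viaBroadcast)
    } finally {
      spark.stop()
    }
  }
}
```

With a two-entry map the saving is negligible; the broadcast form pays off when the variable is large and many tasks run on each executor.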



    A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
    Broadcast variables are created from a variable v by calling SparkContext.broadcast(T, scala.reflect.ClassTag<T>). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The interpreter session below shows this:

         scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
         broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
         scala> broadcastVar.value
         res0: Array[Int] = Array(1, 2, 3)

    After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
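One way to follow the "do not modify v after it is broadcast" advice is to broadcast an immutable snapshot rather than the mutable object itself. The sketch below assumes a hypothetical mutable `thresholds` buffer on the driver and a `local[2]` master; neither appears in the original post.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

object ImmutableSnapshotBroadcast {
  def run(): Seq[Int] = {
    val conf = new SparkConf()
      .setMaster("local[2]") // assumption: local mode, for illustration only
      .setAppName("immutable-snapshot")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical mutable state kept on the driver.
      val thresholds = ArrayBuffer(10, 20, 30)

      // Broadcast an immutable copy, not the buffer itself.
      val bc = sc.broadcast(thresholds.toVector)

      // Later driver-side mutation cannot leak into the broadcast value.
      thresholds += 40

      // For each input, count how many thresholds it has reached.
      sc.parallelize(Seq(5, 15, 25, 35), 2)
        .map(x => bc.value.count(_ <= x))
        .collect()
        .toSeq
    } finally {
      sc.stop()
    }
  }
}
```

Tasks read the value through `bc.value` only, so the closure never captures the driver-side buffer.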


    package org.apache.spark.broadcast

    import java.io.Serializable
    import scala.reflect.ClassTag
    import org.apache.spark.SparkException
    import org.apache.spark.internal.Logging
    import org.apache.spark.util.Utils

    /**
     * A broadcast variable.
     * ...
     * @param id A unique identifier for the broadcast variable.
     * @tparam T Type of the data contained in the broadcast variable.
     */
    abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {
      /** Flag signifying whether the broadcast variable is valid (that is, not already destroyed). */
      @volatile private var _isValid = true
      private var _destroySite = ""

      /** Get the broadcasted value. */
      def value: T = {
        assertValid()
        getValue()
      }

      /**
       * Asynchronously delete cached copies of this broadcast on the executors. If the broadcast
       * is used after this is called, it will need to be re-sent to each executor.
       */
      def unpersist() {
        unpersist(blocking = false)
      }

      /**
       * Delete cached copies of this broadcast on the executors. If the broadcast is used after
       * this is called, it will need to be re-sent to each executor.
       * @param blocking Whether to block until unpersisting has completed
       */
      def unpersist(blocking: Boolean) {
        assertValid()
        doUnpersist(blocking)
      }

      /**
       * Destroy all data and metadata related to this broadcast variable. Use this with caution;
       * once a broadcast variable has been destroyed, it cannot be used again.
       * This method blocks until destroy has completed.
       */
      def destroy() {
        destroy(blocking = true)
      }

      /**
       * Destroy all data and metadata related to this broadcast variable. Use this with caution;
       * once a broadcast variable has been destroyed, it cannot be used again.
       * @param blocking Whether to block until destroy has completed
       */
      private[spark] def destroy(blocking: Boolean) {
        assertValid()
        _isValid = false
        _destroySite = Utils.getCallSite().shortForm
        logInfo("Destroying %s (from %s)".format(toString, _destroySite))
        doDestroy(blocking)
      }

      /**
       * Whether this Broadcast is actually usable. This should be false once persisted state is
       * removed from the driver.
       */
      private[spark] def isValid: Boolean = {
        _isValid
      }

      /**
       * Actually get the broadcasted value. Concrete implementations of Broadcast class must
       * define their own way to get the value.
       */
      protected def getValue(): T

      /**
       * Actually unpersist the broadcasted value on the executors. Concrete implementations of
       * Broadcast class must define their own logic to unpersist their own data.
       */
      protected def doUnpersist(blocking: Boolean)

      /**
       * Actually destroy all data and metadata related to this broadcast variable.
       * Implementation of Broadcast class must define their own logic to destroy their own state.
       */
      protected def doDestroy(blocking: Boolean)

      /** Check if this broadcast is valid. If not valid, exception is thrown. */
      protected def assertValid() {
        if (!_isValid) {
          throw new SparkException(
            "Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite))
        }
      }

      override def toString: String = "Broadcast(" + id + ")"
    }
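The lifecycle implied by the class above can be exercised from the driver. The following is a small sketch under stated assumptions (a `local[2]` master and a made-up object name `BroadcastLifecycle`): `unpersist` only drops cached copies, so the broadcast is still usable afterwards, whereas after `destroy()` the `assertValid()` check makes `value` throw a `SparkException`.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object BroadcastLifecycle {
  def run(): (Int, Int, Boolean) = {
    val conf = new SparkConf()
      .setMaster("local[2]") // assumption: local mode, for illustration only
      .setAppName("broadcast-lifecycle")
    val sc = new SparkContext(conf)
    try {
      val bc = sc.broadcast(Array(1, 2, 3))
      val rdd = sc.parallelize(1 to 4, 2)

      // Normal use inside a task.
      val before = rdd.map(_ => bc.value.sum).first()

      // unpersist removes cached copies on executors; the broadcast stays
      // valid and is re-sent when a task needs it again.
      bc.unpersist(blocking = true)
      val afterUnpersist = rdd.map(_ => bc.value.sum).first()

      // destroy() marks the broadcast invalid; any later access is rejected.
      bc.destroy()
      val rejectedAfterDestroy =
        try { bc.value; false }
        catch { case _: SparkException => true }

      (before, afterUnpersist, rejectedAfterDestroy)
    } finally {
      sc.stop()
    }
  }
}
```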

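    2) The unpersist(), unpersist(boolean blocking), destroy(), and destroy(boolean blocking) methods must all be called on the driver. (In the source shown above, destroy(blocking) is private[spark], so application code calls destroy().)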
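Because these methods belong on the driver, one common way to update a broadcast value is to keep a driver-side holder that re-broadcasts the new value and unpersists the old one. The sketch below is only an illustration: `RefreshableBroadcast` and `RefreshableBroadcastDemo` are hypothetical names, not part of Spark and not taken from the original post, and the `local[2]` master is an assumption.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import scala.reflect.ClassTag

// Hypothetical helper (not part of Spark): lives on the driver only.
final class RefreshableBroadcast[T: ClassTag](sc: SparkContext, initial: T) {
  @volatile private var current: Broadcast[T] = sc.broadcast(initial)

  /** Current broadcast handle; resolve it on the driver before building a job. */
  def get: Broadcast[T] = current

  /** Driver-side swap: broadcast the new value, then drop cached copies of the old one. */
  def refresh(newValue: T): Unit = synchronized {
    val old = current
    current = sc.broadcast(newValue)
    old.unpersist(blocking = false)
  }
}

object RefreshableBroadcastDemo {
  def run(): (Seq[Int], Seq[Int]) = {
    val conf = new SparkConf()
      .setMaster("local[2]") // assumption: local mode, for illustration only
      .setAppName("refreshable-broadcast")
    val sc = new SparkContext(conf)
    try {
      val holder = new RefreshableBroadcast(sc, 10)
      val data = sc.parallelize(Seq(1, 2, 3), 2)

      val bc1 = holder.get // handle captured by the first job
      val first = data.map(_ * bc1.value).collect().toSeq

      holder.refresh(100) // runs on the driver
      val bc2 = holder.get // later jobs pick up the new handle
      val second = data.map(_ * bc2.value).collect().toSeq

      (first, second)
    } finally {
      sc.stop()
    }
  }
}
```

The helper uses `unpersist` rather than `destroy` for the old handle, so a job that still references it can re-fetch the value instead of failing.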

    See also "Spark 2.2 (33): Updating broadcast variables in Spark Streaming and Spark Structured Streaming, summary (part 1)" and "Spark 2.3 (42): Updating broadcast variables in Spark Streaming and Spark Structured Streaming, summary (part 2)".





  • Original article: https://www.cnblogs.com/yy3b2007com/p/10613035.html