zoukankan      html  css  js  c++  java
  • spark学习进度14(spark逻辑图和物理图和运行过程)

    逻辑图:

    表达的是什么:

    逻辑图就是数据处理和存储的过程表达

    什么是RDD之间的依赖关系:

     什么是关系(依赖关系) ?

    • 从算子视角上来看, splitRDD 通过 map 算子得到了 tupleRDD, 所以 splitRDD 和 tupleRDD 之间的关系是 map

      但是仅仅这样说, 会不够全面, 从细节上来看, RDD 只是数据和关于数据的计算, 而具体执行这种计算得出结果的是一个神秘的其它组件, 所以, 这两个 RDD 的关系可以表示为 splitRDD 的数据通过 map 操作, 被传入 tupleRDD, 这是它们之间更细化的关系

      但是 RDD 这个概念本身并不是数据容器, 数据真正应该存放的地方是 RDD 的分区, 所以如果把视角放在数据这一层面上的话, 直接讲这两个 RDD 之间有关系是不科学的, 应该从这两个 RDD 的分区之间的关系来讨论它们之间的关系那这些分区之间是什么关系?如果仅仅说 splitRDD 和 tupleRDD 之间的话, 那它们的分区之间就是一对一的关系

    • 但是 tupleRDD 到 reduceRDD 呢? tupleRDD 通过算子 reduceByKey 生成 reduceRDD, 而这个算子是一个 Shuffle 操作, Shuffle 操作的两个 RDD 的分区之间并不是一对一, reduceByKey 的一个分区对应 tupleRDD 的多个分区

      RDD 之间的依赖关系详解

      窄依赖

      对于 cartesian 来说, 依赖关系如下

      20190520144103

      上述图形中清晰展示如下现象

      1. rddC 中的分区数量是两个父 RDD 的分区数量之乘积

      2. rddA 中每个分区对应 rddC 中的两个分区 (因为 rddB 中有两个分区), rddB 中的每个分区对应 rddC 中的三个分区 (因为 rddA 有三个分区)

      它们之间是窄依赖, 事实上在 cartesian 中也是 NarrowDependency 这个所有窄依赖的父类的唯一一次直接使用, 为什么呢?

      因为所有的分区之间是拷贝关系, 并不是 Shuffle 关系

      • rddC 中的每个分区并不是依赖多个父 RDD 中的多个分区

      • rddC 中每个分区的数量来自一个父 RDD 分区中的所有数据, 是一个 FullDependence, 所以数据可以直接从父 RDD 流动到子 RDD

      • 不存在一个父 RDD 中一部分数据分发过去, 另一部分分发给其它的 RDD

      宽依赖

      在 ShuffleDependency 的类声明上如下写到

      Represents a dependency on the output of a shuffle stage.

      上面非常清楚的说道, 宽依赖就是 Shuffle 中的依赖关系, 换句话说, 只有 Shuffle 产生的地方才是宽依赖

      那么宽窄依赖的判断依据就非常简单明确了, 是否有 Shuffle ?

      举个 reduceByKey 的例子, rddB = rddA.reduceByKey( (curr, agg) ⇒ curr + agg ) 会产生如下的依赖关系

      20190520151040
      • rddB 的每个分区都几乎依赖 rddA 的所有分区

      • 对于 rddA 中的一个分区来说, 其将一部分分发给 rddB 的 p1, 另外一部分分发给 rddB 的 p2, 这不是数据流动, 而是分发

      • 什么是SHUFFLE:

      •  如何分辨宽窄依赖 ?

        其实分辨宽窄依赖的本身就是在分辨父子 RDD 之间是否有 Shuffle, 大致有以下的方法

        • 如果是 Shuffle, 两个 RDD 的分区之间不是单纯的数据流动, 而是分发和复制

        • 一般 Shuffle 的子 RDD 的每个分区会依赖父 RDD 的多个分区

        但是这样判断其实不准确, 如果想分辨某个算子是否是窄依赖, 或者是否是宽依赖, 则还是要取决于具体的算子, 例如想看 cartesian 生成的是宽依赖还是窄依赖, 可以通过如下步骤

        1. 查看 map 算子生成的 RDD

          20190520155245
        2. 进去 RDD 查看 getDependence 方法

          20190520155314

          常见的窄依赖类型:

          一对一窄依赖

          其实 RDD 中默认的是 OneToOneDependency, 后被不同的 RDD 子类指定为其它的依赖类型, 常见的一对一依赖是 map 算子所产生的依赖, 例如 rddB = rddA.map(…​)

          20190520160405
          • 每个分区之间一一对应, 所以叫做一对一窄依赖

          Range 窄依赖

          Range 窄依赖其实也是一对一窄依赖, 但是保留了中间的分隔信息, 可以通过某个分区获取其父分区, 目前只有一个算子生成这种窄依赖, 就是 union 算子, 例如 rddC = rddA.union(rddB)

          20190520161043
          • rddC 其实就是 rddA 拼接 rddB 生成的, 所以 rddC 的 p5 和 p6 就是 rddB 的 p1 和 p2

          • 所以需要有方式获取到 rddC 的 p5 其父分区是谁, 于是就需要记录一下边界, 其它部分和一对一窄依赖一样

          多对一窄依赖

          多对一窄依赖其图形和 Shuffle 依赖非常相似, 所以在遇到的时候, 要注意其 RDD 之间是否有 Shuffle 过程, 比较容易让人困惑, 常见的多对一依赖就是重分区算子 coalesce, 例如 rddB = rddA.coalesce(2, shuffle = false), 但同时也要注意, 如果 shuffle = true 那就是完全不同的情况了

          20190520161621
          • 因为没有 Shuffle, 所以这是一个窄依赖

          再谈宽窄依赖的区别

          宽窄依赖的区别非常重要, 因为涉及了一件非常重要的事情: 如何计算 RDD ?

          宽窄以来的核心区别是: 窄依赖的 RDD 可以放在一个 Task 中运行

          物理图:

          物理图的作用:

          问题一: 物理图的意义是什么?

          物理图解决的其实就是 RDD 流程生成以后, 如何计算和运行的问题, 也就是如何把 RDD 放在集群中执行的问题

          Snipaste 2019 05 23 14 00 33

          问题二: 如果要确定如何运行的问题, 则需要先确定集群中有什么组件

          • 首先集群中物理元件就是一台一台的机器

          • 其次这些机器上跑的守护进程有两种: MasterWorker

            • 每个守护进程其实就代表了一台机器, 代表这台机器的角色, 代表这台机器和外界通信

            • 例如我们常说一台机器是 Master, 其含义是这台机器中运行了一个 Master 守护进程, 如果一台机器运行了 Master 的同时又运行了 Worker, 则说这台机器是 Master 也可以, 说它是 Worker 也行

          • 真正能运行 RDD 的组件是: Executor, 也就是说其实 RDD 最终是运行在 Executor 中的, 也就是说, 无论是 Master 还是 Worker 其实都是用于管理 Executor 和调度程序的

          结论是 RDD 一定在 Executor 中计算, 而 Master 和 Worker 负责调度和管理 Executor

           

          如何划分阶段 ?

          为了减少执行任务, 减少数据暂存和交换的机会, 所以需要创建管道, 让数据沿着管道流动, 其实也就是原先每个 RDD 都有一组 Task, 现在改为所有的 RDD 共用一组 Task, 但是也有问题, 问题如下

          20190521114717

          就是说, 在 Shuffle 处, 必须断开管道, 进行数据交换, 交换过后, 继续流动, 所以整个流程可以变为如下样子

          20190521115759

          把 Task 断开成两个部分, Task4 可以从 Task 1, 2, 3 中获取数据, 后 Task4 又作为管道, 继续让数据在其中流动

          但是还有一个问题, 说断开就直接断开吗? 不用打个招呼的呀? 这个断开即没有道理, 也没有规则, 所以可以为这个断开增加一个概念叫做阶段, 按照阶段断开, 阶段的英文叫做 Stage, 如下

          20190521120501

          所以划分阶段的本身就是设置断开点的规则, 那么该如何划分阶段呢?

          1. 第一步, 从最后一个 RDD, 也就是逻辑图中最右边的 RDD 开始, 向前滑动 Stage 的范围, 为 Stage0

          2. 第二步, 遇到 ShuffleDependency 断开 Stage, 从下一个 RDD 开始创建新的 Stage, 为 Stage1

          3. 第三步, 新的 Stage 按照同样的规则继续滑动, 直到包裹所有的 RDD

          总结来看, 就是针对于宽窄依赖来判断, 一个 Stage 中只有窄依赖, 因为只有窄依赖才能形成数据的 Pipeline.

          如果要进行 Shuffle 的话, 数据是流不过去的, 必须要拷贝和拉取. 所以遇到 RDD 宽依赖的两个 RDD 时, 要切断这两个 RDD 的 Stage.

           

          这样一个 RDD 依赖的链条, 我们称之为 RDD 的血统, 其中有宽依赖也有窄依赖

          数据怎么流动 ?

          val sc = ...
          
          val textRDD = sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop"))
          val splitRDD = textRDD.flatMap(_.split(" "))
          val tupleRDD = splitRDD.map((_, 1))
          val reduceRDD = tupleRDD.reduceByKey(_ + _)
          val strRDD = reduceRDD.map(item => s"${item._1}, ${item._2}")
          
          strRDD.collect.foreach(item => println(item))

          上述代码是这个章节我们一直使用的代码流程, 如下是其完整的逻辑执行图

          20190521161456

          如果放在集群中运行, 通过 WebUI 可以查看到如下 DAG 结构

          20190521161337

          Step 1: 从 ResultStage 开始执行

          最接近 Result 部分的 Stage id 为 0, 这个 Stage 被称之为 ResultStage

          由代码可以知道, 最终调用 Action 促使整个流程执行的是最后一个 RDDstrRDD.collect, 所以当执行 RDD 的计算时候, 先计算的也是这个 RDD

          Step 2: RDD 之间是有关联的

          前面已经知道, 最后一个 RDD 先得到执行机会, 先从这个 RDD 开始执行, 但是这个 RDD 中有数据吗 ? 如果没有数据, 它的计算是什么? 它的计算是从父 RDD 中获取数据, 并执行传入的算子的函数

          简单来说, 从产生 Result 的地方开始计算, 但是其 RDD 中是没数据的, 所以会找到父 RDD 来要数据, 父 RDD 也没有数据, 继续向上要, 所以, 计算从 Result 处调用, 但是从整个逻辑图中的最左边 RDD 开始, 类似一个递归的过程

          20190521162302

          运行过程:

          逻辑图

          是什么 怎么生成 具体怎么生成

          val textRDD = sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop"))
          val splitRDD = textRDD.flatMap(_.split(" "))
          val tupleRDD = splitRDD.map((_, 1))
          val reduceRDD = tupleRDD.reduceByKey(_ + _)
          val strRDD = reduceRDD.map(item => s"${item._1}, ${item._2}")

          逻辑图如何生成

          上述代码在 Spark Application 的 main 方法中执行, 而 Spark Application 在 Driver 中执行, 所以上述代码在 Driver 中被执行, 那么这段代码执行的结果是什么呢?

          一段 Scala 代码的执行结果就是最后一行的执行结果, 所以上述的代码, 从逻辑上执行结果就是最后一个 RDD, 最后一个 RDD 也可以认为就是逻辑执行图, 为什么呢?

          例如 rdd2 = rdd1.map(…​) 中, 其实本质上 rdd2 是一个类型为 MapPartitionsRDD 的对象, 而创建这个对象的时候, 会通过构造函数传入当前 RDD 对象, 也就是父 RDD, 也就是调用 map 算子的 rdd1rdd1 是 rdd2 的父 RDD

          20190521165818

          一个 RDD 依赖另外一个 RDD, 这个 RDD 又依赖另外的 RDD, 一个 RDD 可以通过 getDependency 获得其父 RDD, 这种环环相扣的关系, 最终从最后一个 RDD 就可以推演出前面所有的 RDD

          逻辑图是什么, 干啥用

          逻辑图其实本质上描述的就是数据的计算过程, 数据从哪来, 经过什么样的计算, 得到什么样的结果, 再执行什么计算, 得到什么结果

          可是数据的计算是描述好了, 这种计算该如何执行呢?

          物理图

          数据的计算表示好了, 该正式执行了, 但是如何执行? 如何执行更快更好更酷? 就需要为其执行做一个规划, 所以需要生成物理执行图

          strRDD.collect.foreach(item => println(item))

          上述代码其实就是最后的一个 RDD 调用了 Action 方法, 调用 Action 方法的时候, 会请求一个叫做 DAGScheduler 的组件, DAGScheduler 会创建用于执行 RDD 的 Stage 和 Task

          DAGScheduler 是一个由 SparkContext 创建, 运行在 Driver 上的组件, 其作用就是将由 RDD 构建出来的逻辑计划, 构建成为由真正在集群中运行的 Task 组成的物理执行计划, DAGScheduler 主要做如下三件事

          1. 帮助每个 Job 计算 DAG 并发给 TaskSheduler 调度

          2. 确定每个 Task 的最佳位置

          3. 跟踪 RDD 的缓存状态, 避免重新计算

          从字面意思上来看, DAGScheduler 是调度 DAG 去运行的, DAG 被称作为有向无环图, 其实可以将 DAG 理解为就是 RDD 的逻辑图, 其呈现两个特点: RDD 的计算是有方向的, RDD 的计算是无环的, 所以 DAGScheduler 也可以称之为 RDD Scheduler, 但是真正运行在集群中的并不是 RDD, 而是 Task 和 StageDAGScheduler 负责这种转换

          Job 是什么 ?

          Job 什么时候生成 ?

          当一个 RDD 调用了 Action 算子的时候, 在 Action 算子内部, 会使用 sc.runJob() 调用 SparkContext 中的 runJob 方法, 这个方法又会调用 DAGScheduler 中的 runJob, 后在 DAGScheduler 中使用消息驱动的形式创建 Job

          简而言之, Job 在 RDD 调用 Action 算子的时候生成, 而且调用一次 Action 算子, 就会生成一个 Job, 如果一个 SparkApplication 中调用了多次 Action 算子, 会生成多个 Job 串行执行, 每个 Job 独立运作, 被独立调度, 所以 RDD 的计算也会被执行多次

          Job 是什么 ?

          如果要将 Spark 的程序调度到集群中运行, Job 是粒度最大的单位, 调度以 Job 为最大单位, 将 Job 拆分为 Stage 和 Task 去调度分发和运行, 一个 Job 就是一个 Spark 程序从 读取 → 计算 → 运行 的过程

          一个 Spark Application 可以包含多个 Job, 这些 Job 之间是串行的, 也就是第二个 Job 需要等待第一个 Job 的执行结束后才会开始执行

          Job 和 Stage 的关系

          Job 是一个最大的调度单位, 也就是说 DAGScheduler 会首先创建一个 Job 的相关信息, 后去调度 Job, 但是没办法直接调度 Job, 比如说现在要做一盘手撕包菜, 不可能直接去炒一整颗包菜, 要切好撕碎, 再去炒

          为什么 Job 需要切分 ?
          20190521161456
          • 因为 Job 的含义是对整个 RDD 血统求值, 但是 RDD 之间可能会有一些宽依赖

          • 如果遇到宽依赖的话, 两个 RDD 之间需要进行数据拉取和复制

            如果要进行拉取和复制的话, 那么一个 RDD 就必须等待它所依赖的 RDD 所有分区先计算完成, 然后再进行拉取

          • 由上得知, 一个 Job 是无法计算完整个 RDD 血统的

          如何切分 ?

          创建一个 Stage, 从后向前回溯 RDD, 遇到 Shuffle 依赖就结束 Stage, 后创建新的 Stage 继续回溯. 这个过程上面已经详细的讲解过, 但是问题是切分以后如何执行呢, 从后向前还是从前向后, 是串行执行多个 Stage, 还是并行执行多个 Stage

          问题一: 执行顺序

          在图中, Stage 0 的计算需要依赖 Stage 1 的数据, 因为 reduceRDD 中一个分区可能需要多个 tupleRDD 分区的数据, 所以 tupleRDD 必须先计算完, 所以, 应该在逻辑图中自左向右执行 Stage

          问题二: 串行还是并行

          还是同样的原因, Stage 0 如果想计算, Stage 1 必须先计算完, 因为 Stage 0 中每个分区都依赖 Stage 1 中的所有分区, 所以 Stage 1 不仅需要先执行, 而且 Stage 1 执行完之前 Stage 0 无法执行, 它们只能串行执行

          总结

          • 一个 Stage 就是物理执行计划中的一个步骤, 一个 Spark Job 就是划分到不同 Stage 的计算过程

          • Stage 之间的边界由 Shuffle 操作来确定

            • Stage 内的 RDD 之间都是窄依赖, 可以放在一个管道中执行

            • 而 Shuffle 后的 Stage 需要等待前面 Stage 的执行

          Stage 有两种

          • ShuffMapStage, 其中存放窄依赖的 RDD

          • ResultStage, 每个 Job 只有一个, 负责计算结果, 一个 ResultStage 执行完成标志着整个 Job 执行完毕

          Stage 和 Task 的关系

          20190521120501

          前面我们说到 Job 无法直接执行, 需要先划分为多个 Stage, 去执行 Stage, 那么 Stage 可以直接执行吗?

          • 第一点: Stage 中的 RDD 之间是窄依赖

            因为 Stage 中的所有 RDD 之间都是窄依赖, 窄依赖 RDD 理论上是可以放在同一个 Pipeline(管道, 流水线) 中执行的, 似乎可以直接调度 Stage 了? 其实不行, 看第二点

          • 第二点: 别忘了 RDD 还有分区

            一个 RDD 只是一个概念, 而真正存放和处理数据时, 都是以分区作为单位的

            Stage 对应的是多个整体上的 RDD, 而真正的运行是需要针对 RDD 的分区来进行的

          • 第三点: 一个 Task 对应一个 RDD 的分区

            一个比 Stage 粒度更细的单元叫做 TaskStage 是由 Task 组成的, 之所以有 Task 这个概念, 是因为 Stage 针对整个 RDD, 而计算的时候, 要针对 RDD 的分区

            假设一个 Stage 中有 10 个 RDD, 这些 RDD 中的分区各不相同, 但是分区最多的 RDD 有 30 个分区, 而且很显然, 它们之间是窄依赖关系

            那么, 这个 Stage 中应该有多少 Task 呢? 应该有 30 个 Task, 因为一个 Task 计算一个 RDD 的分区. 这个 Stage 至多有 30 个分区需要计算

          • 总结

            • 一个 Stage 就是一组并行的 Task 集合

            • Task 是 Spark 中最小的独立执行单元, 其作用是处理一个 RDD 分区

            • 一个 Task 只可能存在于一个 Stage 中, 并且只能计算一个 RDD 的分区

          TaskSet

          梳理一下这几个概念, Job > Stage > TaskJob 中包含 Stage 中包含 Task

          而 Stage 中经常会有一组 Task 需要同时执行, 所以针对于每一个 Task 来进行调度太过繁琐, 而且没有意义, 所以每个 Stage 中的 Task 们会被收集起来, 放入一个 TaskSet 集合中

          • 一个 Stage 有一个 TaskSet

          • TaskSet 中 Task 的个数由 Stage 中的最大分区数决定

          整体执行流程

          20190522015026
           
           

          高级特性:

          闭包:

          class Closure {
            @Test
            def test(): Unit = {
              val areaFunction = closure()
              val area = areaFunction(2)
              println(area)
            }
          
            def closure(): Int => Double = {
              val factor = 3.14
              val areaFunction = (r: Int) => math.pow(r, 2) * factor
              areaFunction
            }
          
          }

           什么是闭包?

          val areaFunction = closure()
          areaFunction()

          通过 closure 返回的函数 areaFunction 就是一个闭包, 其函数内部的作用域并不是 test 函数的作用域, 这种连带作用域一起打包的方式, 我们称之为闭包, 在 Scala 中

          Scala 中的闭包本质上就是一个对象, 是 FunctionX 的实例

          全局累加器:

          一个小问题

          var count = 0
          
          val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
          val sc = new SparkContext(config)
          
          sc.parallelize(Seq(1, 2, 3, 4, 5))
            .foreach(count += _)
          
          println(count)

          上面这段代码是一个非常错误的使用, 请不要仿照, 这段代码只是为了证明一些事情

          先明确两件事, var count = 0 是在 Driver 中定义的, foreach(count += _) 这个算子以及传递进去的闭包运行在 Executor 中

          这段代码整体想做的事情是累加一个变量, 但是这段代码的写法却做不到这件事, 原因也很简单, 因为具体的算子是闭包, 被分发给不同的节点运行, 所以这个闭包中累加的并不是 Driver 中的这个变量

          全局累加器

          Accumulators(累加器) 是一个只支持 added(添加) 的分布式变量, 可以在分布式环境下保持一致性, 并且能够做到高效的并发.

          原生 Spark 支持数值型的累加器, 可以用于实现计数或者求和, 开发者也可以使用自定义累加器以实现更高级的需求

          val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
          val sc = new SparkContext(config)
          
          val counter = sc.longAccumulator("counter")
          
          sc.parallelize(Seq(1, 2, 3, 4, 5))
            .foreach(counter.add(_))
          
          // 运行结果: 15
          println(counter.value)

          注意点:

          • Accumulator 是支持并发并行的, 在任何地方都可以通过 add 来修改数值, 无论是 Driver 还是 Executor

          • 只能在 Driver 中才能调用 value 来获取数值

          在 WebUI 中关于 Job 部分也可以看到 Accumulator 的信息, 以及其运行的情况

           
          package cn.itcast.spark.rdd
          
          import org.apache.spark.{SparkConf, SparkContext}
          import org.apache.spark.util.AccumulatorV2
          import org.junit.Test
          
          import scala.collection.mutable
          
          class Accumulator {
          
            /**
              * RDD -> (1, 2, 3, 4, 5) -> Set(1,2,3,4,5)
              */
            @Test
            def acc(): Unit = {
              val config = new SparkConf().setAppName("acc").setMaster("local[6]")
              val sc = new SparkContext(config)
          
              val numAcc = new NumAccumulator()
              // 注册给 Spark
              sc.register(numAcc, "num")
          
              sc.parallelize(Seq("1", "2", "3"))
                .foreach(item => numAcc.add(item))
          
              println(numAcc.value)
          
              sc.stop()
            }
          }
          
          class NumAccumulator extends AccumulatorV2[String, Set[String]] {//累加前后的数据类型
            private val nums: mutable.Set[String] = mutable.Set()//mutable保证是可变的
          
            /**
              * 告诉 Spark 框架, 这个累加器对象是否是空的
              */
            override def isZero: Boolean =  {
              nums.isEmpty
            }
          
            /**
              * 提供给 Spark 框架一个拷贝的累加器
              * @return
              */
            override def copy(): AccumulatorV2[String, Set[String]] = {
              val newAccumulator = new NumAccumulator()
              nums.synchronized {
                newAccumulator.nums ++= this.nums//++=是将两个集合加
              }
              newAccumulator
            }
          
            /**
              * 帮助 Spark 框架, 清理累加器的内容
              */
            override def reset(): Unit = {
              nums.clear()
            }
          
            /**
              * 外部传入要累加的内容, 在这个方法中进行累加
              */
            override def add(v: String): Unit = {
              nums += v
            }
          
            /**
              * 累加器在进行累加的时候, 可能每个分布式节点都有一个实例
              * 在最后 Driver 进行一次合并, 把所有的实例的内容合并起来, 会调用这个 merge 方法进行合并
              */
            override def merge(other: AccumulatorV2[String, Set[String]]): Unit = {
              nums ++= other.value
            }
          
            /**
              * 提供给外部累加结果
              * 为什么一定要给不可变的, 因为外部有可能再进行修改, 如果是可变的集合, 其外部的修改会影响内部的值
              */
            override def value: Set[String] = {
              nums.toSet
            }
          }

          广播变量:

           广播变量的作用

          广播变量允许开发者将一个 Read-Only 的变量缓存到集群中每个节点中, 而不是传递给每一个 Task 一个副本.

          • 集群中每个节点, 指的是一个机器

          • 每一个 Task, 一个 Task 是一个 Stage 中的最小处理单元, 一个 Executor 中可以有多个 Stage, 每个 Stage 有多个 Task

          所以在需要跨多个 Stage 的多个 Task 中使用相同数据的情况下, 广播特别的有用

          7eb422ef368aec2a1e60636b0f9dfd77

          广播变量的API

          方法名描述

          id

          唯一标识

          value

          广播变量的值

          unpersist

          在 Executor 中异步的删除缓存副本

          destroy

          销毁所有此广播变量所关联的数据和元数据

          toString

          字符串表示

          使用广播变量的一般套路

          可以通过如下方式创建广播变量

          val b = sc.broadcast(1)

          如果 Log 级别为 DEBUG 的时候, 会打印如下信息

          DEBUG BlockManager: Put block broadcast_0 locally took  430 ms
          DEBUG BlockManager: Putting block broadcast_0 without replication took  431 ms
          DEBUG BlockManager: Told master about block broadcast_0_piece0
          DEBUG BlockManager: Put block broadcast_0_piece0 locally took  4 ms
          DEBUG BlockManager: Putting block broadcast_0_piece0 without replication took  4 ms

          创建后可以使用 value 获取数据

          b.value

          获取数据的时候会打印如下信息

          DEBUG BlockManager: Getting local block broadcast_0
          DEBUG BlockManager: Level for block broadcast_0 is StorageLevel(disk, memory, deserialized, 1 replicas)

          广播变量使用完了以后, 可以使用 unpersist 删除数据

          b.unpersist

          删除数据以后, 可以使用 destroy 销毁变量, 释放内存空间

          b.destroy

          销毁以后, 会打印如下信息

          DEBUG BlockManager: Removing broadcast 0
          DEBUG BlockManager: Removing block broadcast_0_piece0
          DEBUG BlockManager: Told master about block broadcast_0_piece0
          DEBUG BlockManager: Removing block broadcast_0

          使用 value 方法的注意点

          方法签名 value: T

          在 value 方法内部会确保使用获取数据的时候, 变量必须是可用状态, 所以必须在变量被 destroy 之前使用 value 方法, 如果使用 value 时变量已经失效, 则会爆出以下错误

          org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27)
            at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
            at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:69)
            ... 48 elided

          使用 destroy 方法的注意点

          方法签名 destroy(): Unit

          destroy 方法会移除广播变量, 彻底销毁掉, 但是如果你试图多次 destroy 广播变量, 则会爆出以下错误

          org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27)
            at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
            at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:107)
            at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98)
            ... 48 elided
           
           
          package cn.itcast.spark.rdd
          
          import org.apache.spark.{SparkConf, SparkContext}
          import org.junit.Test
          
          class Broadcast {
          
            /**
              * 资源占用比较大, 有十个对应的 value
              */
            @Test
            def bc1(): Unit = {
              // 数据, 假装这个数据很大, 大概一百兆
              val v = Map("Spark" -> "http://spark.apache.cn", "Scala" -> "http://www.scala-lang.org")
          
              val config = new SparkConf().setMaster("local[6]").setAppName("bc")
              val sc = new SparkContext(config)
          
              // 将其中的 Spark 和 Scala 转为对应的网址
              val r = sc.parallelize(Seq("Spark", "Scala"))
              val result = r.map(item => v(item)).collect()
          
              println(result)
            }
          
            /**
              * 使用广播, 大幅度减少 value 的复制
              */
            @Test
            def bc2(): Unit = {
              // 数据, 假装这个数据很大, 大概一百兆
              val v = Map("Spark" -> "http://spark.apache.cn", "Scala" -> "http://www.scala-lang.org")
          
              val config = new SparkConf().setMaster("local[6]").setAppName("bc")
              val sc = new SparkContext(config)
          
              // 创建广播
              val bc = sc.broadcast(v)
          
              // 将其中的 Spark 和 Scala 转为对应的网址
              val r = sc.parallelize(Seq("Spark", "Scala"))
          
              // 在算子中使用广播变量代替直接引用集合, 只会复制和executor一样的数量
              // 在使用广播之前, 复制 map 了 task 数量份
              // 在使用广播以后, 复制次数和 executor 数量一致
              val result = r.map(item => bc.value(item)).collect()
          
              result.foreach(println(_))
            }
          }
           
           
           
           
  • 相关阅读:
    Redis集群搭建步骤
    JS性能优化
    javaweb中实现在线人数统计
    tomcat在linux中启动慢的解决方案
    Redis高可用架构
    bjpowernode课程体系及题库
    java 相关
    码农翻身全年文章精华
    Spring源码深度解析
    PHPSTROM快捷键备份
  • 原文地址:https://www.cnblogs.com/dazhi151/p/14261779.html
Copyright © 2011-2022 走看看