zoukankan      html  css  js  c++  java
  • 【Spark篇】--Spark中的宽窄依赖和Stage的划分

    一、前述

    RDD之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。

    Spark中的Stage其实就是一组并行的任务,任务是一个个的task 。

    二、具体细节

    • 窄依赖

    RDD和子RDD partition之间的关系是一对一的。或者父RDD一个partition只对应一个子RDD的partition情况下的父RDD和子RDD partition关系是多对一的。不会有shuffle的产生。父RDD一个分区去到子RDD的一个分区

     

    • 宽依赖

    RDD与子RDD partition之间的关系是一对多。会有shuffle的产生。父RDD的一个分区的数据去到子RDD的不同分区里面。

    其实区分宽窄依赖主要就是看父RDD的一个Partition的流向,要是流向一个的话就是窄依赖,流向多个的话就是宽依赖。看图理解:

    • Stage概念

    Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行。     stage是由一组并行的task组成。

     

    • stage切割规则

     切割规则:从后往前遇到宽依赖就切割stage。

     

     

     

    • stage计算模式

        pipeline管道计算模式,pipeline只是一种计算思想,模式。

     

    备注:图中几个理解点:

       1、Spark的pipeLine的计算模式,相当于执行了一个高阶函数f3(f2(f1(textFile))) !+!+!=3 也就是来一条数据然后计算一条数据,把所有的逻辑走完,然后落地,准确的说一个task处理遗传分区的数据 因为跨过了不同的逻辑的分区。而MapReduce是 1+1=2,2+1=3的模式,也就是计算完落地,然后在计算,然后再落地到磁盘或内存,最后数据是落在计算节点上,按reduce的hash分区落地。所以这也是比Mapreduce快的原因,完全基于内存计算。

       2、管道中的数据何时落地:shuffle write的时候,对RDD进行持久化的时候。

       3.   Stage的task并行度是由stage的最后一个RDD的分区数来决定的 。一般来说,一个partiotion对应一个task,但最后reduce的时候可以手动改变reduce的个数,也就是分区数,即改变了并行度。例如reduceByKey(XXX,3),GroupByKey(4),union由的分区数由前面的相加。

       4.、如何提高stage的并行度:reduceBykey(xxx,numpartiotion),join(xxx,numpartiotion)

    • 测试验证pipeline计算模式
      import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      import java.util.Arrays
      
      object PipelineTest {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf()
          conf.setMaster("local").setAppName("pipeline");
          val sc = new SparkContext(conf)
          val rdd = sc.parallelize(Array(1,2,3,4))
          val rdd1 = rdd.map { x => {
            println("map--------"+x)
            x
          }}
          val rdd2 = rdd1.filter { x => {
            println("fliter********"+x)
            true
          } }
          rdd2.collect()
          sc.stop()
        }
      }
      

       

      可见是按照所有的逻辑将数据一条条的执行。!!!

     

     

  • 相关阅读:
    区别@ControllerAdvice 和@RestControllerAdvice
    Cannot determine embedded database driver class for database type NONE
    使用HttpClient 发送 GET、POST、PUT、Delete请求及文件上传
    Markdown语法笔记
    Property 'sqlSessionFactory' or 'sqlSessionTemplate' are required
    Mysql 查看连接数,状态 最大并发数(赞)
    OncePerRequestFilter的作用
    java连接MySql数据库 zeroDateTimeBehavior
    Intellij IDEA 安装lombok及使用详解
    ps -ef |grep xxx 输出的具体含义
  • 原文地址:https://www.cnblogs.com/LHWorldBlog/p/8415541.html
Copyright © 2011-2022 走看看