  • Stage division in Spark

    Stages are split at shuffle boundaries: every wide dependency starts a new stage.

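    A job is split into groups of tasks, and each group is called a stage, much like a map stage and a reduce stage. The RDD paper describes stage division in detail; in short, stages are divided according to two task types, shuffle and result. Spark has two kinds of task: ShuffleMapTask, whose output is the data a shuffle needs, and ResultTask, whose output is the result. Stage division follows the same split: all transformations before a shuffle form one stage, and the operations after the shuffle form another. For example, sc.parallelize(1 to 10).foreach(println) involves no shuffle and produces its output directly, so all of its tasks are ResultTasks and there is only one stage. In rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), the reduceByKey introduces a shuffle, so everything before reduceByKey is one stage that runs ShuffleMapTasks and writes the data the shuffle needs, and everything from reduceByKey to the end is a second stage that produces the result directly. If a job contains several shuffles, the operations before each shuffle form a stage of their own.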
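    Spark divides the DAG into stages according to the dependencies between RDDs. With a narrow dependency, each child partition depends on a fixed set of parent partitions, so the whole chain of transformations on a partition can run in the same thread, and Spark places narrow dependencies in the same stage. With a wide dependency, the next stage can start computing only after the shuffle of the parent RDD has finished. A ShuffleMapTask gets its name because it passes its results to the next stage through a shuffle.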
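
    The splitting rule described above can be sketched in plain Scala, without Spark. This is a simplified model under stated assumptions, not Spark's actual scheduler code: the names StageSplit, Node, Narrow and Wide are hypothetical, and a parent shared by several children would be listed more than once, which a real scheduler avoids.

```scala
object StageSplit {
  // Hypothetical, simplified lineage model; these are not Spark classes.
  sealed trait Dep
  final case class Narrow(parent: Node) extends Dep // e.g. map, filter
  final case class Wide(parent: Node) extends Dep   // e.g. reduceByKey, groupByKey

  final case class Node(name: String, deps: List[Dep] = Nil)

  // Names of all nodes in the same stage as `node`:
  // follow narrow dependencies, stop at wide ones.
  def stageMembers(node: Node): List[String] =
    node.name :: node.deps.flatMap {
      case Narrow(p) => stageMembers(p)
      case Wide(_)   => Nil
    }

  // Nodes reached through a wide dependency; each one ends an earlier stage.
  def shuffleParents(node: Node): List[Node] =
    node.deps.flatMap {
      case Narrow(p) => shuffleParents(p)
      case Wide(p)   => List(p)
    }

  // Stages in execution order: parent stages first, the final stage last.
  def stages(last: Node): List[List[String]] =
    shuffleParents(last).flatMap(stages) :+ stageMembers(last).reverse
}
```

    In this model, a lineage consisting only of parallelize yields a single stage, while map followed by reduceByKey yields two: one ending at map and one starting at reduceByKey.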

    An example:

    scala> import java.net.URL

    import java.net.URL

    scala>  val weblogrdd=sc.textFile("hdfs://localhost:9000/spark/log/web.log")

    weblogrdd: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/spark/log/web.log MapPartitionsRDD[99] at textFile at <console>:26

    scala> val bb=weblogrdd.map(_.split(" ")).map(x=>{val url=new URL(x(1));val path=url.getPath().substring(1);(path,x(0))}).map((_,1))

    bb: org.apache.spark.rdd.RDD[((String, String), Int)] = MapPartitionsRDD[104] at map at <console>:28

    scala> val cc=bb.reduceByKey(_+_)

    cc: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[105] at reduceByKey at <console>:30

    scala> val dd=cc.groupBy(_._1._1).mapValues(_.toList.sortBy(_._2).reverse.take(2))

    dd: org.apache.spark.rdd.RDD[(String, List[((String, String), Int)])] = MapPartitionsRDD[108] at mapValues at <console>:32

    scala> dd.collect

    res43: Array[(String, List[((String, String), Int)])] = Array((car,List(((car,a10002),5), ((car,10001),1))), (movie,List(((movie,a10001),5), ((movie,a10002),2))), (book,List(((book,a10001),3), ((book,a10002),1))), (music,List(((music,a10001),2), ((music,a10002),1))), (yule,List(((yule,a10002),4), ((yule,a10001),2))))
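
    To make the logic of this pipeline easier to follow, here is a rough equivalent on an in-memory list, without Spark. It assumes each log line has the form "<userId> <url>", which is inferred from the use of x(0) and x(1) above. The sample lines and the names TopUsersPerPath and topUsers are hypothetical, and java.net.URI is used in place of java.net.URL to extract the path.

```scala
import java.net.URI

object TopUsersPerPath {
  // Hypothetical sample lines in the assumed "<userId> <url>" format.
  val lines: List[String] = List(
    "a10001 http://example.com/movie",
    "a10001 http://example.com/movie",
    "a10002 http://example.com/movie",
    "a10003 http://example.com/movie",
    "a10003 http://example.com/movie",
    "a10003 http://example.com/movie",
    "a10002 http://example.com/car"
  )

  // For each path, keep the n (path, user) pairs with the most hits.
  def topUsers(lines: List[String], n: Int): Map[String, List[((String, String), Int)]] = {
    val counts = lines
      .map(_.split(" "))
      .map(f => (new URI(f(1)).getPath.substring(1), f(0))) // (path, user)
      .groupBy(identity)
      .map { case (key, hits) => (key, hits.size) }         // plays the role of reduceByKey(_ + _)

    counts.toList
      .groupBy { case ((path, _), _) => path }              // plays the role of groupBy(_._1._1)
      .map { case (path, rows) => path -> rows.sortBy(-_._2).take(n) }
  }
}
```

    Calling TopUsersPerPath.topUsers(TopUsersPerPath.lines, 2) keeps the two most frequent users for each path. In the Spark version, the counting step and the grouping step each correspond to a shuffle.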

    The core idea behind Spark's stage division and submission flow is to split stages at wide dependencies and to submit stages recursively.
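
    Recursive submission can be sketched as follows: before a stage runs, any parent stage that has not run yet is submitted first. This is a minimal illustration only. Stage, SubmitSketch and submissionOrder are hypothetical names, and Spark's DAGScheduler additionally handles task launching, waiting queues and failures, which are left out here.

```scala
import scala.collection.mutable

object SubmitSketch {
  // Hypothetical stand-in for a stage: an id plus the stages it waits on.
  final case class Stage(id: Int, parents: List[Stage] = Nil)

  // Returns stage ids in the order in which they would be run.
  def submissionOrder(finalStage: Stage): List[Int] = {
    val done = mutable.LinkedHashSet.empty[Int] // remembers insertion order

    def submit(stage: Stage): Unit =
      if (!done.contains(stage.id)) {
        stage.parents.foreach(submit) // parent stages that have not run go first
        done += stage.id              // all parents finished: run this stage
      }

    submit(finalStage)
    done.toList
  }
}
```

    For a final stage 3 that depends on stage 2, which in turn depends on stages 0 and 1, the order is 0, 1, 2, 3: submission starts from the final stage, but execution starts from the stages that have no parents.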

    ------------------------------------------------------------------------------------------------------------------------------------------

    scala> val mm=sc.makeRDD(List(("wang",2),("zhang",20),("wang",52)))

    mm: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[118] at makeRDD at <console>:26

    scala> val nn=sc.makeRDD(List(("wang",31),("zhang",25),("wang",88)))

    nn: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[119] at makeRDD at <console>:26

    scala> val mn=mm.join(nn)

    mn: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[122] at join at <console>:30

    scala> mn.collect

    res46: Array[(String, (Int, Int))] = Array((zhang,(20,25)), (wang,(2,31)), (wang,(2,88)), (wang,(52,31)), (wang,(52,88)))

    --------------------------------------------------------------------------------------------

    scala> val mm=sc.makeRDD(List(("wang",2),("zhang",20),("wang",52)))

    mm: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[128] at makeRDD at <console>:26

    scala> val nn=sc.makeRDD(List(("wang",31),("zhang",25),("wang",88)))

    nn: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[129] at makeRDD at <console>:26

    scala> val gmm=mm.groupByKey()

    gmm: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[130] at groupByKey at <console>:28

    scala> val gnn=nn.groupByKey()

    gnn: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[131] at groupByKey at <console>:28

    scala> val gmn=gmm join gnn

    gmn: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[134] at join at <console>:34

    scala> gmn.collect

    res51: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((zhang,(CompactBuffer(20),CompactBuffer(25))), (wang,(CompactBuffer(2, 52),CompactBuffer(31, 88))))
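
    The two transcripts above differ in their output shape: joining the raw pairs yields one row per combination of matching values, while grouping first yields one row per key. A plain-Scala model of both, without Spark, is sketched below. JoinSketch and its join and groupByKey are hypothetical helpers that only imitate the semantics of the RDD operations of the same name; row order is not modeled.

```scala
object JoinSketch {
  // Inner join on key: every left value pairs with every right value of the same key.
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    for {
      (k, a)  <- left
      (k2, b) <- right
      if k == k2
    } yield (k, (a, b))

  // Collect all values of a key into one sequence, keeping their original order.
  def groupByKey[K, V](pairs: Seq[(K, V)]): Seq[(K, Seq[V])] =
    pairs.groupBy(_._1).toSeq.map { case (k, kvs) => (k, kvs.map(_._2)) }

  // Same data as in the transcripts above.
  val mm = Seq(("wang", 2), ("zhang", 20), ("wang", 52))
  val nn = Seq(("wang", 31), ("zhang", 25), ("wang", 88))
}
```

    With this model, join(mm, nn) has five rows (four for wang, one for zhang), and join(groupByKey(mm), groupByKey(nn)) has two rows, one per key, matching the contents of res46 and res51.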

  • Original article: https://www.cnblogs.com/playforever/p/9405988.html