zoukankan      html  css  js  c++  java
  • spark-RDD缓存,checkpoint机制,有向无环图,stage

    spark-RDD缓存,checkpoint机制,有向无环图,stage

    1.RDD依赖关系

    • RDD依赖关系有2种不同类型,窄依赖和宽依赖。

    • 窄依赖(narrow dependency):是指每个父RDD的Partition最多被子RDD一个Partition使用。就好像独生子女一样。窄依赖的算子包括:map,filter,flatMap等。如下图 :1对1 , 多对1

    • 宽依赖(wide dependency):多个子RDD的Partition会依赖统一个父RDD的Partition。就好像超生。宽依赖常见算子包括:reduceByKey,groupBy,groupByKey,sortBy,sortByKey等。 宽依赖会产生shuffle,如下图: 多对多,1对多

    • 相比于宽依赖,窄依赖对优化很有利 ,主要基于以下两点:
    1.宽依赖往往对应着shuffle操作( 多对多,汇总,多节点),需要在运行过程中将同一个父RDD的分区传入到不同的子RDD分区中,中间可能涉及多个节点之间的数据传输;而窄依赖的每个父RDD的分区只会传入到一个子RDD分区中,通常可以在一个节点内完成转换。
    
    2.当RDD分区丢失时(某个节点故障),spark会对数据进行重算。
    	a. 对于窄依赖,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重算和子RDD分区对应的父RDD分区即可,所以这个重算对数据的利用率是100%的;
    	b. 对于宽依赖,重算的父RDD分区对应多个子RDD分区,这样实际上父RDD 中只有一部分的数据是被用于恢复这个丢失的子RDD分区的,另一部分对应子RDD的其它未丢失分区,这就造成了多余的计算;更一般的,宽依赖中子RDD分区通常来自多个父RDD分区,极端情况下,所有的父RDD分区都要进行重新计算。
    

    2.lineage(血统)

    • 血统就是将RDD与RDD之间依赖关系进行记录,如果当某个RDD分区数据丢失后,可以通过这种记录下来的关系进行重新计算,恢复得到的数据,这是spark带的容错机制。

    3.RDD缓存

    • 我们后期可以把RDD数据缓存起来,后续其他的job需要用到该RDD的结果数据,可以直接从缓存得到避免重复计算。魂村可以加快数据访问。

    • RDD设置缓存方式有2种:

      1. cache: 默认把数据存储到内存中,本质是调用presist() 默认存储级别是MEMORY_ONLY
      2. presist:可以把数据保存在内存或者磁盘中,它内部可以有封装缓存级别,这些缓存级别都被定义在一个Object中(StorageLevel中设置存储种类)
    • 进入 spark shell 演示

      spark-shell --master spark://1.0.0.155:7077 --executor-memory 1g --total-executor-cores 2
      
    • cache使用

      # 从hdfs读取
      scala> val rdd1 = sc.textFile("/u.txt")
      # 计入缓存
      scala> rdd1.cache
      # 此时查看http://linux01:4040/Storage/ 是没有任何缓存信息,这是因为在使用cache时候需要action触发
      scala> rdd1.collect
      # 可以看到如下图
      

      ![image-20210622111814117](C:UsersXu jkAppDataRoamingTypora ypora-user-imagesimage-20210622111814117.png)

      # 你可以继续进行算子操作
      scala> val rdd2 = rdd1.flatMap(_.split(" "))
      # 通过触发action,从缓存拿取数据,执行算子操作
      scala> rdd2.collect
      

      当退出spark-shell缓存也随之消失

    • presist使用

      # 虽然设置内存和磁盘的级别,但保存数据量较小,是不会分配到磁盘上的。
      scala> rdd2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER_2)
      scala> rdd2.collect
      # 如果想直接保存到磁盘,更改级别。
      scala> val rdd3 = rdd2.map(x=>(x,1))
      scala> rdd3.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
      scala> rdd3.collect
      
    • 从rdd1->rdd2->rdd3-> ..rddn 每一个步骤,如果设置缓存它会从缓存中拿取数据,而不是通过计算后再执行下一个算子操作。

    • 缓存之后生命周期

      当任务结束,缓存数据也随之消失
      
    • 缓存数据的清除

      1.自动清除
      	程序执行完毕,自动清除
      2.手动清除
      	scala> rdd1.unpersist(true) // 默认为true,表示阻塞删除
      
    • 关于缓存设置应用场景

      1.当某个RDD的数据被使用多次,可以设置缓存
      	val rdd1 = sc.textFile("words.txt")
      	rdd1.cache
      	val rdd2=rdd1.flatMap(_.split(" "))
      	val rdd3=rdd1.map((_,1))
      	rdd2.collect
      	rdd3.collect
      2.当某个RDD它是经过大量复杂算子操作,计算周期时间很长,将它设置缓存。
      

    4.RDD的checkpoint机制

    • 当对RDD数据进行缓存,保存在内存或磁盘中,后续就可以直接从内存或者磁盘中获取得到,但是不安全。

      • cache:在内存中,虽然后期操作速度比较快,直接从内存中获取,但是不安全,比如服务器突然挂掉,或者进程终止,它都会导致数据丢失。
      • persist: 它可以保存数据到磁盘中,虽然速度慢,相对cache安全一点,但也不是特别安全,假如系统管理员误操作删除导致磁盘损坏,导致数据丢失。
    • 而checkpoint机制它提供一种相对更加可靠数据持久方式,它把数据保存在分布式文件系统上,比如HDFS上,它利用HDFS高可用,高容错(多副本)来保证数据安全性。

    • checkpoint的使用

    # hdfs创建checkponit目录
    scala> sc.setCheckpointDir("/checkpoint")
    # 此时查看hdfs 多了一个checkpoint
    [root@linux01 data]# hdfs dfs -ls /
    drwxr-xr-x   - root supergroup          0 2021-06-22 13:18 /checkpoint
    # 读出文件
    scala> val rdd1=sc.textFile("/u.txt")
    # 对rdd1进行checkpoint
    scala> rdd1.checkpoint
    # 算子操作
    scala> val rdd2 = rdd1.flatMap(_.split(" "))
    # 触发action 才会触发checkpoint
    scala> rdd2.collect
    # 查看hdfs保存文件,可以看到多了part-00000和part-00001两个文件
    [root@linux01 data]# hdfs dfs -ls /checkpoint/e5a6cb9f-373c-44ec-8730-7eda0e6067dc/rdd-3
    	part-00000
    	part-00001
    
    • http://linux01:4040/jobs/ job任务看到会有2个job任务完成,其中一个就是checkpoint,一个是job任务。

    5.cache , presist,checkpoint三者之间区别

    cache和presist分别可以把RDD数据缓存在内存或者本地磁盘,后续要触发cache和presist持久化操作。需要有一个action,它不会开启其他新的job,一个action对应一个job。在运行的过程到程序结束后,对应的缓存数据就自动消失了。它不会改变RDD的依赖关系。
    
    checkpoint:可以把数据持久写入hdfs上,后续要触发checkpoint操作,需要有一个action、任务在运行过程到程序结束之后,对应缓存数据不会消失,它会改变rdd的依赖关系。后续数据丢失了不能再通过血统进行数据恢复。
    	checkpoint操作要执行需要一个action操作,一个action操作对应后续的一个job,该job执行完成之后,它会再次单独开启另一个job来执行rdd1.checkpoint操作。
    	
    所以checkpoint执行action会开启2个job,而cache,presist 只会开启1个job
    
    • 数据恢复顺序:
    cache -> checkpoint -> 重新计算
    

    6.有向无环图生成

    • DAG(Directed Acyclic Graph)叫做有向无环图(有方向,无闭环,代表着数据的流向),原始RDD通过一系列的转换形成了DAG

    • 当我们执行一个单词统计的job任务时候,登录到:http://linux01:4040/jobs/可以查看到DAG图,如下图:

    sc.textFile("/u.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
    

    • 该方向就是RDD算子操作顺序,这里它把DAG图划分成了不同的stage(调度阶段)。

    7.stage是什么?怎么划分

    • stage表示不同的调度阶段,一个spark中的job 会对应很多个stage(调度阶段)。

    • 为什么要划分stage?

    由于在同一个stage中,没有宽依赖,都是窄依赖,后期spark的任务是以task线程方式去运行的,一个分区就对应一个task,在同一个stage中有很多可以并行运行的task。
    
    • 如何划分stage?
    1、拿到DAG有向无环图之后,从最后一个RDD往前推,首先创建一个stage,然后把当前RDD加入到本stage中。它是最后一个stage。
    2、在往前推的过程中,如果遇到窄依赖,就把该RDD加入到stage中,如果遇到宽依赖,就从宽依赖切开,当前一个stage也就结束了。
    3、然后重新创建一个新的stage,还是按照第二个步骤往前推,一直到最开始RDD。
    
    • stage与stage之间的关系?
    划分stage之后,每一个stage中有很多可以并行运行的task,后期它会把每个stage中这些可以并行运行的task封装在一个taskSet集合中。它会把taskSet集合中的task线程提交到worker节点上的executor进程中运行。
    
    • 宽依赖是划分stage的依据,后面stage中task输入数据是前面stage中task输出结果数据。
  • 相关阅读:
    73. Set Matrix Zeroes
    289. Game of Live
    212. Word Search II
    79. Word Search
    142. Linked List Cycle II
    141. Linked List Cycle
    287. Find the Duplicate Number
    260. Single Number III
    137. Single Number II
    Oracle EBS中有关Form的触发器的执行顺序
  • 原文地址:https://www.cnblogs.com/xujunkai/p/14919490.html
Copyright © 2011-2022 走看看