zoukankan      html  css  js  c++  java
  • spark-RDD缓存,checkpoint机制,有向无环图,stage

    spark-RDD缓存,checkpoint机制,有向无环图,stage

    1.RDD依赖关系

    • RDD依赖关系有2种不同类型,窄依赖和宽依赖。

    • 窄依赖(narrow dependency):是指每个父RDD的Partition最多被子RDD一个Partition使用。就好像独生子女一样。窄依赖的算子包括:map,filter,flatMap等。如下图 :1对1 , 多对1

    • 宽依赖(wide dependency):多个子RDD的Partition会依赖统一个父RDD的Partition。就好像超生。宽依赖常见算子包括:reduceByKey,groupBy,groupByKey,sortBy,sortByKey等。 宽依赖会产生shuffle,如下图: 多对多,1对多

    • 相比于宽依赖,窄依赖对优化很有利 ,主要基于以下两点:
    1.宽依赖往往对应着shuffle操作( 多对多,汇总,多节点),需要在运行过程中将同一个父RDD的分区传入到不同的子RDD分区中,中间可能涉及多个节点之间的数据传输;而窄依赖的每个父RDD的分区只会传入到一个子RDD分区中,通常可以在一个节点内完成转换。
    
    2.当RDD分区丢失时(某个节点故障),spark会对数据进行重算。
    	a. 对于窄依赖,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重算和子RDD分区对应的父RDD分区即可,所以这个重算对数据的利用率是100%的;
    	b. 对于宽依赖,重算的父RDD分区对应多个子RDD分区,这样实际上父RDD 中只有一部分的数据是被用于恢复这个丢失的子RDD分区的,另一部分对应子RDD的其它未丢失分区,这就造成了多余的计算;更一般的,宽依赖中子RDD分区通常来自多个父RDD分区,极端情况下,所有的父RDD分区都要进行重新计算。
    

    2.lineage(血统)

    • 血统就是将RDD与RDD之间依赖关系进行记录,如果当某个RDD分区数据丢失后,可以通过这种记录下来的关系进行重新计算,恢复得到的数据,这是spark带的容错机制。

    3.RDD缓存

    • 我们后期可以把RDD数据缓存起来,后续其他的job需要用到该RDD的结果数据,可以直接从缓存得到避免重复计算。魂村可以加快数据访问。

    • RDD设置缓存方式有2种:

      1. cache: 默认把数据存储到内存中,本质是调用presist() 默认存储级别是MEMORY_ONLY
      2. presist:可以把数据保存在内存或者磁盘中,它内部可以有封装缓存级别,这些缓存级别都被定义在一个Object中(StorageLevel中设置存储种类)
    • 进入 spark shell 演示

      spark-shell --master spark://1.0.0.155:7077 --executor-memory 1g --total-executor-cores 2
      
    • cache使用

      # 从hdfs读取
      scala> val rdd1 = sc.textFile("/u.txt")
      # 计入缓存
      scala> rdd1.cache
      # 此时查看http://linux01:4040/Storage/ 是没有任何缓存信息,这是因为在使用cache时候需要action触发
      scala> rdd1.collect
      # 可以看到如下图
      

      ![image-20210622111814117](C:UsersXu jkAppDataRoamingTypora ypora-user-imagesimage-20210622111814117.png)

      # 你可以继续进行算子操作
      scala> val rdd2 = rdd1.flatMap(_.split(" "))
      # 通过触发action,从缓存拿取数据,执行算子操作
      scala> rdd2.collect
      

      当退出spark-shell缓存也随之消失

    • presist使用

      # 虽然设置内存和磁盘的级别,但保存数据量较小,是不会分配到磁盘上的。
      scala> rdd2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER_2)
      scala> rdd2.collect
      # 如果想直接保存到磁盘,更改级别。
      scala> val rdd3 = rdd2.map(x=>(x,1))
      scala> rdd3.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
      scala> rdd3.collect
      
    • 从rdd1->rdd2->rdd3-> ..rddn 每一个步骤,如果设置缓存它会从缓存中拿取数据,而不是通过计算后再执行下一个算子操作。

    • 缓存之后生命周期

      当任务结束,缓存数据也随之消失
      
    • 缓存数据的清除

      1.自动清除
      	程序执行完毕,自动清除
      2.手动清除
      	scala> rdd1.unpersist(true) // 默认为true,表示阻塞删除
      
    • 关于缓存设置应用场景

      1.当某个RDD的数据被使用多次,可以设置缓存
      	val rdd1 = sc.textFile("words.txt")
      	rdd1.cache
      	val rdd2=rdd1.flatMap(_.split(" "))
      	val rdd3=rdd1.map((_,1))
      	rdd2.collect
      	rdd3.collect
      2.当某个RDD它是经过大量复杂算子操作,计算周期时间很长,将它设置缓存。
      

    4.RDD的checkpoint机制

    • 当对RDD数据进行缓存,保存在内存或磁盘中,后续就可以直接从内存或者磁盘中获取得到,但是不安全。

      • cache:在内存中,虽然后期操作速度比较快,直接从内存中获取,但是不安全,比如服务器突然挂掉,或者进程终止,它都会导致数据丢失。
      • persist: 它可以保存数据到磁盘中,虽然速度慢,相对cache安全一点,但也不是特别安全,假如系统管理员误操作删除导致磁盘损坏,导致数据丢失。
    • 而checkpoint机制它提供一种相对更加可靠数据持久方式,它把数据保存在分布式文件系统上,比如HDFS上,它利用HDFS高可用,高容错(多副本)来保证数据安全性。

    • checkpoint的使用

    # hdfs创建checkponit目录
    scala> sc.setCheckpointDir("/checkpoint")
    # 此时查看hdfs 多了一个checkpoint
    [root@linux01 data]# hdfs dfs -ls /
    drwxr-xr-x   - root supergroup          0 2021-06-22 13:18 /checkpoint
    # 读出文件
    scala> val rdd1=sc.textFile("/u.txt")
    # 对rdd1进行checkpoint
    scala> rdd1.checkpoint
    # 算子操作
    scala> val rdd2 = rdd1.flatMap(_.split(" "))
    # 触发action 才会触发checkpoint
    scala> rdd2.collect
    # 查看hdfs保存文件,可以看到多了part-00000和part-00001两个文件
    [root@linux01 data]# hdfs dfs -ls /checkpoint/e5a6cb9f-373c-44ec-8730-7eda0e6067dc/rdd-3
    	part-00000
    	part-00001
    
    • http://linux01:4040/jobs/ job任务看到会有2个job任务完成,其中一个就是checkpoint,一个是job任务。

    5.cache , presist,checkpoint三者之间区别

    cache和presist分别可以把RDD数据缓存在内存或者本地磁盘,后续要触发cache和presist持久化操作。需要有一个action,它不会开启其他新的job,一个action对应一个job。在运行的过程到程序结束后,对应的缓存数据就自动消失了。它不会改变RDD的依赖关系。
    
    checkpoint:可以把数据持久写入hdfs上,后续要触发checkpoint操作,需要有一个action、任务在运行过程到程序结束之后,对应缓存数据不会消失,它会改变rdd的依赖关系。后续数据丢失了不能再通过血统进行数据恢复。
    	checkpoint操作要执行需要一个action操作,一个action操作对应后续的一个job,该job执行完成之后,它会再次单独开启另一个job来执行rdd1.checkpoint操作。
    	
    所以checkpoint执行action会开启2个job,而cache,presist 只会开启1个job
    
    • 数据恢复顺序:
    cache -> checkpoint -> 重新计算
    

    6.有向无环图生成

    • DAG(Directed Acyclic Graph)叫做有向无环图(有方向,无闭环,代表着数据的流向),原始RDD通过一系列的转换形成了DAG

    • 当我们执行一个单词统计的job任务时候,登录到:http://linux01:4040/jobs/可以查看到DAG图,如下图:

    sc.textFile("/u.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
    

    • 该方向就是RDD算子操作顺序,这里它把DAG图划分成了不同的stage(调度阶段)。

    7.stage是什么?怎么划分

    • stage表示不同的调度阶段,一个spark中的job 会对应很多个stage(调度阶段)。

    • 为什么要划分stage?

    由于在同一个stage中,没有宽依赖,都是窄依赖,后期spark的任务是以task线程方式去运行的,一个分区就对应一个task,在同一个stage中有很多可以并行运行的task。
    
    • 如何划分stage?
    1、拿到DAG有向无环图之后,从最后一个RDD往前推,首先创建一个stage,然后把当前RDD加入到本stage中。它是最后一个stage。
    2、在往前推的过程中,如果遇到窄依赖,就把该RDD加入到stage中,如果遇到宽依赖,就从宽依赖切开,当前一个stage也就结束了。
    3、然后重新创建一个新的stage,还是按照第二个步骤往前推,一直到最开始RDD。
    
    • stage与stage之间的关系?
    划分stage之后,每一个stage中有很多可以并行运行的task,后期它会把每个stage中这些可以并行运行的task封装在一个taskSet集合中。它会把taskSet集合中的task线程提交到worker节点上的executor进程中运行。
    
    • 宽依赖是划分stage的依据,后面stage中task输入数据是前面stage中task输出结果数据。
  • 相关阅读:
    MSMQ实现自定义序列化存储
    流程部署的查询、删除、流程
    使用trello管理你的项目
    CentOS 6.4 编译安装 gcc 4.8.1
    jquery实现无限滚动瀑布流实现原理
    系统分析员备考之经济管理篇(二)
    架构、架构师和架构设计
    ASP.net Web API综合示例
    c中函数参数传递
    准备抽象NHibernate和EntityFramework
  • 原文地址:https://www.cnblogs.com/xujunkai/p/14919490.html
Copyright © 2011-2022 走看看