zoukankan      html  css  js  c++  java
  • Spark计算模型RDD

    RDD弹性分布式数据集

    RDD概述

      RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询速度。

      Resilient:RDD中的数据可以存储在内存中或者磁盘中。 

      Distributed:RDD中的数据是分布式存储的,可用于分布式计算。

      Dataset:一个数据集合,用于存放数据的。

    (1)    传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算中要进行大量的磁盘IO操作。RDD正是解决这一缺点的抽象方法。

    (2)     RDD是Spark提供的最重要的抽象的概念,它是一种具有容错机制的特殊集合,可以分布在集群的节点上,以函数式编程来操作集合,进行各种并行操作。可以把RDD的结果数据进行缓存,方便进行多次重用,避免重复计算。

    RDD是一种具有容错性、基于内存计算的抽象方法,RDD是Spark Core的底层核心,Spark则是这个抽象方法的实现。

    RDD五大属性

      * - A list of partitions
        一个rdd有多个分区
      * - A function for computing each split
        作用在每一个分区中函数
      * - A list of dependencies on other RDDs
        一个rdd会依赖于很多其他RDD,这里就涉及到rdd的依赖关系
      * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
        (可选项) 针对于kv类型的rdd才会有分区函数(必须要产生shuffle),分区函数就决定了数据会流入到子rdd的那些分区中
      * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
        (可选项)一个列表,存储每个Partition的优先位置.数据的本地性,数据位置最优(spark在进行任务分配的时候,会优先考虑存有数据的worker节点来进行任务计算)
        一个文件有3个block块
        block1-----》node1
        block2-----》node2
        block3-----》node3
               node4
               node5

     

    创建RDD

    • 1、通过sparkContext调用parallelize,从一个已经存在的scala集合构建

      • sc.parallelize(List(1,2,3,4,5))

    • 2、读取外部数据源

      • val rdd1=sc.textFile("/words.txt")

    • 3、从一个已经存在rdd经过对应的算子操作生成新的rdd

      • val rdd2=rdd1.flatMap(_.split(" "))

    RDD编程API

    RDD的算子分类

      Transformation(转换):根据数据集创建一个新的数据集,计算后返回一个新RDD;例如:一个rdd进行map操作后生了一个新的rdd

      Action(动作):对rdd结果计算后返回一个数值value给驱动程序;

      例如:collect算子将数据集的所有元素收集完成返回给驱动程序。

    Transformation

      RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

    常用的Transformation:

    转换

    含义

    map(func)

    返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

    filter(func)

    返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

    flatMap(func)

    类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

    mapPartitions(func)

    类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

    mapPartitionsWithIndex(func)

    类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是

    (Int, Interator[T]) => Iterator[U]

    union(otherDataset)

    对源RDD和参数RDD求并集后返回一个新的RDD

    intersection(otherDataset)

    对源RDD和参数RDD求交集后返回一个新的RDD

    distinct([numTasks]))

    对源RDD进行去重后返回一个新的RDD

    groupByKey([numTasks])

    在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD

    reduceByKey(func, [numTasks])

    在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

    sortByKey([ascending], [numTasks])

    在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

    sortBy(func,[ascending], [numTasks])

    与sortByKey类似,但是更灵活

    join(otherDataset, [numTasks])

    在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

    cogroup(otherDataset, [numTasks])

    在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

    coalesce(numPartitions)

    减少 RDD 的分区数到指定值。

    repartition(numPartitions)

    重新给 RDD 分区

    repartitionAndSortWithinPartitions(partitioner)

     

    重新给 RDD 分区,并且每个分区内以记录的 key 排序

    Action

      触发整个任务真正的运行

    动作

    含义

    reduce(func)

    reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。

    collect()

    在驱动程序中,以数组的形式返回数据集的所有元素

    count()

    返回RDD的元素个数

    first()

    返回RDD的第一个元素(类似于take(1))

    take(n)

    返回一个由数据集的前n个元素组成的数组

    takeOrdered(n, [ordering])

    返回自然顺序或者自定义顺序的前 n 个元素

    saveAsTextFile(path)

    将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

    saveAsSequenceFile(path) 

    将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

    saveAsObjectFile(path) 

    将数据集的元素,以 Java 序列化的方式保存到指定的目录下

    countByKey()

    针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

    foreach(func)

    在数据集的每一个元素上,运行函数func

    foreachPartition(func)

    在数据集的每一个分区上,运行函数func

    RDD常用的算子操作

      Spark Rdd的所有算子操作,请见《sparkRDD函数详解》https://www.cnblogs.com/jifengblog/p/9369258.html

      启动spark-shell 进行测试:

    spark-shell --master spark://node1:7077 

    练习1:map、filter

    //通过并行化生成rdd

    val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))

    //对rdd1里的每一个元素乘2然后排序

    val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)

    //过滤出大于等于5的元素

    val rdd3 = rdd2.filter(_ >= 5)

    //将元素以数组的方式在客户端显示

    rdd3.collect

    练习2:flatMap

    val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))

    //将rdd1里面的每一个元素先切分在压平

    val rdd2 = rdd1.flatMap(_.split(" "))

    rdd2.collect

    练习3:交集、并集

    val rdd1 = sc.parallelize(List(5, 6, 4, 3))

    val rdd2 = sc.parallelize(List(1, 2, 3, 4))

    //求并集

    val rdd3 = rdd1.union(rdd2)

    //求交集

    val rdd4 = rdd1.intersection(rdd2)

    //去重

    rdd3.distinct.collect

    rdd4.collect

    练习4:join、groupByKey

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))

    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

    //求join

    val rdd3 = rdd1.join(rdd2)

    rdd3.collect

    //求并集

    val rdd4 = rdd1 union rdd2

    rdd4.collect

    //按key进行分组

    val rdd5=rdd4.groupByKey

    rdd5.collect

    练习5:cogroup

    val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))

    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))

    //cogroup

    val rdd3 = rdd1.cogroup(rdd2)

    //注意cogroup与groupByKey的区别

    rdd3.collect

    练习6:reduce

    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))

    //reduce聚合

    val rdd2 = rdd1.reduce(_ + _)

    rdd2.collect

    练习7:reduceByKey、sortByKey

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))

    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))

    val rdd3 = rdd1.union(rdd2)

    //按key进行聚合

    val rdd4 = rdd3.reduceByKey(_ + _)

    rdd4.collect

    //按value的降序排序

    val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))

    rdd5.collect

    练习8:repartition、coalesce

    val rdd1 = sc.parallelize(1 to 10,3)

    //利用repartition改变rdd1分区数

    //减少分区

    rdd1.repartition(2).partitions.size

    //增加分区

    rdd1.repartition(4).partitions.size

    //利用coalesce改变rdd1分区数

    //减少分区

    rdd1.coalesce(2).partitions.size

      注意:repartition可以增加和减少rdd中的分区数,coalesce只能减少rdd分区数,增加rdd分区数不会生效。

    RDD的依赖关系

      RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)宽依赖(wide dependency)

    窄依赖

      窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

      总结:窄依赖我们形象的比喻为独生子女

    宽依赖

      宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition

      总结:宽依赖我们形象的比喻为超生

    Lineage(血统)

      RDD只支持粗粒度转换,即只记录单个块上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区

    RDD的缓存

      Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或者缓存数据集。当持久化某个RDD后,每一个节点都将把计算分区结果保存在内存中,对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。

      RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

      通过查看源码发现cache最终也是调用了persist方法默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

    DAG的生成

      DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG,根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)。对于窄依赖,partition的转换处理在一个Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据

    Spark任务调度

      各个RDD之间存在着依赖关系,这些依赖关系就形成有向无环图DAG,DAGScheduler对这些依赖关系形成的DAG进行Stage划分,划分的规则很简单,从后往前回溯,遇到窄依赖加入本stage,遇见宽依赖进行Stage切分。完成了Stage的划分。DAGScheduler基于每个Stage生成TaskSet,并将TaskSet提交给TaskScheduler。TaskScheduler 负责具体的task调度,最后在Worker节点上启动task。

    DAGScheduler

    (1)DAGScheduler对DAG有向无环图进行Stage划分。

    (2)记录哪个RDD或者 Stage 输出被物化(缓存),通常在一个复杂的shuffle之后,通常物化一下(cache、persist),方便之后的计算。

    (3)重新提交shuffle输出丢失的stage(stage内部计算出错)给TaskScheduler

    (4)将 Taskset 传给底层调度器

      a)– spark-cluster TaskScheduler

      b)– yarn-cluster YarnClusterScheduler

      c)– yarn-client YarnClientClusterScheduler

    TaskScheduler

    (1)为每一个TaskSet构建一个TaskSetManager 实例管理这个TaskSet 的生命周期

    (2)数据本地性决定每个Task最佳位置

    (3)提交 taskset( 一组task) 到集群运行并监控

    (4)推测执行,碰到计算缓慢任务需要放到别的节点上重试

    (5)重新提交Shuffle输出丢失的Stage给DAGScheduler

    RDD容错机制之checkpoint

     checkpoint是什么

    1)Spark 在生产环境下经常会面临transformation的RDD非常多(例如一个Job中包含1万个RDD)或者具体transformation的RDD本身计算特别复杂或者耗时(例如计算时长超过1个小时),这个时候就要考虑对计算结果数据持久化保存;

    2)Spark是擅长多步骤迭代的,同时擅长基于Job的复用,这个时候如果能够对曾经计算的过程产生的数据进行复用,就可以极大的提升效率;

    3)如果采用persist把数据放在内存中,虽然是快速的,但是也是最不可靠的;如果把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏,系统管理员可能清空磁盘

    4)Checkpoint的产生就是为了相对而言更加可靠的持久化数据,在Checkpoint的时候可以指定把数据放在本地,并且是多副本的方式,但是在生产环境下是放在HDFS上,这就天然的借助了HDFS高容错、高可靠的特征来完成了最大化的可靠的持久化数据的方式;

      假如进行一个1万个算子操作,在9000个算子的时候persist,数据还是有可能丢失的,但是如果checkpoint,数据丢失的概率几乎为0。

    checkpoint原理机制

    (1)RDD使用cache机制从内存中读取数据,如果数据没有读到,会使用checkpoint机制读取数据。此时如果没有checkpoint机制,那么就需要找到父RDD重新计算数据了,因此checkpoint是个很重要的容错机制。checkpoint就是对于一个RDD chain(链)如果后面需要反复使用某些中间结果RDD,可能因为一些故障导致该中间数据丢失,那么就可以针对该RDD启动checkpoint机制,使用checkpoint首先需要调用sparkContext的setCheckpointDir方法,设置一个容错文件系统目录,比如hdfs,然后对RDD调用checkpoint方法。之后在RDD所处的job运行结束后,会启动一个单独的job来将checkpoint过的数据写入之前设置的文件系统持久化,进行高可用。所以后面的计算在使用该RDD时,如果数据丢失了,但是还是可以从它的checkpoint中读取数据,不需要重新计算。

    (2)persist或者cache与checkpoint的区别在于,前者持久化只是将数据保存在BlockManager中但是其lineage是不变的,但是后者checkpoint执行完后,rdd已经没有依赖RDD,只有一个checkpointRDD,checkpoint之后,RDD的lineage就改变了persist或者cache持久化的数据丢失的可能性更大,因为可能磁盘或内存被清理,但是checkpoint的数据通常保存到hdfs上,放在了高容错文件系统。

    总结:

    • checkpoint就是提供了一个相对而言更加可靠的持久化数据方式

    • checkpoint使用

      • sc.setCheckpointDir("hdfs文件目录")

      • rdd1.checkpoint

      • 后面也需要一个action算子操作,才会触发checkpoint数据写入到HDFS

    • cache/persist/checkpoint

      • 区别

        • 三者都可以对数据进行持久化保存

        • cache、persist他们不会改变rdd的血统

        • checkpoint会改变血统

    • 当前一个rdd数据丢失了,如何得到

      • 首先看一下你有没有设置cache,如果有,直接从内存中获取得到,如果没有,接下来看一下你有没有设置checkpoint操作,如果有,直接获取得到,如果也没有,那么这个时候只能够通过lineage血统重新计算恢复。

    Spark运行架构

     Spark运行基本流程

     

    1)      构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;

    2)     资源管理器分配Executor资源并启动Executor,Executor运行情况将随着心跳发送到资源管理器上;

    3)    SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。

    4)    Task在Executor上运行,运行完毕释放所有资源。

    Spark运行架构特点

    • 每个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行tasks。
    • Spark任务与资源管理器无关,只要能够获取executor进程,并能保持相互通信就可以了。
    • 提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack里,因为Spark程序运行过程中SparkContext和Executor之间有大量的信息交换;如果想在远程集群中运行,最好使用RPC将SparkContext提交给集群,不要远离Worker运行SparkContext。
    • Task采用了数据本地性和推测执行的优化机制。
  • 相关阅读:
    关于WP7的Loaded事件[转]
    皮皮书屋的变态验证码
    近期学习内容for mobile
    一个js问题引发的同时吐槽
    powerdesigner 概念模型转物理模型时的丢表问题
    偶的处女文近期学习计划
    web布局实现圆角,兼容所有的浏览器
    最近面试asp.net碰到的一些题
    网站推广心得
    兼容ie6的png格式图片的背景透明问题
  • 原文地址:https://www.cnblogs.com/jifengblog/p/9369233.html
Copyright © 2011-2022 走看看