zoukankan      html  css  js  c++  java
  • RDD

    1、RDD概念:

      RDD 叫做 弹性分布式数据集,是spark中最基本的数据抽象。代表着一个可分区、元素可并行计算、不可变的数据集合。

    RDD特点:自动容错、位置感知性调度、可伸缩性,允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

    2、RDD原理:

    (1)分区partition:RDD的基本组成单位,每个分区partition都会被一个计算任务task处理,并决定并行计算的粒度。分区partition在创建RDD的时候可以指定,若不指定就采用默认的分区数,默认是程序所分配到的CPU Core的数目。

    (2)计算分区数据函数compute:RDD抽象类要求其所有子类都必须实现compute方法,目的是计算该分区中的数据,compute函数负责的是父RDD分区数据到子RDD分区数据的变换逻辑。

      RDD抽象类要求其所有子类都必须实现compute方法,该方法介绍的参数之一是一个Partition对象,目的是计算该分区中的数据。以MapPartitionsRDD类为例,其compute方法如下

    override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

      MapPartitionsRDD类的compute方法调用当前RDD内的第一个父RDD的iterator方法,该方法的目的是拉取父RDD对应分区的数据,iterator方法会返回一个迭代器对象,迭代器内部存储的每一个元素即父RDD对应分区内的数据记录。
      RDD的粗粒度转换体现在map方法上,f函数是map转换操作函数,RDD会对一个分区(而不是一条一条数据记录)内的数据执行单的的操作f,最终返回包含所有经过转换过的数据记录的新迭代器,即新的分区。
      其他RDD子类的compute方法与之类似,在需要用用到父RDD的分区数据时,就会调用iterator方法,然后根据需求在得到的数据上执行相应的操作。换句话说,compute函数负责的是父RDD分区数据到子RDD分区数据的变换逻辑。

    https://blog.csdn.net/jiangpeng59/article/details/53213694

    (3)分区函数 Partitioner:spark有两种分区函数,一种是 HashPartitioner、RangePartitioner,只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

     (4)分区位置列表:这个列表存储着每个分区partition所在的hdfs数据块block的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

     3、WordCount粗图解RDD:

     

    4、RDD的创建方式

    (1)读取文件生成:

    val file = sc.textFile("/spark/hello.txt")

    val lines: RDD[String] = sc.textFile(args(0))

     

    (2) 并行化方式创建RDD

    a、parallelize()

    scala> val array = Array(1,2,3,4,5)
    array: Array[Int] = Array(1, 2, 3, 4, 5)
    
    scala> val rdd = sc.parallelize(array)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:26
    
    scala> 

    将java的list转成scala RDD

    val summaries: util.List[S3ObjectSummary] = objectListing.getObjectSummaries
    val scalaArray: Array[S3ObjectSummary] = Array()
    val newRDD: RDD[S3ObjectSummary] = sc.parallelize(summaries.toArray(scalaArray))

    b、makeRDD()

    val seq = List(("American Person", List("Tom", "Jim")), ("China Person", List("LiLei", "HanMeiMei")), ("Color Type", List("Red", "Blue")))  
    val rdd2 = sc.makeRDD(seq) 

      这两种创建方式的区别:

      当调用parallelize()方法的时候,不指定分区数的时候,使用系统给出的分区数,而调用makeRDD()方法的时候,会为每个集合对象创建最佳分区,而这对后续的调用优化很有帮助。

      两种创建方式及区别博客:https://www.iteye.com/blog/xiaotutu365-2379890

    (3)其他方式:

    读取数据库等等其他的操作。也可以生成RDD。

    RDD可以通过其他的RDD转换而来的。

    参考博客:https://www.cnblogs.com/frankdeng/p/9301653.html

    5、RDD对象,RDD的实现类

    1、MapPartitionRDD:是RDD类的一种实现,在word count程序中经常出现。

    org.apache.spark.rdd
    Class MapPartitionsRDD<U,T>

    Object   org.apache.spark.rdd.RDD<U>     org.apache.spark.rdd.MapPartitionsRDD<U,T>
    public class MapPartitionsRDD<U,T> extends RDD<U>

    All Implemented Interfaces: java.io.Serializable, Logging

    2、ParallelCollectionRDD:利用并行化方式,将集合创建为RDD的时候生的的对象

    public class ParallelCollectionRDD<T> extends RDD<T>

    参考官网MapPartitionsRDDhttp://spark.apache.org/docs/1.2.2/api/java/org/apache/spark/rdd/MapPartitionsRDD.html

    各种RDD的实现:http://spark.apache.org/docs/1.2.2/api/java/org/apache/spark/rdd/package-summary.html

    6、RDD的Lineage血统

    1、RDD血统:数据容错,发生错误,可以进行重算恢复。Lineage记录的是特定数据的 Transformation 转换操作。

      为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。

      相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升

    2、宽窄依赖的Lineage容错

    对与Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思)。

    Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。

    在RDD计算,通过checkpoint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算生成丢失的分区数据。


    参考博客:https://blog.csdn.net/u013063153/article/details/73865123

  • 相关阅读:
    K3/Cloud点按钮打开单据,列表,动态表单,简单账表和直接Sql报表示例
    K3/Cloud点击按钮打开第三方URL
    List排序
    ClientScriptManager与ScriptManager向客户端注册脚本的区别
    NameValueCollection详解
    Delegate。。
    asp.net gridview ...
    asp json
    合并两个rs结果输出
    asp 数组
  • 原文地址:https://www.cnblogs.com/guoyu1/p/12091197.html
Copyright © 2011-2022 走看看