zoukankan      html  css  js  c++  java
  • 什么是RDD

    RDDResilient Distributed Dataset)叫做弹性分布式数据集RDD,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。

    RDD包含很多分区,由一系列分区构成,一个分区构成一个逻辑分片。

    1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

    2)一个计算每个分区的函数。SparkRDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的compute函数会对迭代器进行复合,不需要保存每次计算的结果。

    3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。宽依赖和窄依赖。

    4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-valueRDD,才会有Partitioner,非key-valueRDDParititioner的值是NonePartitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量

    5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

    1.1. 创建RDD

    1)由一个已经存在的Scala集合创建。

    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

    2)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFSCassandraHBase

    val rdd2 = sc.textFile("hdfs://node1.beicai.cn:9000/words.txt")

    2.3. RDD编程API

    2.3.1. Transformation

    RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

    常用的Transformation

    转换

    含义

    map(func)

    返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

    filter(func)

    返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

    flatMap(func)

    类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

    mapPartitions(func)

    类似于map,但独立地在RDD的每一个分片上运行,因此在类型为TRDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

    mapPartitionsWithIndex(func)

    类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为TRDD上运行时,func的函数类型必须是

    (Int, Interator[T]) => Iterator[U]

    sample(withReplacement, fraction, seed)

    根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子

    union(otherDataset)

    对源RDD和参数RDD求并集后返回一个新的RDD

    intersection(otherDataset)

    对源RDD和参数RDD求交集后返回一个新的RDD

    distinct([numTasks]))

    对源RDD进行去重后返回一个新的RDD

    groupByKey([numTasks])

    在一个(K,V)RDD上调用,返回一个(K, Iterator[V])RDD

    reduceByKey(func, [numTasks])

    在一个(K,V)RDD上调用,返回一个(K,V)RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

    sortByKey([ascending], [numTasks])

    在一个(K,V)RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)RDD

    sortBy(func,[ascending], [numTasks])

    sortByKey类似,但是更灵活

    join(otherDataset, [numTasks])

    在类型为(K,V)(K,W)RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))RDD

    cogroup(otherDataset, [numTasks])

    在类型为(K,V)(K,W)RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

    cartesian(otherDataset)

    笛卡尔积

    pipe(command, [envVars])

    coalesce(numPartitions)

    repartition(numPartitions)

    repartitionAndSortWithinPartitions(partitioner)

     

    2.3.2. Action

    动作

    含义

    reduce(func)

    通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的

    collect()

    在驱动程序中,以数组的形式返回数据集的所有元素

    count()

    返回RDD的元素个数

    first()

    返回RDD的第一个元素(类似于take(1)

    take(n)

    返回一个由数据集的前n个元素组成的数组

    takeSample(withReplacement,num, [seed])

    返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

    takeOrdered(n[ordering])

    saveAsTextFile(path)

    将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

    saveAsSequenceFile(path

    将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

    saveAsObjectFile(path

    countByKey()

    针对(K,V)类型的RDD,返回一个(K,Int)map,表示每一个key对应的元素个数。

    foreach(func)

    在数据集的每一个元素上,运行函数func进行更新。

    2.3.4练习

    启动spark-shell

    /usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell --master spark://node1.beicai.cn:7077

    常用transformation举例:

    Map返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

    Filter:返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

    flatMap:map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1yy为元素x的值)

    Partitions 显示分区,一般与length一起使用

    union求并集,注意类型要一致

    #intersection求交集

    Join 操作,根据key 聚合 ,有joinleftjoinrightjoin

    GroupByKey: 根据可以进行分组

    Cogroupjoin很像,但是会先在每个集合中聚合

    Cartesian 求笛卡尔积

    #spark action  

    val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)

    #collect 是把rdd转换为  数组

    rdd1.collect

    #reduce  前面这个_ 是一个累计值,后面是每一个数

    val rdd2 = rdd1.reduce(_+_)

    #count  就是有个数

    rdd1.count

    #top  

    rdd1.top(2)

    #take  

    rdd1.take(2)

    #first(similer to take(1))

    rdd1.first

    #takeOrdered  用法很相似,但是的反着的

    rdd1.takeOrdered(3)

    //想要了解更多,访问下面的地址

    http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

    2.4 RDD的依赖关系

    RDD和它依赖的父RDDs)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

    2.4.1  窄依赖

    窄依赖指的是每一个父RDDPartition最多被子RDD的一个Partition使用

    总结:窄依赖我们形象的比喻为独生子女-----也叫非shuffle算子

    2.4.2 宽依赖

    宽依赖指的是多个子RDDPartition会依赖同一个父RDDPartition

    总结:窄依赖我们形象的比喻为超生--也叫shuffle算子

    2.4.3 Lineage

    RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDDLineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

    2.4.4 RDD的缓存

    Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

    2.4.5 RDD的缓存方式

    成就人
  • 相关阅读:
    WRF WPS预处理
    CVS安装
    Linux窗口最小化消失,任务栏上无法找到的解决方法
    NCARG安装配置出现error while loading shared libraries: libg2c.so.0问题额解决办法
    Netcdf安装
    Embedding R-generated Interactive HTML pages in MS PowerPoint(转)
    The leaflet package for online mapping in R(转)
    Some 3D Graphics (rgl) for Classification with Splines and Logistic Regression (from The Elements of Statistical Learning)(转)
    What does a Bayes factor feel like?(转)
    Weka算法介绍
  • 原文地址:https://www.cnblogs.com/pingzizhuanshu/p/9146714.html
Copyright © 2011-2022 走看看