zoukankan      html  css  js  c++  java
  • Spark RDD编程核心

    一句话说,在Spark中对数据的操作其实就是对RDD的操作,而对RDD的操作不外乎创建、转换、调用求值。

    什么是RDD

      RDD(Resilient Distributed Dataset),弹性分布式数据集。

      它定义了如何在集群的每个节点上操作数据的一系列命令,而不是指真实的数据,Spark通过RDD可以对每个节点的多个分区进行并行的数据操作。

      之所以称弹性,是因为其有高容错性。默认情况下,Spark会在每一次行动操作后进行RDD重计算,如想在多个行动操作中使用RDD,可以将其缓存(以分区的方式持久化)到集群中每台机器的内存或者磁盘中。当一台机器失效无法读取RDD数据时,可通过此特性重算丢掉的分区,从而恢复数据,此过程对用户透明。

      

    如何创建RDD

      可通过以下几种方式创建RDD:

    • 通过读取外部数据集 (本地文件系统/HDFS/...)
    • 通过读取集合对象    (List/Set/...)
    • 通过已有的RDD生成新的RDD    

    Spark对RDD操作方式

      Spark对RDD的操作分两种,即转换操作(Transformation)和行动操作(Action)。

      • 转换操作:不触发实际计算,返回一个新的RDD,例如对数据的匹配操作map和过滤操作filter,惰性求值
      • 行动操作:会触发实际计算,会向驱动器返回结果或将结果写到外部系统。

      如何区别两种操作?

        看返回值类型,返回RDD类型的为转换操作,返回其他数据类型的是行动操作。

      何为惰性求值?

        Spark在执行转换操作时不会触发实际的计算,而等到执行行动操作时才会实际计算。

      为何会有惰性求值?

        我们应把RDD看做是Spark通过转换操作后构建出来的一套定义如何计算数据的指令列表,而非存放着数据的数据集。

        如果每经过一次转换操作都触发计算,将会有系统负担,而惰性求值会将多个转换操作合并到一起,抵消不必要的步骤后,在最后必要的时才进行运算,获得性能的提升同时又减轻系统运算负担。如涉及多次转换操作时情景需求如下,我想找  转换1:深圳市的人  > 转换2:南山区的人> 转换3:腾讯大厦的人    ==惰性求值、合并操作==>腾讯大厦的人。 

    转换操作

      1. 基本转换操作,以{1,2,3,3}为例,f代表函数

    函数名 目的 示例 结果
    map(f) 将函数应用于每一个元素中,返回值构成新的RDD rdd.map(x=>x+1) {2,3,4,4}
    flatMap(f) 将函数应用于每一个元素中,并把元素中迭代器内所有内容一并生成新的RDD,常用于切分单词 rdd.flatMap(x=>x.to(3)) {1,2,3,,2,3,3,3}
    filter(f) 过滤元素 rdd.filter(x=>x!=1)  {2,3,3}
    distinct() 元素去重 rdd.distinct()  {1,2,3}
    sample( withReplacement, fraction , [seed] ) 元素采样,以及是否需要替换 rdd.sample(false,0.5)  不确定值,不确定数目

      2. 集合转换操作,以{1,2,3}{3,4,5}为例,rdd代表已生成的RDD实例

    函数名 目的 示例 结果
    union(rdd) 合并两个RDD所有元素(不去重) rdd1.union(rdd2) {1,2,3,3,4,5}
    intersection(rdd) 求两个RDD的交集 rdd1.intersection(rdd2) {3}
    substract(rdd) 移除在RDD2中存在的RDD1元素 rdd1.substract(rdd2) {1,2}
    cartesian(rdd) 求两个RDD的笛卡尔积 rdd1.cartesian(rdd2) {(1,3),(1,4),(1,5)...(3,5)}

     

    行动操作

       基本行动操作,以{1,2,3,3}为例,f代表函数

    函数名 目的 示例 结果
    collect() 收集并返回RDD中所有元素 rdd.collect() {1,2,3,3}
    count() RDD中元素的个数 rdd.count() 4
    countByValue() 各元素出现的个数 rdd.countByValue() {(1,1),(2,1),(3,2)}
    take(num) 从RDD中返回num个元素 rdd.take(2) {1,2}
    top(num) 返回最前面的num个元素 rdd.take(2) {3,3}
    takeOrdered(num,[ordering]) 按提供的顺序返回前num个元素 rdd.takeOrdered(2,[myOrdering]) {3,3}
    takeSample(withReplacement, num ,[seed]) 返回任意元素 takeSample(false,1) 不确定值
    reduce(f) 并行整合RDD中所有元素,返回一个同一类型元素 rdd.reduce((x,y) => x+y ) 9
    fold(zeroValue)(f) 与reduce一样,不过需要提供初始值 rdd.fold(0)((x,y) => x+y ) 9
    aggregate(zeroValue)(seqOp , combOp) 与reduce相似,不过返回不同类型的元素

    rdd. aggregate(( 0, 0)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))

    {9,4}
    foreach(f) 给每个元素使用给定的函数,结果不需发回本地 rdd.foreach(f)

     *后面有详解

    q1: take()、top()和takeOrdered() 的区别,顺序在其中如何理解

    take(): 用于获取RDD中从0到num-1下标的元素,不排序。

    scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
     
    scala> rdd1.take(1)
    res0: Array[Int] = Array(10)                                                    
     
    scala> rdd1.take(2)
    res1: Array[Int] = Array(10, 4)
    

    top():用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。

    scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
     
    scala> rdd1.top(1)
    res2: Array[Int] = Array(12)
     
    scala> rdd1.top(2)
    res3: Array[Int] = Array(12, 10)
     
    //指定排序规则
    scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
    myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef
     
    scala> rdd1.top(1)
    res4: Array[Int] = Array(2)
     
    scala> rdd1.top(2)
    res5: Array[Int] = Array(2, 3)
    

    takeOrdered():按自然顺序输出

    scala> val rdd = sc.makeRDD(Seq(3,2,5,1,4))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> val result = rdd.takeOrdered(2)
    result: Array[Int] = Array(1, 2)

    q2: fold()详解

    fold(zeroValue)(fun), 使用zeroValue和每个分区的元素进行聚合运算,最后各分区结果和zeroValue再进行一次聚合运算。

    object LFold {
      def main(args:Array[String]) {
        val conf = new SparkConf ().setMaster ("local").setAppName ("app_1")
        val sc = new SparkContext (conf)
        val listRDD = sc.parallelize(List(1,2,3,4,5),1)
        val sum= listRDD.fold(3)((x,y)=>printLn(x,y))
        println("Sum -> "+sum)
    
      }
      def printLn(param : (Int,Int) ): Int = {
        println("=============="+param.toString()+"==============")
        val ret : Int = param._1+param._2
        ret
      }
    }
    

    运行结果:

    ==============(3,1)==============
    ==============(4,2)==============
    ==============(6,3)==============
    ==============(9,4)==============
    ==============(13,5)==============

    ==============(3,18)==============

    Sum -> 21

    解析:

    a. 第一次执行相加时,此时无汇总值,所以取默认值3作补充加法。(3,1)

    b. 随后逐个元素相加,至最后一个元素5。(13,5)

    c. 汇总相加所有的值,此时无汇总值,所以取默认值3作补充加法。(3,18)

    d. 最后相加 3+1+2+3+4+5+3 = 21

    为方便理解再举例子:

    val listRDD = sc.parallelize(List(1,2,3,4,5),2)
    val sum= listRDD.fold(3)((x,y)=> x + y ) 

    2个分区,zeroValue为3 ,经过3次聚合操作,结果应为24 ,详细分析如图

    q3: aggregate()详解

    语法格式: aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 

    与fold相近,aggregate函数将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

    参数解析:

    @param zeroValue the initial value for the accumulated result of each partition for the
    `seqOp` operator, and also the initial value for the combine results from
    different partitions for the `combOp` operator - this will typically be the
    neutral element (e.g. `Nil` for list concatenation or `0` for summation)
    //zeroValue是为seqOp函数定义的每个分区计算结果的初始值,也是为combOp函数定义的不同分区的聚合值的初始值。
    //--这通常是一个典型的中间元素(如:'Nil')代表字符串拼接操作,'0'代表求和操作。

    @param seqOp an operator used to accumulate results within a partition
    //用于计算毎个分区的元素聚合的结果
    @param combOp an associative operator used to combine results from different partitions
    //用于计算不同分区聚合的结果

     求{1,2,3,3}的平均值:

    val rdd= sc.parallelize(List(1,2,3,3,4,5),2)
    val result = rdd. aggregate((1, 0)) (
       (x, y) => (x._1 + y, x._2 + 1),      // 单个分区(单个分区元素相加总数,单个分区元素个数相加)
       (x, y) => (x._1 + y._1, x._2 + y._2) // 不同分区(所有分区元素总数相加,所有分区元素个数相加)
    )
    val avg = result._1 / result._2. toDouble  //avg = 3.5
    

    解析:

    元组1,求出单个分区里的元素聚合的总和以及元素个数

    元组2,把不同分区里的元素聚合的总和以及元素个数进行最后的聚合

    分析如图

     

    RDD持久化

      持久化即以序列化的形式缓存。

      如上所述,RDD转换操作会惰性求值,如果多次访问同一个RDD调用行动操作,Spark每次都要重算RDD,消耗极大。

      为了避免多次计算同一个RDD,可以对数据进行持久化。  

      出于不同目的和场景需求,我们可选择的持久化级别有:

    级别 使用空间 CPU时间 是否在内存中 是否在磁盘上
    MEMORY_ONLY
    MEMORY_ONLY_SER
    MEMORY_AND_DISK 部分 部分
    MEMORY_AND_DISK_SER 部分 部分
    DISK_ONLY

      在Scala中的调用方法为

    val result = input. map( x => x * x) 
    result. persist( StorageLevel.DISK_ ONLY) 
    println( result. count()) 
    println( result. collect(). mkString(","))
    

      如果要缓存的数据太多,内存放不下,Spark会自动使用LRU(最近最小使用)的缓存策略把最老的分区从内存中移除。

      最后,可调用rdd.unpersist()方法手动移除RDD缓存。 

      详细内容参考《Spark快速大数据分析第三章》

     

  • 相关阅读:
    FileUpload组件
    国际化
    dbutils
    BeanUtils
    c3p0连接池]
    JDBC代码模板
    JDBC基础与连接sql2012
    JSP以及JSP解析原理
    Tomcat使用,部署
    JAVA---反射
  • 原文地址:https://www.cnblogs.com/yongjian/p/6387577.html
Copyright © 2011-2022 走看看