zoukankan      html  css  js  c++  java
  • Spark学习摘记 —— RDD转化操作API归纳

    本文参考

    在阅读了《Spark快速大数据分析》动物书后,大概了解到了spark常用的api,不过书中并没有给予所有api具体的示例,而且现在spark的最新版本已经上升到了2.4.5,动物书中的spark版本还停留在1.2.0,所以就有了这篇文章,在最新的2.4.5版本下测试常用的api

    由于spark的惰性计算特性,RDD只有在第一次行动操作中被用到时才会真正进行计算,因此我打算将文章内容分为"转化操作API"和"行动操作API"两部分,同时因为pair RDD(RDD中的元素是键值对)的部分api较为特殊,所以我打算单独再写一篇文章

    本文仅介绍转化操作API,前5个api —— map()、flatMap()、filter()、distinct()、sample()是针对一个RDD的转化操作,后续的api —— union()、intersection()、subtract()、cartesion()是针对两个RDD的转化操作

    RDD行动操作API归纳:https://www.cnblogs.com/kuluo/p/12550938.html

    Pair RDD转化操作API归纳:https://www.cnblogs.com/kuluo/p/12558563.html

    Pair RDD行动操作API归纳:https://www.cnblogs.com/kuluo/p/12567221.html

    环境

    idea + spark 2.4.5 + scala 2.11.12

    RDD均通过SparkContext的parallelize()函数创建

    map()函数

    目的:

    将函数应用于RDD中的每个元素,将返回值构成新的RDD

    转化前后的RDD的元素类型可以不同(比如经典的WordCount示例中转化为了键值对元素)

    代码:

    val testList = List(1, 2, 3, 3)
    val testRdd = sc.parallelize(testList)
    testRdd.map(ele => ele * ele).foreach(ele => print(s"$ele "))

    输出:

    1 4 9 9

    更高效的操作:

    每个RDD被分为多个分区,这些分区在集群的不同节点上运行,可以使用mapPartitions()函数,将转化操作作用于每个分区的元素上,这种方法还可以为每个分区创建一个JDBC连接,而不是为每一个元素创建一个连接(此处不做示例)

    mapPartitions()函数有两个参数,第一个参数接收一个函数,和map()函数相同,第二个参数为preservesPartitioning,默认值为false,仅当我们对pair RDD进行转化操作,并且没有修改键时设置为true

    val testList = List(1, 2, 3, 3)
    val testRdd = sc.parallelize(testList)
    testRdd.mapPartitions(partition =>
      partition.map(
        ele => {
          ele * ele
        }
    )).foreach(ele => print(s"$ele "))

     

    flatMap()函数

    目的:

    将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,我们也常常说成是"压扁"

    "压扁"这个词可能听上去不大好理解,我们提供给flatMap()的函数分别应用到RDD的那个元素上,不过返回的不是一个元素,而是一个返回值序列的迭代器,但输出的RDD不是由迭代器组成,得到的是一个包含各个迭代器可以访问的所有元素的RDD

    转化前后的RDD的元素类型不变

    代码:

    val testList = List(1, 2, 3, 3)
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(ele => {
      ele.to(5)
    }).foreach(ele => print(s"$ele "))

    我们也可以手动返回迭代器,这段代码也类似于

    val testList = List(Range(1, 6), Range(2, 6), Range(3, 6), Range(3, 6))
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(_.iterator).foreach(ele => print(s"$ele "))

    输出:

    1 2 3 4 5 2 3 4 5 3 4 5 3 4 5

     

    filter()函数

    目的:

    返回一个由传给filter()函数的元素组成的RDD,当函数返回值为true时,保留该元素,可以理解为 "被过滤"出来

    代码:

    val testList = List(1, 2, 3, 3)
    val testRdd = sc.parallelize(testList)
    testRdd.filter(ele => ele > 2).foreach(ele => print(s"$ele "))

    输出:

    3 3

    更高效的操作:

    通过过滤操作后,RDD中的元素减少,可以在filter()操作后执行coalesce()函数进行分区合并,第一个参数指定分区数,当指定的分区数大于当前RDD的分区数时不会进行合并,当前分区数不变(除非指定第二参数shuffle为true,默认为false),当指定的分区数小于当前的RDD的分区数时会进行合并,并且不会进行shuffle(尽量不要指定极端的情况,如指定合并后的分区数为1)

    val testList = List(1, 2, 3, 3)
    val testRdd = sc.parallelize(testList)
    testRdd.filter(ele => ele > 2).coalesce(5).foreach(ele => print(s"$ele "))

     

    distinct()函数

    目的:

    去重,因为会进行shuffle,所以不推荐此操作

    代码:

    val testList = List(1, 2, 3, 3)
    val testRdd = sc.parallelize(testList)
    testRdd.distinct().foreach(ele => print(s"$ele "))

    输出:

    1 2 3

     

    sample()函数

    目的:

    对RDD进行采样

    代码:

    val testList = List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    val testRdd = sc.parallelize(testList)
    testRdd.sample(false, 0.9).foreach(ele => print(s"$ele "))

    第一个参数withReplacement指定false时,第二个参数fraction必须为 [ 0 , 1 ] 之间,表示每个元素被选中的可能性

    按照该示例,也有人将该函数理解为,从所有元素中抽取90%返回,但是在源码中我们可以看到:

    without replacement: probability that each element is chosen; fraction must be [0, 1]

    This is NOT guaranteed to provide exactly the fraction of the count of the given [[RDD]]

    因此这种理解方式我认为是错误的

    输出:

    0 1 2 3 5 6 8 9(不一定)

    疑点:

    当第一个参数withReplacement指定true时,第二个参数fraction并不要求一定小于1,源码中注释为"with replacement: expected number of times each element is chosen; fraction must be greater than or equal to 0"

    val testList = List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    val testRdd = sc.parallelize(testList)
    testRdd.sample(true, 3).foreach(ele => print(s"$ele "))

    输出:0 0 1 1 1 1 2 2 2 3 3 3 3 4 4 4 4 4 5 5 5 5 5 5 6 6 7 7 8 9 9

    目前不大理解是如何在采样的,希望各位看官大大能在评论区发表看法哈

     

    unon()函数

    目的:

    相当于集合运算的并集,生成一个包含两个RDD中所有元素的RDD,要求两个RDD的元素类型相同

    代码:

    val testList1 = List(1, 2, 3)
    val testList2 = List(3, 4, 5)
    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)
    testRdd1.union(testRdd2).foreach(ele => print(s"$ele "))

    输出:

    1 2 3 3 4 5

     

    intersection()函数

    目的:

    相当于集合运算的交集,求两个RDD共同的元素的RDD,要求两个RDD的元素类型相同

    代码:

    val testList1 = List(1, 2, 3)
    val testList2 = List(3, 4, 5)
    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)
    testRdd1.intersection(testRdd2).foreach(ele => print(s"$ele "))

    输出:

    3

     

    subtract()函数

    目的:

    想当于集合运算的差集,移除一个RDD中的内容,要求两个RDD的元素类型相同

    代码:

    val testList1 = List(1, 2, 3)
    val testList2 = List(3, 4, 5)
    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)
    testRdd1.subtract(testRdd2).foreach(ele => print(s"$ele "))

    输出:

    1 2

     

    cartesion()函数

    目的:

    与另一个RDD的笛卡尔积

    代码:

    val testList1 = List(1, 2, 3)
    val testList2 = List('a', 'b', 'c')
    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)
    testRdd1.cartesian(testRdd2).foreach(ele => print(s"$ele "))

    输出:

    (1,a) (1,b) (1,c) (2,a) (2,b) (2,c) (3,a) (3,b) (3,c)

  • 相关阅读:
    Java面向对象之封装静态
    分布式平台Spark环境的搭建
    高斯混合模型
    异常排除: 调用方未由服务进行身份验证
    HttpClient介绍和简单使用流程
    阿里短信服务的使用流程
    笔记工具选择
    特效图文制作
    语言基础(23):智能指针
    无线通信基础(一):无线网络演进
  • 原文地址:https://www.cnblogs.com/kuluo/p/12545374.html
Copyright © 2011-2022 走看看