zoukankan      html  css  js  c++  java
  • RDD 编程(一)

    1. RDD 的创建

    创建 RDD 有三种方式:

    • 从集合中创建
    • 从外部存储创建
    • 从其他 RDD 转换得到新的 RDD

    1.1 从集合中创建

    1、使用 parallelize 函数:

    import org.apache.spark.sql.SparkSession
    
    object CreateRdd {
      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
        val sc = session.sparkContext
        val arr = Array(10, 20, 30, 40, 50, 60)
        val rdd1 = sc.parallelize(arr)
    
        rdd1.collect().foreach(println)
    
        sc.stop()
      }
    }
    

    注意:parallelize 函数第二个参数可以指定分区数目

    2、使用 makeRDD 函数:

    val rdd1 = sc.makeRDD(arr)
    

    1.2 从外部存储创建

    外部存储包括:

    • 任意 Hadoop 支持的存储数据源来创建分布式数据集
    • 本地文件系统, HDFS, Cassandra, HVase, Amazon S3 等等
    val rdd1 = sc.textFile(URL)
    
    • URL 可以是本地系统文件,但是必须每个节点都要存在这个路径
    • 可以是 hdfs 路径:hdfs://...
    • 所有基于文件的方法,支持目录、压缩文件、通配符(*):textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")
    • textFile 第二个参数,表示分区数,默认情况下每个块对应一个分区(对 HDFS 来说,块大小默认是 128M,HDFS 文件有多少个就有多少个分区),可以传递一个大于块数的分区数,但是不能传递一个比块数小的分区数

    1.3 从其他 RDD 转换为新的 RDD

    1.4 RDD 操作

    RDD 支持两种操作:transformation、action 操作:

    • 转换操作 transformation:从一个已知的 rdd 中创建处理一个新的 rdd,转换操作是惰性的,只有遇到行动操作才会触发计算
    • 行动操作 action:数据集计算结束后,给驱动程序返回一个值

    RDD 类型

    根据数据类型不同,整体分为:

    • value 类型
    • key-value 类型(二维数组)

    注意:每次 action 前面的 transformation 都会被重新计算,但可以通过缓存来加快访问速度

    2. value 类型

    2.1 map(func)

    返回一个新的 RDD, 该 RDD 是由原 RDD 的每个元素经过函数转换后的值而组成. 就是对 RDD 中的数据做转换,用来做数据结构的调整

    val arr = Array(10, 20, 30, 40, 50, 60)
    
    //    val rdd1 = sc.parallelize(arr).map(x => x * 2)
    val rdd1 = sc.parallelize(arr).map(_ * 2)
    

    2.2 mapPartitions(func)

    功能类似于 map,但是独立在每个分区上运行,假设有N个元素,有M个分区,那么 map 的函数的将被调用N次,而 mapPartitions 被调用M次,一个函数一次处理所有分区:

    // mapPartitions, x.map() 用的是 scala 的 map 函数,而非 spark 的 map
    //    val rdd2 = sc.parallelize(arr, 4).mapPartitions(x => x.map(_ * 2))
    
    val rdd2 = sc.parallelize(arr, 4).mapPartitions(x => {
        println("called one times!!!!!!")
    
        x.map(_ * 3)
    })
    

    运行结果:

    called one times!!!!!! 会输出 4 次,因为有 4 个分区:

    called one times!!!!!!
    called one times!!!!!!
    called one times!!!!!!
    called one times!!!!!!
    30
    60
    90
    120
    150
    180
    

    注意:当每个分区的数据很大时,使用 mapPartitions x.toList() 可能会把内存撑爆,从而导致 OOM;当内存足够大的时候,使用 mapPartitions 执行效率要比 map

    2.3 mapPartitionsWithIndex(func)

    mapPartitions(func)类似. 但是会给 func 多提供一个 Int 值来表示分区的索引. 所以func的类型是:(Int, Iterator<T>) => Iterator<U>

    val rdd3 = sc.parallelize(arr, 2).mapPartitionsWithIndex((index, it) => it.map(x => (index, x * 2)))
    rdd3.collect().foreach(println)
    sc.stop()
    

    运行结果:

    (0,20)
    (0,40)
    (0,60)
    (1,80)
    (1,100)
    (1,120)
    

    2.4 flatMap(func)

    flatMap 返回一个序列,输入一个可能会返回 0 个或多个,map 是一一对应的:

    // list1 中每个元素都是一个集合,flatMap 会将集合中的元素拍散,放在一个新的集合中
    val list1 = List(1 to 5, 6 to 11, 12 to 18, 18 to 25)
    val rdd1 = sc.parallelize(list1, 2)
    
    val rdd2 = rdd1.flatMap(x => x)
    

    运行结果:

    1
    2
    3
    4
    .
    .
    .
    24
    25
    

    其他用法:

    val list2 = List(30, 5, 70, 6, 1, 20)
    val rdd3 = sc.parallelize(list2).flatMap(x => List(x, x * x, x * x * x))
    
    // 结果:2、4、8,else 返回空列表
    val list3 = List(1, 2, 3)
    val rdd4 = sc.parallelize(list3, 2).flatMap(x => if (x % 2 == 0) List(x, x * x, x * x * x) else List[Int]())
    

    2.5 glom()

    将每个分区元素合并成一个数组,形成新的 rdd,类型是 RDD[Array[T]]

    val list1 = List(1, 2, 3, 4, 5, 6)
    val rdd = sc.parallelize(list1, 2).glom().map(x => x.toList)
    //    val rdd = sc.parallelize(list1, 2).glom().map(_.toList)
    

    运行结果:

    List(1, 2, 3)
    List(4, 5, 6)
    

    2.6 groupBy(func)

    按照 func 的返回值进行分组,func 的返回值作为 key, 对应值放入一个迭代器中,返回:RDD[(K, Iterable[T])],但是顺序不能保证(不推荐使用, groupByKey 用的比较多)

    按照元素的奇偶性进行分组:

    val rdd = sc.parallelize(List(30, 50, 7, 6, 1, 20), 2)
    val rdd2 = rdd.groupBy(x => x % 2)
    /*
    奇数、偶数聚合
    rdd2 =
      (0,CompactBuffer(30, 50, 6, 20))
      (1,CompactBuffer(7, 1))
      
      rdd3
      (0,106)
      (1,8)
      */
    val rdd3 = rdd2.map {
      case (k, it) => (k, it.sum)
    }
    rdd3.collect().foreach(println)
    

    2.7 filter(func)

    作用: 过滤. 返回一个新的 RDD 是由 func 的返回值为 true 的那些元素组成

    // 过滤出大于 20 的元素
    val rdd = sc.parallelize(List(30, 50, 7, 6, 1, 20), 2).filter(x => x > 20)
    

    2.8 sample(withReplacement, fraction, seed)

    作用:

    • 以指定的随机种子随机抽样出比例为fraction的数据,(抽取到的数量是: size * fraction). 需要注意的是得到的结果并不能保证准确的比例.
    • withReplacement 表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样. 放回表示数据有可能会被重复抽取到, false 则不可能重复抽取到. 如果是false, 则fraction必须是:[0,1], 是 true 则大于等于0就可以了.
    • seed 用于指定随机数生成器种子。 一般用默认的, 或者传入当前的时间戳
    val rdd = sc.parallelize(1 to 26, 2)
    
    //    不放回抽样,比例[0, 1]
    //    val sample_rdd = rdd.sample(false, 0.1)
    
    // 放回抽样
    val sample_rdd = rdd.sample(true, 1)
    

    2.9 distinct([numTasks]))

    RDD 中元素执行去重操作. 参数表示任务的数量,默认值和分区数保持一致:

    val list1 = List(1, 2, 3, 4, 5, 6, 6, 4, 3)
    val rdd = sc.parallelize(list1, 2).distinct()
    

    模板匹配对象去重:

    import org.apache.spark.sql.SparkSession
    
    object Distinct {
      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
        val sc = session.sparkContext
    
        val list1 = List(1, 2, 3, 4, 5, 6, 6, 4, 3)
    //    val rdd = sc.parallelize(list1, 2).distinct()
    
        val rdd = sc.parallelize(List(User(10, "lisi"), User(20, "zs"), User(10, "ab")), 2).distinct(2)
        rdd.collect().foreach(println)
        sc.stop()
    
      }
    }
    
    case class User(age: Int, name: String) {
      override def hashCode(): Int = this.age
    
      override def equals(obj: Any): Boolean = obj match {
        case User(age, _) => this.age == age
        case _ => false
      }
    }
    

    注意:distinctshuffle 操作

    2.10 coalesce(numPartitions)

    作用:减少分区到指定数目,用于大数据集过滤后,提高小数据集的执行效率,不常用,常用 reparation

    由原来 5 个分区减少到 2 个分区:

    val rdd = sc.parallelize(List(30, 50, 70, 60, 10, 20), 5)
    println("分区前:" + rdd.getNumPartitions)
    
    val rdd2 = rdd.coalesce(2)
    println("分区后:" + rdd2.getNumPartitions)
    
    rdd.collect().foreach(println)
    

    coalesce 一般用来减少分区,也可以增加分区,增加分区时一定需要 shuffle,减少分区一般不会 shuffle

    // 增加分区,第二个参数为 true,表示增加分区
    rdd.coalesce(6, true)
    

    2.11 repartition(numPartitions)

    作用: 根据新的分区数, 重新 shuffle 所有的数据, 这个操作总会通过网络,新的分区数相比以前可多,也可以少:

    rdd.repartition(6)
    

    coalasce 和 reparation 的区别

    • coalasce:重新分区可选择是否进行 shuffle,由第二个参数 Boolean = false/true 决定
    • reparation:实质调用的 coalasce 进行 shuffle
    • 若减少分区,尽量减少 shuffle 使用 coalasce

    2.12 sortBy(func,[ascending], [numTasks])

    作用:使用 func 先对数据进行处理,而后对结果进行排序,默认为正序

    val list1 = List("aaa", "ccc", "bbb", "ddd", "eee")
    val rdd1 = sc.parallelize(list1, 2)
    val rdd = rdd1.sortBy(x => x)
    //    val rdd = rdd1.sortBy(x => x, ascending = false)
    //    val rdd = rdd1.sortBy(x => x.length, ascending = false)
    

    2.13 pipe(command, [envVars])

    作用:管道,针对每个分区,把 RDD 中的每个数据通过管道传递给shell命令或脚本,返回输出的 RDD。一个分区执行一次这个命令. 如果只有一个分区, 则执行一次命令,shell 脚本必须在 Worker 节点

    1、创建脚本 test_pipe.sh

    #!/bin/bash
    
    echo Hello
    while read line;do
        echo ">>>>>>" $line
    done
    
    # 记得添加权限,否则第二步调用会报错,没有权限 chmod +x test_pipe.sh
    

    2、spark-shell 中调用 test_pipe.sh

    // 一个分区
    scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> rdd1.pipe("/home/hadoop/apps/test_pipe.sh").collect()
    res5: Array[String] = Array(Hello, >>>>>> 1, >>>>>> 2, >>>>>> 3, >>>>>> 4, >>>>>> 5, >>>>>> 6)
    
    // 二个分区 每个分区执行一次脚本, 但是每个元素算是标准输入中的一行
    scala> val rdd2 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
    
    scala> rdd2.pipe("/home/hadoop/apps/test_pipe.sh").collect()
    res6: Array[String] = Array(Hello, >>>>>> 1, >>>>>> 2, >>>>>> 3, Hello, >>>>>> 4, >>>>>> 5, >>>>>> 6)
    

    3. 双 Value 类型

    所谓双 value 类型即两个 rdd 进行交互,常用的有:union、subtract、zip

    3.1 union(otherDataset) 并集

    • union:并集
    • intersection:交集
    • subtract:差集
    • cartesian:笛卡尔积

    作用:求并集,返回一个新的 rdd

    val list1 = 1 to 15
    val list2 = 13 to 26
    
    val rdd1 = sc.parallelize(list1, 2)
    val rdd2 = sc.parallelize(list2, 2)
    
    // 并集
    val union_rdd = rdd1.union(rdd2)
    
    // 交集
    val intersection_rdd = rdd1.intersection(rdd2)
    
    // 差集
    val subtract_rdd = rdd1.subtract(rdd2)
    
    // 笛卡尔积
    val cartesian_rdd = rdd1.cartesian(rdd2)
    

    3.2 zip(otherDataset) 并集

    作用:拉链操作,两个 rddd元素和分区数必须一致,否则异常(scala 中元素个数可以不一致)

    1、元素个数、分区数一致:

    val list1 = 1 to 5
    val list2 = List("A", "B", "C", "D", "E")
    val rdd1 = sc.parallelize(list1, 2)
    val rdd2 = sc.parallelize(list2, 2)
    val rdd = rdd1.zip(rdd2)
    

    运行结果:

    (1,A)
    (2,B)
    (3,C)
    (4,D)
    (5,E)
    

    2、分区数不一致:

    val list1 = 1 to 5
    val list2 = List("A", "B", "C", "D", "E")
    val rdd1 = sc.parallelize(list1, 2)
    val rdd2 = sc.parallelize(list2, 3)
    val rdd = rdd1.zip(rdd2)
    

    运行结果:

    Can't zip RDDs with unequal numbers of partitions: List(2, 3)
    

    3、其他函数:

    // 和元素索引组成一个元组 (元素,索引),如:(A,0)、(B,1)
    //    val rdd = rdd2.zipWithIndex()
    
    // 将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求
    val rdd = rdd1.zipPartitions(rdd2)((it1, it2) => {
      it1.zipAll(it2, 100, 200)
    })
    

    运行结果:

    (1,A)
    (2,B)
    (3,C)
    (4,D)
    (5,E)
    

    4. key-value(PairRDD) 类型

    key-value 键值对类型,大多数有 shuffle 操作,其结构类似于:(key, value),常用的有:groupByKey、reduceByKey、partitionBy 等。

    作用: 对 pairRDD 进行分区操作,如果原有的 partionRDD 的分区器和传入的分区器相同, 则返回原 pairRDD,否则会生成 ShuffleRDD,即会产生 shuffle 过程

  • 相关阅读:
    bzoj4044/luoguP4762 [Cerc2014]Virus synthesis(回文自动机+dp)
    bzoj4032/luoguP4112 [HEOI2015]最短不公共子串(后缀自动机+序列自动机上dp)
    bzoj3926/luoguP3346 [Zjoi2015]诸神眷顾的幻想乡(trie上构建广义后缀自动机)
    bzoj3144 [HNOI2013]切糕(最小割)
    知识点简单总结——原根和指标
    uoj86 mx的组合数 (lucas定理+数位dp+原根与指标+NTT)
    rest_framework 学习笔记(一)
    Django 数据库操作
    02-Kubenetes资源
    10-Helm
  • 原文地址:https://www.cnblogs.com/midworld/p/15391273.html
Copyright © 2011-2022 走看看