zoukankan      html  css  js  c++  java
  • Spark实战(二)Spark常用算子

    一、算子分类

       从大方向来说,Spark 算子大致可以分为以下两类:

        1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。
       Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
       2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。
       Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。
       从小方向来说,Spark 算子大致可以分为以下三类:
       1)Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
       2)Key-Value数据类型的Transfromation算子,这种变换并不触发提交作业,针对处理的数据项是Key-Value型的数据对。 
       3)Action算子,这类算子会触发SparkContext提交Job作业。

    二、常用算子

    (一).Value数据类型的Transformation算子

    1 .输入分区与输出分区一对一型

    1)、map

       将原来 RDD 的每个数据项通过 map 中的用户自定义函数 f 映射转变为一个新的元素

    val data = sc.parallelize(1 to 10,3)
    val result = data.map(it => it + 1)
    result.collect
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    2)、flatMap算子

       map是对RDD中元素逐一进行函数操作映射为另外一个RDD,而flatMap操作是将函数应用于RDD之中的每一个元素,将返回的迭代器的所有内容构成新的RDD,将原来的数组或容器结合拆散,拆散的数据形成为 RDD 中的数据项

    val result2 = data.flatMap(it => (it to 10)).collect
    
    • 1

    在这里插入图片描述

    3)、mapPartitions算子

       区于foreachPartition(属于Action,且无返回值),而mapPartitions可获取返回值。与map的区别前面已经提到过了,但由于单独运行于RDD的每个分区上(block),所以在一个类型为T的RDD上运行时,(function)必须是Iterator => Iterator< U >类型的方法(入参)。

    def function(it:Iterator[Int]): Iterator[Int] = {
        var res = for(e <- it) yield e * 2
        res 
    }
    
    val result4 = data.mapPartition(function)
    result4.collect
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述
    在这里插入图片描述

    4)、glom算子

       glom函数将每个分区形成一个数组,内部实现是返回的GlommedRDD。

    val a = sc.parallelize(1 to 100, 3)
    a.glom.collect
    
    • 1
    • 2

    在这里插入图片描述
    在这里插入图片描述

    2. 输入分区与输出分区多对一型

    1)、union算子

       使用 union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同,并不进行去重操作,保存所有元素。如果想去重 可以使用 distinct()。同时 Spark 还提供更为简洁的使用 union 的 API,通过 ++ 符号相当于 union 函数操作

    val a = sc.parallelize(1 to 3, 1)
    val b = sc.parallelize(5 to 7, 1)
    (a ++ b).collect
    a.union(b).collect
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    2)、cartesian算子

       对 两 个 RDD 内 的 所 有 元 素 进 行 笛 卡 尔 积 操 作。 操 作 后, 内 部 实 现 返 回CartesianRDD

    val x = sc.parallelize(List(1,2,3,4,5))
    val y = sc.parallelize(List(6,7,8,9,10))
    x.cartesian(y).collect
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    3.输入分区与输出分区多对多型

    1)、grouBy算子

       将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组。

    函数实现如下:
      1)将用户函数预处理:
      val cleanF = sc.clean(f)
      2)对数据 map 进行函数操作,最后再进行 groupByKey 分组操作。
    
         this.map(t => (cleanF(t), t)).groupByKey(p)
      其中, p 确定了分区个数和分区函数,也就决定了并行化的程度。
    val a = sc.parallelize(1 to 9, 3)
    a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    4.输出分区为输入分区子集型

    1)、filter算子

       filter 函数功能是对元素进行过滤,对每个 元 素 应 用 f 函 数, 返 回 值 为 true 的 元 素 在RDD 中保留,返回值为 false 的元素将被过滤掉。 内 部 实 现 相 当 于 生 成 FilteredRDD(this,sc.clean(f))。 下面代码为函数的本质实现: deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))

    val a = sc.parallelize(1 to 10, 3)
    val b = a.filter(_ % 2 == 0)
    b.collect
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    2)、distinct算子

       distinct将RDD中的元素进行去重操作

    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
    c.distinct.collect
    
    • 1
    • 2

    在这里插入图片描述

    3)、subtract算子

       subtract相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素

    val a = sc.parallelize(1 to 9, 3)
    val b = sc.parallelize(1 to 3, 3)
    val c = a.subtract(b)
    c.collect
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    4)、sample算子

       sample 将 RDD 这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。内部实现是生成 SampledRDD(withReplacement, fraction, seed)。

     函数参数设置:
    ‰   withReplacement=true,表示有放回的抽样。
    ‰   withReplacement=false,表示无放回的抽样。
    
    val a = sc.parallelize(1 to 10000, 3)
    a.sample(false, 0.1, 0).count
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    5)、takeSample算子

       takeSample()函数和上面的sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行
    Collect(),返回结果的集合为单机的数组。

    val x = sc.parallelize(1 to 1000, 3)
    x.takeSample(true, 100, 1)
    
    • 1
    • 2

    在这里插入图片描述

    5.Cache型

    1)、cache算子

       cache 将 RDD 元素从磁盘缓存到内存。 相当于 persist(MEMORY_ONLY) 函数的功能。

    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
    c.getStorageLevel
    c.cache
    c.getStorageLevel
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    2)、persist算子

       persist 函数对 RDD 进行缓存操作。数据缓存在哪里依据 StorageLevel 这个枚举类型进行确定。 有以下几种类型的组合(见10), DISK 代表磁盘,MEMORY 代表内存, SER 代表数据是否进行序列化存储。

    (二).Key-Value数据类型的Transfromation算子

    1 .输入分区与输出分区一对一型

    1)、mapValues算子

       mapValues :针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    val b = a.map(x => (x.length, x))
    b.mapValues("x" + _ + "x").collect
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    2. 对单个RDD或两个RDD聚集

    单个RDD:

    1)、combineByKey算子

    下面代码为 combineByKey 函数的定义:
      combineByKey[C](createCombiner:(V) C,
      mergeValue:(C, V) C,
      mergeCombiners:(C, C) C,
      partitioner:Partitioner,
      mapSideCombine:Boolean=true,
      serializer:Serializer=null):RDD[(K,C)]
    
    说明:
    createCombiner: V => C, C 不存在的情况下,比如通过 V 创建 seq C。
    mergeValue: (C, V) => C,当 C 已经存在的情况下,需要 merge,比如把 item V
    加到 seq C 中,或者叠加。
       mergeCombiners: (C, C) => C,合并两个 C。
    partitioner: Partitioner, Shuff le 时需要的 Partitioner。
    mapSideCombine : Boolean = true,为了减小传输量,很多 combine 可以在 map
    端先做,比如叠加,可以先在一个 partition 中把所有相同的 key 的 value 叠加,
    再 shuffle。
    serializerClass: String = null,传输需要序列化,用户可以自定义序列化类:
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

       例如,相当于将元素为 (Int, Int) 的 RDD 转变为了 (Int, Seq[Int]) 类型元素的 RDD。如,通过 combineByKey, 将 (V1,2), (V1,1)数据合并为( V1,Seq(2,1))。

    val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
    val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
    val c = b.zip(a)
    val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
    d.collect
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    2)、reduceByKey算子

       reduceByKey 是比 combineByKey 更简单的一种情况,只是两个值合并成一个值,( Int, Int V)to (Int, Int C),比如叠加。所以 createCombiner reduceBykey 很简单,就是直接返回 v,而 mergeValue和 mergeCombiners 逻辑是相同的,没有区别。

       函数实现:
       def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
    	= {
       combineByKey[V]((v: V) => v, func, func, partitioner)
       }
    
    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
    val b = a.map(x => (x.length, x))
    b.reduceByKey(_ + _).collect
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述
    val a = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2)
    val b = a.map(x => (x.length, x))
    b.reduceByKey(_ + _).collect
    在这里插入图片描述

    3)、partitionBy算子

       partitionBy函数对RDD进行分区操作。函数定义如下。partitionBy(partitioner:Partitioner)如果原有RDD的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的ShuffledRDD。

    多个RDD:

    4)、Cogroup算子

        cogroup函数将两个RDD进行协同划分,cogroup函数的定义如下。cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器。(K, (Iterable[V], Iterable[W]))其中,Key和Value,Value是两个RDD下相同Key的两个数据集合的迭代器所构成的元组。

    val a = sc.parallelize(List(1, 2, 1, 3), 1)
    val b = a.map((_, "b"))
    val c = a.map((_, "c"))
    b.cogroup(c).collect
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    3. 连接

    1)、join算子

        join 对两个需要连接的 RDD 进行 cogroup函数操作,将相同 key 的数据能够放到一个分区,在 cogroup 操作之后形成的新 RDD 对每个key 下的元素进行笛卡尔积的操作,返回的结果再展平,对应 key 下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。下 面 代 码 为 join 的 函 数 实 现, 本 质 是通 过 cogroup 算 子 先 进 行 协 同 划 分, 再 通 过flatMapValues 将合并的数据打散。this.cogroup(other,partitioner).f latMapValues{case(vs,ws) => for(v<-vs;w<-ws)yield(v,w) }

    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    val b = a.keyBy(_.length)
    val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
    val d = c.keyBy(_.length)
    b.join(d).collect
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述
    在这里插入图片描述

    2)、leftOutJoin和 rightOutJoin算子

       LeftOutJoin(左外连接)和RightOutJoin(右外连接)相当于在join的基础上先判断一侧的RDD元素是否为空,如果为空,则填充为空。 如果不为空,则将数据进行连接运算,并返回结果。下面代码是leftOutJoin的实现。

    if (ws.isEmpty) {
    vs.map(v => (v, None))
    } else {
    for (v <- vs; w <- ws) yield (v, Some(w))
    }
    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    val b = a.keyBy(_.length)
    val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
    val d = c.keyBy(_.length)
    b.leftOuterJoin(d).collect
    b.rightOuterJoin(d).collect
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述
    在这里插入图片描述

    (三).Action算子

       本质上在 Action 算子中通过 SparkContext 进行了提交作业的 runJob 操作,触发了RDD DAG 的执行。

    1 .无输出

    1)、foreach算子

       foreach 对 RDD 中的每个元素都应用 f 函数操作,不返回 RDD 和 Array, 而是返回Uint

    val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
    c.foreach(x => println(x + "s are yummy"))
    
    • 1
    • 2

    2.HDFS

    1)、saveAsTextFile算子

       函数将数据输出,存储到 HDFS 的指定目录。

    下面为 saveAsTextFile 函数的内部实现,其内部
      通过调用 saveAsHadoopFile 进行实现:
    this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
    将 RDD 中的每个元素映射转变为 (null, x.toString),然后再将其写入 HDFS。
    
    val a = sc.parallelize(1 to 10000, 3)
    a.saveAsTextFile("mydata_a")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2)、saveAsObjectFile算子

       saveAsObjectFile将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。

    下面代码为函数内部实现。
      map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))
    val x = sc.parallelize(1 to 100, 3)
    x.saveAsObjectFile("objFile")
    val y = sc.objectFile[Int]("objFile")
    y.collect
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.Scala集合和数据类型

    1)、collect算子

       collect 相当于 toArray, toArray 已经过时不推荐使用, collect 将分布式的 RDD 返回为一个单机的 scala Array 数组。在这个数组上运用 scala 的函数式操作。

    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
    c.collect
    
    • 1
    • 2

    在这里插入图片描述

    2)、collectAsMap算子

       collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。 对于重复K的RDD元素,后面的元素覆盖前面的元素。

    val a = sc.parallelize(List(1, 2, 1, 3), 1)
    val b = a.zip(a)
    b.collectAsMap
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    3)、reduceByKeyLocally算子

       实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap。

    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
    val b = a.map(x => (x.length, x))
    b.reduceByKey(_ + _).collect
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    4)、lookup算子

       lookup(key:K):Seq[V] Lookup函数对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。 这个函数处理优化的部分在于,如果这个RDD包含分区器,则只会对应处理K所在的分区,然后返回由(K,V)形成的Seq。 如果RDD不包含分区器,则需要对全RDD元素进行暴力扫描处理,搜索指定K对应的元素。

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    val b = a.map(x => (x.length, x))
    b.lookup(5)
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    5)、count算子

       count 返回整个 RDD 的元素个数。内部函数实现为:

    defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum
    
    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
    c.count
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    6)、top算子

       top可返回最大的k个元素。 函数定义如下。

    top(num:Int)(implicit ord:Ordering[T]):Array[T]
    相近函数说明如下。
    ·top返回最大的k个元素。
    ·take返回最小的k个元素。
    ·takeOrdered返回最小的k个元素,并且在返回的数组中保持元素的顺序。
    ·first相当于top(1)返回整个RDD中的前k个元素,可以定义排序的方式Ordering[T]。
    返回的是一个含前k个元素的数组。
    
    val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
    c.top(2)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述

    7)、reduce算子

       reduce函数相当于对RDD中的元素进行reduceLeft函数的操作。 函数实现如下。Some(iter.reduceLeft(cleanF))reduceLeft先对两个元素<K,V>进行reduce函数操作,然后将结果和迭代器取出的下一个元素<k,V>进行reduce函数操作,直到迭代器遍历完所有元素,得到最后结果。在RDD中,先对每个分区中的所有元素<K,V>的集合分别进行reduceLeft。 每个分区形成的结果相当于一个元素<K,V>,再对这个结果集合进行reduceleft操作。

    val a = sc.parallelize(1 to 100, 3)
    a.reduce(_ + _)
    
    • 1
    • 2

    在这里插入图片描述

    8)、fold算子

       fold和reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第一个元素是zeroValue。

    val a = sc.parallelize(List(1,2,3), 3)
    a.fold(0)(_ + _)
    
    • 1
    • 2

    在这里插入图片描述

    9)、aggregate算子

       aggregate先对每个分区的所有元素进行aggregate操作,再对分区的结果进行fold操作。aggreagate与fold和reduce的不同之处在于,aggregate相当于采用归并的方式进行数据聚集,这种聚集是并行化的。 而在fold和reduce函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。函数的定义如下。

    aggregate[B](z: B)(seqop: (B,A) => B,combop: (B,B) => B): B
    
    val z = sc.parallelize(List(1,2,3,4,5,6), 2)
    def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
      iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
    }
    z.mapPartitionsWithIndex(myfunc).collect
    z.aggregate(0)(math.max(_, _), _ + _)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述

    4.计算模型中的两个特殊变量

       广播(broadcast)变量:其广泛用于广播Map Side Join中的小表,以及广播大变量等场景。 这些数据集合在单节点内存能够容纳,不需要像RDD那样在节点之间打散存储。 Spark运行时把广播变量数据发到各个节点,并保存下来,后续计算可以复用。 相比Hadoo的distributed cache,广播的内容可以跨作业共享。 Broadcast的底层实现采用了BT机制。②代表V③代表U

       accumulator变量:允许做全局累加操作,如accumulator变量广泛使用在应用中记录当前的运行指标的情景。

  • 相关阅读:
    Servlet的建立以及配置使用
    maven 安装 及其 创建
    错误总结
    使用分层实现业务处理
    JSP数据交互(三)
    JSP数据交互(二)
    JSP数据交互(一)
    动态网页开发基础
    记录mysql编码问题
    .net core 2.0 升级 2.1 访问mysql出现bug解决
  • 原文地址:https://www.cnblogs.com/ExMan/p/14318519.html
Copyright © 2011-2022 走看看