zoukankan      html  css  js  c++  java
  • 2021寒假(20)

    算子用法

    1)该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition

    操作都可以完成,因为无论如何都会经 shuffle 过程。
    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4,1,2
    ),2)
    val dataRDD1 = dataRDD.repartition(4)
    

    2)该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4,1,2
    ),2)
    val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
    

     3)对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

    val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
    val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
    val dataRDD = dataRDD1.intersection(dataRDD2)
    

    4)对源 RDD 和参数 RDD 求并集后返回一个新的 RDD

    val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
    val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
    val dataRDD = dataRDD1.union(dataRDD2)
    

    5)以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集

    val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
    val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
    val dataRDD = dataRDD1.subtract(dataRDD2)
    

     6)将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

    val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
    val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
    val dataRDD = dataRDD1.zip(dataRDD2)
    

     7)将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

    val rdd: RDD[(Int, String)] =
     sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
    import org.apache.spark.HashPartitioner
    val rdd2: RDD[(Int, String)] =
     rdd.partitionBy(new HashPartitioner(2))
    

     8)可以将数据按照相同的 Key 对 Value 进行聚合

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = dataRDD1.reduceByKey(_+_)
    val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
    

     9)将数据源的数据根据 key 对 value 进行分组

    val dataRDD1 =
     sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = dataRDD1.groupByKey()
    val dataRDD3 = dataRDD1.groupByKey(2)
    val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
    

    10)将数据根据不同的规则进行分区内计算和分区间计算

    val dataRDD1 =
     sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 =
     dataRDD1.aggregateByKey(0)(_+_,_+_)
    

    取出每个分区内相同 key 的最大值然后分区间相加

    // TODO : 取出每个分区内相同 key 的最大值然后分区间相加
    // aggregateByKey 算子是函数柯里化,存在两个参数列表
    // 1. 第一个参数列表中的参数表示初始值
    // 2. 第二个参数列表中含有两个参数
    // 2.1 第一个参数表示分区内的计算规则
    // 2.2 第二个参数表示分区间的计算规则
    val rdd =
     sc.makeRDD(List(
     ("a",1),("a",2),("c",3),
     ("b",4),("c",5),("c",6)
     ),2)
    // 0:("a",1),("a",2),("c",3) => (a,10)(c,10)
    // => (a,10)(b,10)(c,20)
    // 1:("b",4),("c",5),("c",6) => (b,10)(c,10)
    val resultRDD =
     rdd.aggregateByKey(10)(
     (x, y) => math.max(x,y),
     (x, y) => x + y
     )
    resultRDD.collect().foreach(println)
    
  • 相关阅读:
    php数组函数-array_push()
    php数组函数-array_pop()
    php数组函数-array_pad()
    php数组函数-array_merge()
    php数组函数-array_map()
    php数组函数-array_keys()
    php数组函数-array_key_exists()
    php数组函数-array_intersect()
    php数组函数-array_flip()
    php数组函数-array_filter()
  • 原文地址:https://www.cnblogs.com/ywqtro/p/14319341.html
Copyright © 2011-2022 走看看