zoukankan      html  css  js  c++  java
  • 【Spark】算子

    1. mapWith
    mapWith(i => i*10)((a,b) => b+2)
    (拿到分区号)(a是每次取到的RDD中的元素,b接收i*10的结果)
    2. flatMapWith
    类似mapWith,区别在于flatMapWith返回的是一个序列

    3. mapPartitions
    每次取到的是分区号
    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7),3)
    对分区每个元素乘10
    rdd1.mapPartitions(_.map(_ * 10))

    4. mapPartitionsWithIndex
    对RDD中的每个分区进行操作,带有分区号
    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
    创建一个函数
    (index分区号,iter分区中的每个元素)
    def func1(index:Int,iter:Iterator[Int]):Iterator[String] ={
    ter.toList.map(x=>"[PartID:" + index +",value="+x+"]").iterator
    }
    rdd1.mapPartitionsWithIndex(func1).collect
    结果:
    Array(
    [PartID:0,value=1], [PartID:0,value=2], [PartID:0,value=3], [PartID:0,value=4],
    [PartID:1,value=5], [PartID:1,value=6], [PartID:1,value=7], [PartID:1,value=8], [PartID:1,value=9])

    5. aggregate:聚合操作
    先对局部进行操作,再对全局进行操作
    val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
    (初始值)(局部操作,全局操作)
    rdd1.aggregate(0)(math.max(_,_),_+_)

    6. aggregateByKey
    类似aggregate,也是先对局部,再对全局
    区别:aggregateByKey操作<key,value>
    val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
    把每个笼子中,每种动物最多的个数进行求和
    (初始值)(局部操作,全局操作)
    pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect

    7. foldByKey:对相同key进行聚合操作
    foldByKey(初始值)(对value的操作)
    拼接相同key的value字符串
    val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
    val rdd2 = rdd1.map(x => (x.length, x))
    val rdd3 = rdd2.foldByKey("")(_+_)
    结果:Array[(Int, String)] = Array((4,wolfbear), (3,dogcat))

    8. combineByKey 操作的是key-value
    val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)

    第一个参数:List(_),(分区的每一个元素:它的类型就是整个函数的返回类型(List[String]))
    第二参数 (x: List[String], y: String=> x :+ y,):分区内的局部聚合,x与第一个参数的返回类型一致,y是RDD的每一个value
    第三个参数((m: List[String], n: List[String]) => m ++ n): 全局聚合,类型与第一个参数一致
    val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)

    9. coalesce、repartition、partitionBy:重新分区
    coalesce:默认不会shuffle,可以重新分更小的分区,不能分更大的分区,如果要shuffle,需要传入参数true
    val rdd1 = sc.parallelize(1 to 10, 5)
    val rdd2 = rdd1.coalesce(10, true)
    rdd2.partitions.length
    repartition: 会进行shuffle,任意分区都可以,传入参数是Int数字
    rdd1.repartition(8)
    partitionBy:会进行shuffle,任意分区都可以,传入参数是分区器
    rdd1.partitionBy(new org.apache.spark.HashPartitioner(7))

    10. countByKey 计算相同key的元组有多少个
    countByValue:计算相同value的元组有多少个
    val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
    rdd1.countByKey
    rdd1.countByValue

    11. filterByRange :根据key的范围进行过滤
    val rdd1 = sc.parallelize(List(("e", 5), ("f",2),("c", 3), ("d", 4), ("c", 2), ("a", 1)))
    取出c到d范围的元组
    val rdd2 = rdd1.filterByRange("c", "d")
    rdd2.colllect

    12. flatMapValues :对元组的值进行操作并压平
    val rdd3 = sc.parallelize(List(("a","1 2"), ("b","3 4")))
    rdd3.flatMapValues(_.split(" ")).collect
    结果: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

    13. foreachPartition :对每个分区的操作,返回每个分区的结果
    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
    rdd1.foreachPartition(x => println(x.reduce(_ + _)))
    结果
    6
    15
    24

    14. keyBy : 将传入的参数作为元组的key,原RDD元素作为value
    val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    val rdd2 = rdd1.keyBy(_.length)
    rdd2.collect
    结果:Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))

    15. keys :获取RDD的key
    values :获取RDD的value
    val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    val rdd2 = rdd1.map(x => (x.length, x))
    rdd2.keys.collect
    rdd2.values.collect

    16. collectAsMap :将集合转化成元组
    val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
    rdd.collectAsMap
    结果 :scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
  • 相关阅读:
    js-监听网络状态
    call、apply、bind三者比较
    弹框滑动击穿问题
    Vue指令及自定义指令的使用
    vue-cli 运行打开浏览器
    递归判断多维数组中对象是否有值
    sync 修饰符在Vue中如何使用
    自定义组件 v-model 的使用
    Object.keys( )与 for in 区别
    mongodb 安装
  • 原文地址:https://www.cnblogs.com/snova/p/9195701.html
Copyright © 2011-2022 走看看