zoukankan      html  css  js  c++  java
  • spark rdd高级算子 scala版

    • mapPartitionsWithIndex

      

    def func(index:Int,iter:Iterator[(int)]) : Iterator[String] = {
        iter.toList.map(x = > "[prtID:"+index+",val:"+x+"]").iterator
    }
    
    val rdd = sc.parallelize(List(1,2,3,4,5,6,7),2)
    rdd.mapPartitionsWithIndex(fun).collect

    传入参数是一个接受两个参数的函数,第一参数是partition的编号,第二个参数是各个分区的数据迭代器

    • aggregate
    //传入seqOp的是 reduce逻辑,(1,2,3,4) 就是 先传1,2 再传1,2的结果和3
    (zeroValue: U)(seqOp:(U,T) => U,combOp:(U,U) = > U) : U seqOp 对每个parttition的元素作用 combOp 对每个partition的是聚合结果作用
    val rdd = sc.parallelize(List(1,2,3,4,5,6,7),2)
    rdd.aggregate(0)(_+_,_+_) //aggregate求和
    res : Int = 28
    rdd.aggregate(0)(math.max(_,_),_+_) //分区中的最大值求和
    结果:10
    rdd.aggregate(10)(math.max(_,_),_+_) //10 在combOp中的还会被操作一次
    结果:30
    val rdd1 =sc.parallelize(List("a","b","c","d","e","f"),2)
    rdd1.aggregate("|")(_+_,_+_)
    结果 ||abc|def
     

    val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
    rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)

    结果由于是并行计算,所以结果 可能是"42",或者"24"

    
    
    •  aggregateByKey

    实用于key-value类型的rdd,pairrdd类型

    首先对各自分区的pair,将key相同的提取出来,然后将value的集合运用后面传进来的方法

    val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
    def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
      iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
    }
    pairRDD.mapPartitionsWithIndex(func2).collect
    pairRDD.aggregateByKey(0)(_+_, _ + _).collect
    结果: (dog,12), (cat 19),(mouse,6)
    pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
    结果: (dog,12), (cat 17),(mouse,6)
    pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
    结果:  Array((dog,100), (cat,200), (mouse,200))
    • com

    文件内容:

    hello tom

    hello tom

    hello tom

    hello tom

    hello tom

    hello tom

    hello jerry

    hello jerry

    hello jerry

    hello kitty

    hello kitty

    hello kitty

    hello world

    hello world

    hello world

    //传入三个参数,第一个参数是进行分区迭代的初始值
    //第二个分区局部聚合
    //第三个分区汇总聚合
    combineByKey()
    sc.textFile("上面的文件内容").flatMap(_.split("")).map((_,1)).combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
    结果:(tom,6) (hello,15) (kitty,3) (jerry,3)
    sc.textFile("上面的文件内容").flatMap(_.split("")).map((_,1)).combineByKey(x => x+10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
    //因为有三个分区,所以初始相加值+10,总和加30
    结果:(tom,36) (hello,45) (kitty,33) (jerry,33)
     
  • 相关阅读:
    IdentityServer4 接口说明
    MQTT中的Retained(保留消息) 与 LWT(最后遗嘱)
    Docker常用命令
    开源服务容错处理库Polly使用文档
    MQTT 主题的高级特性
    MQTT的$SYS主题定义
    RabbitMQ消息队列之Windows下安装和部署
    RabbitMQ多台物理机集群搭建
    Ocelot.json完整配置文件
    nginx.conf文件配置明细详解
  • 原文地址:https://www.cnblogs.com/zhangweilun/p/6567059.html
Copyright © 2011-2022 走看看