zoukankan      html  css  js  c++  java
  • RDD-aggregate

    1. 参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

    2. 作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

    // 并行化创建RDD,有2个分区
    val z = sc.parallelize(List(1,2,3,4,5,6), 2)
    
    // 先用mapPartitionsWithIndex分区标签打印出RDD的内容
    def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
      iter.map(x => "[partID:" +  index + ", val: " + x + "]")
    }
    
    z.mapPartitionsWithIndex(myfunc).collect
    // res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])
    
    // 或者简写如下:
    z.mapPartitionsWithIndex((idx,iter)=>(iter.map((idx,_)))).collect
    // Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6))
    
    // 应用aggreate RDD算子
    // 先计算每个partition的数与初始值的最大值,得到每个分区的最大值3、6,和初始值0累加求和
    z.aggregate(0)(math.max(_, _), _ + _)
    // res40: Int = 9
    
    //这个例子返回16,因为初始值是5
    //分区0的最大值(5,1,2,3)= 5
    //分区1的最大值为(5,4,5,6)= 6
    //分区之间的最终缩减值将是5 + 5 + 6 = 16
    //注:最终的值包括初始值
    z.aggregate(5)(math.max(_, _), _ + _)
    res29: Int = 16
    
    // aggregate字符串拼接
    val z = sc.parallelize(List("a","b","c","d","e","f"),2)
    //先用分区标签打印出RDD的内容
    def myfunc(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
      iter.map(x => "[partID:" +  index + ", val: " + x + "]")
    }
    
    z.mapPartitionsWithIndex(myfunc).collect
    res31: Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])
    
    // 或简写为:
    z.mapPartitionsWithIndex((idx,iter)=>(iter.map((idx,_)))).collect
    // Array[(Int, String)] = Array((0,a), (0,b), (0,c), (1,d), (1,e), (1,f))
    
    scala> z.aggregate("")(_+_,_+_)
    res10: String = defabc
    
    scala> z.aggregate("")(_+_,_+_)
    res13: String = abcdef
    // 由于每个分区是并行计算的,因此不确定哪个分区先执行完成,
    // 所以最后分区之间combine的操作无法确定哪个分区在前在后,但是初始值肯定在最前面
    
    // 初始值不为空的情况
    scala> z.aggregate("@")(_+_,_+_)
    // res14: String = @@abc@def
    
    scala> z.aggregate("@")(_+_,_+_)
    // res15: String = @@def@abc
    
    scala> z.aggregate("@")(_+_,_+_)
    // res16: String = @@def@abc
    
    
    // 下面是一些更高级的示例
    
    // 求每个分区字符串长度的最大值然后拼接结果
    val z = sc.parallelize(List("12","23","345","4567"),2)
    z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
    // 分区0的数据为"12","23",和初始值""比较字符串的最大长度2,并转换为String "2"。
    // 分区1的数据为"345","4567",和初始值""比较字符串的最大长度4,并转换为String "4"。
    // 最后分区的结果和初始值拼接,得到42或者24
    // res141: String = 42
    
    // 求每个分区字符串长度的最小值然后拼接结果
    z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
    // res142: String = 11
    
    // 开始误以为结果是20或者02,其实不然,对""取length是0,0 toString之后是字符"0",
    // 字符"0"的长度是1
    "".length   // res24: Int = 0
    "".length.toString  // res25: String = 0
     "".length.toString.length  //res26: Int = 1
    
  • 相关阅读:
    Mac下各种编程环境的配置问题(python java)
    hackintosh和windows时区问题
    CAN波特率计算公式
    Jetson nano 安装 TensorFlow
    python逻辑运算符优先级
    CSS知识点(一)
    HTML标签(二)
    python2与python3的区别
    HTML标签(一)
    IO多路复用和协程
  • 原文地址:https://www.cnblogs.com/lucas-zhao/p/12101404.html
Copyright © 2011-2022 走看看