zoukankan      html  css  js  c++  java
  • Spark算子


    释义

    将每个partition内元素进行聚合,然后将每个partition的聚合结果进行combine,得到最终聚合结果。最终结果允许跟原始RDD类型不同

    方法签名如下:

    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
      ...
    }
    
    • zeroValue: 每个partition的聚合初始值
    • seqOp: sequence operation,对partition内数据进行映射,最终1个partition只有1个结果。输入类型为U跟T,输出为U,即每次操作结果要跟zeroValue类型一致
      • 第一次操作时,U为zeroValue(初始值),第一次操作之后输出结果U,作为下一次操作的U
      • 第二次操作及之后操作时,U为前一次操作输出结果,而不再是zeroValue
    • combOp: combine operation,对每个partition的结果进行combine操作。输入类型为U跟U,输出为U,即输入类型与输出类型一致。最终结果为:U类型的RDD

    案例

    统计所有单词总长度,单词的总个数

    object TestAggregate {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("TestAggregate").setMaster("local[1]")
        val sc = new SparkContext(conf)
        val data = Array("hello", "world", "simple", "app", "is", "good", "good", "world")
        val result: (Int, Int) = sc.parallelize(data, 2) // 当前设置为2个partition
          .aggregate((0, 0))( //①
            (v: (Int, Int), str: String) => (v._1 + str.length, v._2 + 1), // ②
            (v1: (Int, Int), v2: (Int, Int)) => (v1._1 + v2._1, v1._2 + v2._2) // ③
          )
        println(result)
      }
    }
    

    输出

    (34,8)
    

    解释

    1. 在每个partition内传入初始值(0, 0),如①处
    2. 之后每个partition内开始进行聚合计算,如②处。每个partition内的单词长度累加,放入结果二元组的第一位;每处理一个单词,结果二元组的第二位加一,即单词个数加一
    3. 每个partition的结果二元组再进行汇总操作,如③处。最终形成一个二元组,第一位是所有字母的总长度,第二位是所有单词的总个数



    尊重写作权利,转载请注明出处 ^_^
  • 相关阅读:
    【剑指offer】二叉搜索树与双向链表
    【剑指offer】复杂链表的复制
    【剑指offer】二叉树中和为某一值的路径
    2018.12.30 Intellij IDEA设置main方法自动补全
    2018.12.29 Spring FrameWork各个版本下载
    2018.12.26 Mac下的Eclipse在编辑Spring配置文件xml时自动提示类class包名配置
    2018.12.25 Spring中JDBCTemplate模版API学习
    2018.12.24 Spring中的aop演示(也就是运用aop技术实现代理模式)
    2018.12.22 Spring学习02
    2018.12.24 Ubuntu18.0.4 主题参考
  • 原文地址:https://www.cnblogs.com/convict/p/14828227.html
Copyright © 2011-2022 走看看