zoukankan      html  css  js  c++  java
  • spark算子篇-aggregate 系列

    aggregate

    aggregate 是比较常用的 行动 操作,不是很好懂,这里做个解释。

    aggregate(zeroValue, seqOp, combOp)

    zeroValue 是一个初始值,自己根据实际情况进行设定;

    首先我们知道 RDD 是被分区,然后并行操作的;

    seqOp 是对每个分区进行聚合,每个分区聚合结果作为 combOp 的输入;

    combOp 对分区聚合结果再次进行聚合;

    seqOp 和 combOp 必须有且仅有2个参数

    示例如下

    seqOp:

    把初始值设为 0,累加就是求和

    把初始值设为 0,每次加1就是计数;然后 迭代 初始值

    combOp:

    每个分区的聚合结果为两部分(sum,count)

    在初始值的基础上,把每个分区的 sum 相加,count 相加

    迭代初始值

    seqOp = (lambda x, y: (x[0] + y, x[1] + 1))     ## x 为 初始值,y 是 list 中的单个元素
    combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))  ## x 为 初始值,y 是 seqOp 单个分区的聚合结果
    sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)       # (10, 4)
    
    ## 初始值变成 (3, 0), 分一个区,seqOp 聚合结果加 3, combOp 聚合结果加 3, 总共加 6,即 16
    sc.parallelize([1, 2, 3, 4], 1).aggregate((3, 0), seqOp, combOp)    # (16, 4)  
    ## 初始值变成 (3, 0), 分两个区,seqOp 每个分区聚合结果加 3,共加 6,combOp 聚合结果加 3, 总共加 9,即 19
    sc.parallelize([1, 2, 3, 4], 2).aggregate((3, 0), seqOp, combOp)    # (19, 4)
    
    sc.parallelize([1, 2, 3, 4], 4).aggregate((3, 0), seqOp, combOp)    # (25, 4)
    sc.parallelize([1, 2, 3, 4]).aggregate((3, 0), seqOp, combOp)    # 如果不写分区,默认为 cpu 核数个分区,即 4 核就分 4 个区

    计算过程如下

    aggregate 是针对 序列 进行操作的,还有一个 aggregateByKey 是针对 (key - value 对) 中的 key 进行 aggregate 操作的 

  • 相关阅读:
    js判断选择时间不能小于当前时间的代码
    shell脚本编程之for语句、if语句使用介绍
    linux命令 chattr超级权限控件
    教你配置linux服务器登陆欢迎信息
    PHP基础入门教程 PHP循环函数
    php获取客户端ip地址
    PHP获取域名、IP地址的方法
    两日期间的间隔
    mysql 案例 ~ pt-archiver 归档工具的使用
    mysql 案例 ~ pt修复工具的使用
  • 原文地址:https://www.cnblogs.com/yanshw/p/11649276.html
Copyright © 2011-2022 走看看