zoukankan      html  css  js  c++  java
  • Spark权威指南读书笔记(四) 聚合与连接

    Spark权威指南读书笔记(四) 聚合与连接

    image-20200818094742912

    一、聚合函数

    大多数聚合函数位于org.apache.spark.sql.functions。当给定多个输入值时,聚合函数给每个分组计算出一个结果。

    count

    使用count对指定列进行计数或者使用count(*)或count(1)对所有列进行计数。需要注意的是,当执行count(*)时,Spark会对null值进行计数;而当对某指定列计数时,则不会对null值进行计数。

    image-20200818100546379

    countDistinct

    获取某列的唯一值数量计数

    image-20200818100905695

    approx_count_distinct

    在处理大数据计算时,获得 一个精确的结果开销会很大,但是计算一个近似结果相对容易很多。此时可使用approx_count_distinct。approx_count_distinct具有两个参数,第一个为列,第二个参数指定可容忍的最大误差。

    image-20200818102056966

    **first 和 last **

    基于DataFrame行的顺序而非中值顺序,获取DataFrame的第一个值和最后一个值。

    image-20200818102947837

    min 和 max

    从DataFrame中获取最小值和最大值

    image-20200818103628099

    sum

    累加一行中的所有值

    image-20200818103917699

    sumDistinct

    对一组去重值进行求和

    image-20200818104840040

    avg

    使用avg或mean方法获取平均值

    image-20200818105638905

    方差与标准差

    image-20200818110156445

    偏度系数(skewness)与峰度系数(kurtosis)

    image-20200818111419564

    协方差与相关性

    image-20200818113827511

    聚合输出复杂类型

    image-20200818160942448

    二、分组

    分组 groupBy

    分两个阶段进行分组,首先指定要对其分组的一列或多列,然后指定一个或多个聚合操作。第一步返回一个RelationalGroupedDataset,第二步返回 DataFrame。

    image-20200818161147721

    使用表达式分组

    image-20200818161436822

    使用Map进行分组

    转换操作指定一系列Map更方便,其中键为列,值为要执行的字符串形式的聚合函数。若以inline方式指定可重用多个列名。

    image-20200818162519883

    window函数

    window函数具体是指在指定数据窗口上执行聚合操作,并使用对当前数据的引用来定义它,此窗口指定将哪些行传递给此函数。不同于groupBy处理数据分组的方式 每一行只能进入一个分组,窗口函数基于Frame的一组行,计算表中的每一输入行的返回值,每一行属于一个或多个Frame。

    配置窗口函数第一步是创建一个窗口规范。

    image-20200818165610853

    rowsBetween参数说明

    rowBetween含有两参数,第一个参数指定操作起始位置,第二个参数指定操作的最后位置。

    Window.unboundPreceding 分区开始位置

    Window.currentRow 分区计算当前位置

    Window.unboundedFollowing 分区的最后位置

    负数 — 前有元素的情况下向前追加

    0 — 等价于currentRow

    正数 — 后有元素的情况下向后追加

    分组集

    分组集用于多组聚合操作组合在一起的底层工具,使得能够在group-by语句中创建任意得聚合操作。

    image-20200818171512039

    注:分组集取决于聚合级别的null值,如果不过滤空值,则会得到不正确的结果。

    rollup

    image-20200818174716873

    cube

    image-20200818174908567

    使用grouping_id对元数据分组

    image-20200818175931992

    透视转换

    使用透视转换后,DataFrame会为每一个Country和数值类型列组合产生一个新列。

    image-20200818180755793

    三、用户自定义聚合函数UDAF

    UDAF是用户根据自定义公式或业务逻辑定义自己的聚合函数的一种。可以使用UDAF计算输入数据组(与单行相对)的自定义计算。Spark维护单个AggregateBuffer,用于存储每组输入数据的中间结果。

    若要创建UDAF,必须继承UserDefineAggregateFunction基类实现以下方法:

    • inputSchema用于指定输入参数,输入参数类型为StructType
    • bufferSchema用于指定UDAF中间结果,中间结果类型为StructType
    • dataType用于指定返回结果,返回结果类型为DataType
    • deterministic是一个布尔值,它指定此UDAF对于某个输入是否会返回相同的结果
    • initialize初始化聚合缓冲区的初始值
    • update描述应如何根据给定行更新内部缓冲区
    • merge描述应如何合并两个聚合缓冲区
    • evaluate将生成聚合最终结果

    image-20200818184434129

    四、连接

    连接类型

    • inner join: 内部连接,保留左、右数据集内某个键都存在的行
    • outer join:外部连接, 保留左侧或右侧数据集中具有某个键的行
    • left outer join:左外部连接, 保留左侧数据集中具有某个键的行
    • right outer join: 右外部连接, 保留右侧数据集中具有某个键的行
    • left semi join: 左半连接,如果某键在右侧数据行中出现,则保留且仅保留左侧数据行
    • left anti join:左反连接, 如果某键在右侧数据行中没出现,则保留且仅保留左侧数据行
    • natural join: 自然连接, 通过隐式匹配两个数据集之间具有相同名称的列来执行连接
    • cross join:笛卡尔连接,又称为交叉连接,将左侧数据集中的每一行与右侧数据集中每一行匹配

    内连接

    image-20200818215521416

    外连接

    image-20200818215614760

    左外连接

    image-20200818215652272

    右外连接

    image-20200818215749677

    左半连接

    image-20200818215937647

    左反连接

    image-20200818220029305

    笛卡尔连接

    image-20200818220108960

    复杂类型连接操作

    image-20200818220343583

    处理连接中的重复列

    image-20200818220538260

    五、Spark通信策略

    在连接过程中,Spark以两种不同方式处理集群通信问题。要么执行导致all-to-all通信的shuffle join, 要么采用broadcast join。

    大表与大表连接(shuffle join)

    大表连接大表时,执行shuffle join。则每个结点都与所有其他结点进行通信,并根据哪个结点具有用于连接的某个键或某一组键来共享数据。由于网络会因通信量而阻塞,所以这种方式很耗时,特别是如果数据没有合理分区的情况下

    大表与小表连接(broadcast join)

    通过将数据量较小的DataFrame复制到集群中的所有工作结点上,只需开始时执行一次,然后让每个工作节点独立执行作业,无需等待其他工作节点,无需与其他工作结点通信。

    通过DataFrame API可以显式告知优化器,使用broadcast函数作用于较小的DataFrame上,并执行广播通信模式的连接操作。

    image-20200818225242861

    image-20200818225255520

    小表与小表连接

    最好让Spark自行决定

    六、补充 SparkSQL join的三种实现

    这部分内容参考博客:https://www.linkedin.com/pulse/spark-sql-3-common-joins-explained-ram-ghadiyaram

    http://hbasefly.com/2017/03/19/sparksql-basic-join/

    先说结论:

    1. Broadcast Hash Join 适合一张较小的表与一张大表进行join
    2. Shuffle Hash Join 适合一张小表和一张大表进行join,或者两张小表之间的join
    3. Sort Merge Join: 适合两张较大的表进行join

    注:前两者基于Hash Join, 不同点在于优先进行Shuffle或Broadcast。

    Hash Join

    整个过程分为三步:

    1. 确定BuildTable,ProbeTable。BuildTable使用join key构建Hash Table,Probe Table使用join key进行探测,探测成功则可Join。通常情况下,小表为BuildTable, 大表为ProbeTable。
    2. 构建HashTable。依次读取BuildTable的数据,对于每一行数据根据join key进行hash,hash到对应的Bucket,生成HashTable中的一条记录,数据缓存在内存中,若内存放不下需要dump到外存。
    3. 探测。再依次扫描ProbeTable数据,使用相同的hash函数映射HashTable中记录。映射成功后再检测Join条件,若匹配成功,则将二者进行Join。

    broadcast join(广播表要足够小)

    broadcast Join的条件:

    1. 被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M
    2. 基表不能被广播,如left outer join,只能广播右表

    broadcast join步骤:

    1. broadcast阶段:将小表广播分发到大表所在的所有结点。
    2. hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探

    0

    shuffle hash join

    shuffle hash join的条件:

    1. 分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M
    2. 基表不能被广播,比如left outer join时,只能广播右表
    3. 一侧的表要明显小于另外一侧,小的一侧将被广播(明显小于的定义为3倍小,此处为经验值)

    shuffle hash join步骤:

    1. 对两张表分别按照join keys进行重分区,即shuffle,目的是为了让有相同join keys值的记录分到对应的分区中
    2. 对对应分区中的数据进行join,对各个分区内先将小表分区构造为一张hash表,然后根据大表分区中记录的join keys值拿出来进行匹配

    sort merge join

    sort merge join步骤:

    1. shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理
    2. sort阶段:对单个分区节点的两表数据,分别进行排序
    3. merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边

  • 相关阅读:
    UVA 11859
    [OpenGL]OpenGL坐标系和坐标变换
    树状数组
    编程算法
    乞讨 间隔[a,b]在见面p^k*q*^m(k>m)中数号码
    解析Android的 消息传递机制Handler
    Atitit.故障排除系列---php 计划网站数据库错误排除过程
    Remove Element
    [Angular Directive] Write a Structural Directive in Angular 2
    [Compose] 18. Maintaining structure whilst asyncing
  • 原文地址:https://www.cnblogs.com/ganshuoos/p/13527018.html
Copyright © 2011-2022 走看看