zoukankan      html  css  js  c++  java
  • Hive:select count(distinct)优化以及hive.groupby.skewindata

    Hive:select count(distinct)优化以及hive.groupby.skewindata

    原文链接:https://juejin.cn/post/6926536667877048333

    问题引入

    数据分析师小A接到需求,需要统计当日各个省份20岁以下的日活跃用户数(去重统计user_id,即UV)

    现有一个Hive表存储着用户行为数据:user_behaviour_trace_info

    描述
    user_id 用户id
    nickname 昵称
    age 年龄
    province 省份
    url 访问地址
    access_time 访问时间
    device_id 用户手机设备id

    小A很顺其自然的写出这段SQL:

    select
      province,
      count(distinct user_id) as uv
    from
      user_behaviour_trace_info
    where
      access_time = '今天' and age < 20
    group by
      province
    复制代码

    立马提交SQL开始执行任务,一顿操作猛如虎,一看时长十点五(小时)

    心想不愧是用户行为数据,数据量居然这么大?那让我们看看任务各个Task的执行耗时:

    以下三个JobHistory截图属于另一个select count(distinct)数据倾斜任务,具有代表意义

    我们可以观察到

    • 任务整体耗时:10小时11分钟:
    • Map Task 平均耗时:1分16秒
    • Reduce Task 平均耗时:1分59秒

    任务执行时间长,MR Task 平均耗时短,极有可能是出现了数据倾斜!

    那我们继续看看Map Task的执行情况,按Map Task耗时倒序排序

    Map Task最长耗时为2分49秒,而且整体看起来运行耗时相差不大,问题不在Map阶段

    接下来看看Reduce阶段,按Reduce Task耗时倒序排序

    好家伙,有一个Reduce Task执行了10个小时,另个一执行了近2小时,其余Reduce Task的执行时间很短。

    说好了大家一起干活,最终却只有我一个人扛下了所有?

    那么,问题出在哪里?

    我们先要弄明白 Hive 是如何执行这段 SQL 的

    Hive SQL 最终要转化成 MapReducer 任务,在逻辑上可以细分为三个阶段:

    1. Map阶段:将 group by 字段作为 key,聚合函数中的列作为 Value,输出键值对
    2. Shuffle阶段:对 Map 阶段输出的键值对 Key 进行 Hash,按照Hash值将键值对发送至各个 Reducer 中(相同的 Key 会分配给同一个 Reducer)
    3. Reduce阶段:执行聚合操作

    简而言之:SQL 中的 Group By 字段会决定某条数据最终落在哪一个 Reducer 上处理。

    下文将 group by 的字段称之为 group_by_column

    那么,对于刚刚那段SQL,group_by_column 是 province,同一个 province 的数据会分配给同一个 Reducer,在 Reduce 阶段,对 user_id 进行去重统计

    然鹅,我国共有34个省级行政区域,一个 Reducer 处理一个省的数据,最多也只能有 34 个 Reducer 同时处理数据

    当然,多个省份还可能落在同一个 Reducer 中

    如何优雅的解决问题?

    我们先来分析一下最初那段SQL的本质:

    1. Map + Shuffle阶段:按 province 将数据分发给 Reducer
    2. Reduce阶段:对同一个 province 的 user_id 先去重,再计数

    Reduce 阶段任务最重,执行了去重和计数两个操作:

    • 去重:在 province 内,对 user_id 去重
    • 计数:统计 province 内 user_id 的个数

    SQL任务慢的原因是:同一个 province 的数据全部由同一个 Reducer 处理

    思考一下,不难发现:

    1. 可以通过 group by 实现快速去重
    2. 计数操作可以由多个Task分别计数,最终再汇总结果

    那么优化思路就不言而喻了:将去重和计数两个操作分开,并且用多个Task同时计数,最终再汇总所有Task的计数数据

    hive.groupby.skewindata参数

    其实 Hive 早就考虑到这个场景,并且贴心的提供了 hive.groupby.skewindata 参数。

    当 hive.groupby.skewindata = true 时,Hive 会将原来的一个 MaReduce 阶段转化成两个 MapReduce 阶段:

    1. 一阶段MapReduce:随机打散数据,打散后进行局部聚合(数据去重 + 多Task局部计数)
    2. 二阶段MapReduce:对一阶段的局部聚合结果进行最终聚合(最终汇总计数)

    这样的描述看起来有点云里雾里,那不妨让我们自己通过手动优化来更加深入的理解这个参数。手动优化的思路和原理和 hive.groupby.skewindata = true 是一致的

    第一步:数据去重

    我们先实现第一步,在每个 province 中,对 user_id 进行去重

    SQL很简单,但有一些需要注意的点:

    1. 去重性能:group by 的去重性能要比 select distinct 要好,所以使用 group by 去重
    2. 数据过滤:因为要计算的 uv 指标有条件,所以需要对数据进行过滤
    3. null值:因为 count(distinc user_id) 不会计算 user_id 为 null 的数据,所以在去重时需要过滤 null 值

    那么我们可以写出这段SQL

    select
      province,
      user_id,
      cast(rand() * 1000 as int) as random_key -- 随机数,作用稍后解释~
    from
      user_behaviour_trace_info
    where
      access_time = '今天' and age < 20 -- uv统计条件
      and user_id is not null -- count(distinc)不统计null值
    group by -- 对数据进行去重
      province,
      user_id
    复制代码

    聪明的同学已经看出来了,这里除了对数据进行去重外,还多了一个随机数字段。这个随机数字段是用来做什么的呢,继续往下看你就知道了~

    第二步:打散数据,计算局部聚合结果

    数据去重完毕后,只需要统计每个 province 的 user_id 个数就能得到对应 province 的 uv 指标!

    由上文提到,group_by_column 决定了数据怎么分发给 Reducer

    同一个 group_by_column 的数据会分配给同一个 Reducer

    那么我们该如何让多个 Reducer 同时计算某个 province 的 user_id 个数呢?这里就可以使用去重阶段“多出来”的 随机数 random_key !

    select
      province,
      random_key,
      sum(1) as partial_uv -- 对 user_id 进行计数,是局部聚合结果
    from (
      select -- 子查询是第一步SQL:数据去重
        province,
        user_id,
        cast(rand() * 1000 as int) as random_key
      from
        user_behaviour_trace_info
      where
        access_time = '今天' and age < 20
        and user_id is not null
      group by
        province,
        user_id
    ) t1
    group by -- 对随机数也进行 group by,让多个 Reducer 一起统计数据
      province,
      random_key
    复制代码

    使用组合键 "province + random_key" 进行 group by,同一个 province 的数据会随机分发给多个 Reducer

    每个 Reducer 对 user_id 进行计数,获得局部聚合结果

    任务执行过程如下:

    第一步 + 第二步 就相当于是 hive.groupby.skewindata = true 时的一阶段Mapreduce

    第三步:最终聚合

    在第二步中,我们已经将同一个 province 的 user_id 分成多个部分,并且统计出了每个部分的 user_id 数量(partial_uv)

    那么接下来,我们只要对局部聚合结果进行简单的相加就可以了

    最终SQL如下:

    select
      province,
      sum(partial_aggregation) as uv -- 最终聚合结果就是 count(distinct user_id)
    from (
      select -- 第二步SQL:打散数据,计算局部聚合结果
        province,
        random_key,
        sum(1) as partial_uv
      from ( -- 第一步SQL:数据去重
        select
          province,
          user_id,
          cast(rand() * 1000 as int) as random_key
        from
          user_behaviour_trace_info
        where
          access_time = '今天' and age < 20
          and user_id is not null
        group by
          province,
          user_id
      ) t1
      group by
        province,
        random_key
    ) t2
    group by
      province
    复制代码

    进阶:如何优化多列 count(distinct)

    hive.groupby.skewindata 对 count(distinct) 的优化是有限制的,当 hive.groupby.skewindata = true 时,SQL只能对一个列进行 count(distinct),否则会抛出异常:

    Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: DISTINCT on different columns not supported with skew in data
    复制代码

    其实这很容易理解,在刚刚的手动优化过程中,我们能够很容易发现,这个方法不能同时对多个列进行 去重+计数 得出各自的 count(distinct) 值

    主要原因:无法在某一个维度里,同时对多个列进行去重

    count(distinct)优化方案不能直接套用在计算多列的情况上,但可以采用分治的思想,对每个列单独计算 count(distinct),然后再将结果进行合并

    案例

    现在有个需求,需要按省份分别去重统计当日 user_id 和 device_id 的去重数量,要求用户年龄为20岁以下

    优化前SQL:

    select
      province,
      count(distinct user_id) as uv,
      count(distinct device_id) as dv
    from
      user_behaviour_trace_info
    where
      access_time = '今天' and age < 20
    group by
      province
    复制代码
    优化方案
    • 第一步:单独计算 uv、dv:Job1、Job2
    • 第二步:合并计算结果:Job3

    值得注意的是,最外层 SELECT 使用 COALESCE() 是因为:在单独计算某个 count(distinct) 时,可能因为添加了统计条件(年龄小于20岁),而导致 province 没有对应的取值,left join 时指标为 null

    虽然写起来比原SQL要麻烦些,但效率吊打原SQL不知道多少倍

    笔者曾经将一个9小时耗时的任务,通过该方法优化至15分钟~

  • 相关阅读:
    ansible for devops 读书笔记第二章Ad-Hoc Commands
    ansible for devops读书笔记第一章
    python3 获取天气
    简单cpu web flask mysql
    mysql mysqldump只导出表结构或只导出数据的实现方法
    nginx 限制solr
    [Selenium] 如何使 InternetExplorerDriver 每次启动的端口不会随机变化
    [Selenium] 如何绕过 IE 的安全模式
    [Selenium] close alert window
    [Selenium] waitUntilAllAjaxRequestCompletes
  • 原文地址:https://www.cnblogs.com/solong1989/p/14959599.html
Copyright © 2011-2022 走看看