zoukankan      html  css  js  c++  java
  • 【ClickHouse 技术系列】 在 ClickHouse 中处理实时更新

    简介:本文翻译自 Altinity 针对 ClickHouse 的系列技术文章。面向联机分析处理(OLAP)的开源分析引擎 ClickHouse,因其优良的查询性能,PB级的数据规模,简单的架构,被国内外公司广泛采用。本系列技术文章,将详细展开介绍 ClickHouse。

    前言

    本文翻译自 Altinity 针对 ClickHouse 的系列技术文章。面向联机分析处理(OLAP)的开源分析引擎 ClickHouse,因其优良的查询性能,PB 级的数据规模,简单的架构,被国内外公司广泛采用。

    阿里云 EMR-OLAP 团队,基于开源 ClickHouse 进行了系列优化,提供了开源 OLAP 分析引擎 ClickHouse 的云上托管服务。EMR ClickHouse 完全兼容开源版本的产品特性,同时提供集群快速部署、集群管理、扩容、缩容和监控告警等云上产品功能,并且在开源的基础上优化了 ClickHouse 的读写性能,提升了 ClickHouse 与 EMR 其他组件快速集成的能力。访问 ClickHouse - E-MapReduce - 阿里云 了解详情。

    译者:何源(荆杭),阿里云计算平台事业部高级产品专家

    image.png

    (图源Altinity,侵删)

    在 ClickHouse 中处理实时更新

    目录

    • ClickHouse 更新的简短历史
    • 用例
    • 实现更新
    • 结论
    • 后续

    在 OLAP 数据库中,可变数据通常不受欢迎。ClickHouse 也不欢迎可变数据。像其他一些 OLAP 产品一样,ClickHouse 最初甚至不支持更新。后来添加了更新功能,但是像其他许多功能一样,都是以“ClickHouse 方式”添加的。

    即使是现在,ClickHouse 更新也是异步的,因此很难在交互式应用程序中使用。尽管如此,在许多用例中,用户需要对现有数据进行修改,并期望立即看到效果。ClickHouse 能做到吗?当然可以。

    ClickHouse 更新的简短历史

    早在 2016 年,ClickHouse 团队就发布了一篇题为“如何在 ClickHouse 中更新数据”的文章。当时 ClickHouse 并不支持数据修改,只能使用特殊的插入结构来模拟更新,并且数据必须按分区丢弃。

    为满足 GDPR 的要求,ClickHouse 团队在 2018 年提供了 UPDATE 和 DELETE。后续文章ClickHouse 中的更新和删除目前仍然是 Altinity 博客中阅读量最多的文章之一。这种异步、非原子性的更新以 ALTER TABLE UPDATE 语句的形式实现,并且可能会打乱大量数据。这对于批量操作和不频繁的更新是很有用的,因为它们不需要即时的结果。尽管“正常”的 SQL 更新每年都妥妥地出现在路线图中,但依然没能在 ClickHouse 中实现。如果需要实时更新行为,我们必须使用其他方法。让我们考虑一个实际的用例,并比较在 ClickHouse 中的不同实现方法。

    用例

    考虑一个生成各种报警的系统。用户或机器学习算法会不时查询数据库,以查看新的报警并进行确认。确认操作需要修改数据库中的报警记录。一旦得到确认,报警将从用户的视图中消失。这看起来像是一个 OLTP 操作,与 ClickHouse 格格不入。

    由于我们无法使用更新,因此只能转而插入修改后的记录。一旦数据库中有两条记录,我们就需要一种有效的方法来获取最新的记录。为此,我们将尝试 3 种不同的方法:

    • ReplacingMergeTree
    • 聚合函数
    • AggregatingMergeTree

    ReplacingMergeTree

    我们首先创建一个用来存储报警的表。

    CREATE TABLE alerts(
      tenant_id     UInt32,
      alert_id      String,
      timestamp     DateTime Codec(Delta, LZ4),
      alert_data    String,
      acked         UInt8 DEFAULT 0,
      ack_time      DateTime DEFAULT toDateTime(0),
      ack_user      LowCardinality(String) DEFAULT ''
    )
    ENGINE = ReplacingMergeTree(ack_time)
    PARTITION BY tuple()
    ORDER BY (tenant_id, timestamp, alert_id);

    为简单起见,将所有报警特定列都打包到一个通用的“alert_data”列中。但是可以想象到,报警可能包含数十甚至数百列。此外,在我们的示例中,“alert_id”是一个随机字符串。

    请注意 ReplacingMergeTree 引擎。ReplacingMergeTee 是一个特殊的表引擎,它借助 ORDER BY  语句按主键替换数据——具有相同键值的新版本行将替换旧版本行。在我们的用例中,“行数据的新旧程度”由“ack_time”列确定。替换是在后台合并操作中进行的,它不会立即发生,也不能保证会发生,因此查询结果的一致性是个问题。不过,ClickHouse 有一种特殊的语法来处理这样的表,我们在下面的查询中就会用到该语法。

    在运行查询之前,我们先用一些数据填充这个表。我们为 1000 个租户生成 1000 万个报警:

    INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
    SELECT
      toUInt32(rand(1)%1000+1) AS tenant_id,
      randomPrintableASCII(64) as alert_id,
      toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
      randomPrintableASCII(1024) as alert_data
    FROM numbers(10000000);

    接下来,我们确认 99% 的报警,为“acked”、“ack_user”和“ack_time”列提供新值。我们只是插入一个新行,而不是更新。

    INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
    SELECT tenant_id, alert_id, timestamp, alert_data, 
      1 as acked, 
      concat('user', toString(rand()%1000)) as ack_user,       now() as ack_time
    FROM alerts WHERE cityHash64(alert_id) % 99 != 0;

    如果我们现在查询这个表,会看到如下结果:

    SELECT count() FROM alerts
    
    ┌──count()─┐
    │ 19898060 │
    └──────────┘
    
    1 rows in set. Elapsed: 0.008 sec. 

    表中显然既有已确认的行,也有未确认的行。所以替换还没有发生。为了查看“真实”数据,我们必须添加 FINAL 关键字。

    SELECT count() FROM alerts FINAL
    
    ┌──count()─┐
    │ 10000000 │
    └──────────┘
    
    1 rows in set. Elapsed: 3.693 sec. Processed 19.90 million rows, 1.71 GB (5.39 million rows/s., 463.39 MB/s.) 

    现在计数是正确了,但是看看查询时间增加了多少!使用 FINAL 后,ClickHouse 执行查询时必须扫描所有的行,并按主键合并它们。这样能得到正确答案,但造成了大量开销。让我们看看,只筛选未确认的行会不会有更好的效果。

    SELECT count() FROM alerts FINAL WHERE NOT acked
    
    ┌─count()─┐
    │  101940 │
    └─────────┘
    
    1 rows in set. Elapsed: 3.570 sec. Processed 19.07 million rows, 1.64 GB (5.34 million rows/s., 459.38 MB/s.) 

    尽管计数显著减少,但查询时间和处理的数据量还是一样。筛选无助于加快查询速度。随着表增大,成本可能会更加巨大。它不能扩展。

    注:为了提高可读性,所有查询和查询时间都像在“clickhouse-client”中运行一样显示。实际上,我们尝试了多次查询,以确保结果一致,并使用“clickhouse-benchmark”实用程序进行确认。

    好吧,查询整个表没什么帮助。我们的用例还能使用 ReplacingMergeTree 吗?让我们随机选择一个 tenant_id,然后选择所有未确认的记录——想象用户正在查看监控视图。我喜欢 Ray Bradbury,那就选 451 好了。由于“alert_data”的值只是随机生成的,因此我们将计算一个校验和,用来确认多种方法的结果相同:

    SELECT 
      count(), 
      sum(cityHash64(*)) AS data
    FROM alerts FINAL
    WHERE (tenant_id = 451) AND (NOT acked)
    
    ┌─count()─┬─────────────────data─┐
    │      90 │ 18441617166277032220 │
    └─────────┴──────────────────────┘
    
    1 rows in set. Elapsed: 0.278 sec. Processed 106.50 thousand rows, 119.52 MB (383.45 thousand rows/s., 430.33 MB/s.)

    太快了!我们只用了 278 毫秒就查询了所有未确认的数据。为什么这次很快?区别就在于筛选条件。“tenant_id”是某个主键的一部分,所以 ClickHouse 可以在 FINAL 之前筛选数据。在这种情况下,ReplacingMergeTree 就变得高效了。

    我们也试试用户筛选器,并查询由特定用户确认的报警数量。列的基数是相同的——我们有 1000 个用户,可以试试 user451。

    SELECT count() FROM alerts FINAL
    WHERE (ack_user = 'user451') AND acked
    
    ┌─count()─┐
    │    9725 │
    └─────────┘
    
    1 rows in set. Elapsed: 4.778 sec. Processed 19.04 million rows, 1.69 GB (3.98 million rows/s., 353.21 MB/s.)

    这个速度非常慢,因为没有使用索引。ClickHouse 扫描了全部 1904 万行。请注意,我们不能将“ack_user”添加到索引,因为它将破坏 ReplacingMergeTree 语义。不过,我们可以用 PREWHERE 进行一个巧妙的处理:

    SELECT count() FROM alerts FINAL
    PREWHERE (ack_user = 'user451') AND acked
    
    ┌─count()─┐
    │    9725 │
    └─────────┘
    
    1 rows in set. Elapsed: 0.639 sec. Processed 19.04 million rows, 942.40 MB (29.80 million rows/s., 1.48 GB/s.)

    PREWHERE 是一个特别的妙招,能让 ClickHouse 以不同方式应用筛选器。通常情况下 ClickHouse 是足够智能的,可以自动将条件移动到 PREWHERE,因此用户不必在意。这次没有发生,幸好我们检查过了。

    聚合函数

    ClickHouse 因支持各种聚合函数而闻名,最新版本可支持 100 多种。结合 9 个聚合函数组合子(参见 Combinators | ClickHouse Documentation),这为有经验的用户提供了很高的灵活性。对于此用例,我们不需要任何高级函数,仅使用以下 3 个函数:“argMax”、“max”和“any”。

    可以使用“argMax”聚合函数执行针对第 451 个租户的相同查询,如下所示:

    SELECT count(), sum(cityHash64(*)) data FROM (
      SELECT tenant_id, alert_id, timestamp, 
             argMax(alert_data, ack_time) alert_data, 
             argMax(acked, ack_time) acked,
             max(ack_time) ack_time_,
             argMax(ack_user, ack_time) ack_user
      FROM alerts 
      GROUP BY tenant_id, alert_id, timestamp
    ) 
    WHERE tenant_id=451 AND NOT acked;
    
    ┌─count()─┬─────────────────data─┐
    │      90 │ 18441617166277032220 │
    └─────────┴──────────────────────┘
    
    1 rows in set. Elapsed: 0.059 sec. Processed 73.73 thousand rows, 82.74 MB (1.25 million rows/s., 1.40 GB/s.)

    同样的结果,同样的行数,但性能是之前的 4 倍!这就是 ClickHouse 聚合的效率。缺点在于,查询变得更加复杂。但是我们可以让它变得更简单。

    请注意,当确认报警时,我们只更新以下 3 列:

    • acked: 0 => 1
    • ack_time: 0 => now()
    • ack_user: ‘’ => ‘user1’

    在所有 3 种情况下,列值都会增加!因此,我们可以使用“max”代替略显臃肿的“argMax”。由于我们不更改“alert_data”,因此不需要对此列进行任何实际聚合。ClickHouse 有一个很好用的“any”聚合函数,可以实现这一点。它可以在没有额外开销的情况下选取任何值:

    SELECT count(), sum(cityHash64(*)) data FROM (
      SELECT tenant_id, alert_id, timestamp, 
        any(alert_data) alert_data, 
        max(acked) acked, 
        max(ack_time) ack_time,
        max(ack_user) ack_user
      FROM alerts
      GROUP BY tenant_id, alert_id, timestamp
    ) 
    WHERE tenant_id=451 AND NOT acked;
    
    ┌─count()─┬─────────────────data─┐
    │      90 │ 18441617166277032220 │
    └─────────┴──────────────────────┘
    
    1 rows in set. Elapsed: 0.055 sec. Processed 73.73 thousand rows, 82.74 MB (1.34 million rows/s., 1.50 GB/s.)

    查询变简单了,而且更快了一点!原因就在于使用“any”函数后,ClickHouse 不需要对“alert_data”列计算“max”!

    AggregatingMergeTree

    AggregatingMergeTree 是 ClickHouse 最强大的功能之一。与物化视图结合使用时,它可以实现实时数据聚合。既然我们在之前的方法中使用了聚合函数,那么能否用 AggregatingMergeTree 使其更加完善呢?实际上,这并没有什么改善。

    我们一次只更新一行,所以一个组只有两行要聚合。对于这种情况,AggregatingMergeTree 不是最好的选择。不过我们有个小技巧。我们知道,报警总是先以非确认状态插入,然后再变成确认状态。用户确认报警后,只有 3 列需要修改。如果我们不重复其他列的数据,可以节省磁盘空间并提高性能吗?

    让我们创建一个使用“max”聚合函数来实现聚合的表。我们也可以用“any”代替“max”,但列必须是可以设置为空的——“any”会选择一个非空值。

    DROP TABLE alerts_amt_max;
    
    CREATE TABLE alerts_amt_max (
      tenant_id     UInt32,
      alert_id      String,
      timestamp     DateTime Codec(Delta, LZ4),
      alert_data    SimpleAggregateFunction(max, String),
      acked         SimpleAggregateFunction(max, UInt8),
      ack_time      SimpleAggregateFunction(max, DateTime),
      ack_user      SimpleAggregateFunction(max, LowCardinality(String))
    )
    Engine = AggregatingMergeTree()
    ORDER BY (tenant_id, timestamp, alert_id);

    由于原始数据是随机的,因此我们将使用“alerts”中的现有数据填充新表。我们将像之前一样分两次插入,一次是未确认的报警,另一次是已确认的报警:

    INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked;
    
    INSERT INTO alerts_amt_max 
    SELECT tenant_id, alert_id, timestamp,
      '' as alert_data, 
      acked, ack_time, ack_user 
    FROM alerts WHERE acked;

    请注意,对于已确认的事件,我们会插入一个空字符串,而不是“alert_data”。我们知道数据不会改变,我们只能存储一次!聚合函数将填补空白。在实际应用中,我们可以跳过所有不变的列,让它们获得默认值。

    有了数据后,我们先检查数据大小:

    SELECT 
        table, 
        sum(rows) AS r, 
        sum(data_compressed_bytes) AS c, 
        sum(data_uncompressed_bytes) AS uc, 
        uc / c AS ratio
    FROM system.parts
    WHERE active AND (database = 'last_state')
    GROUP BY table
    
    ┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬──────────────ratio─┐
    │ alerts         │ 19039439 │ 20926009562 │ 21049307710 │ 1.0058921003373666 │
    │ alerts_amt_max │ 19039439 │ 10723636061 │ 10902048178 │ 1.0166372782501314 │
    └────────────────┴──────────┴─────────────┴─────────────┴────────────────────┘

    好吧,由于有随机字符串,我们几乎没有压缩。但是,由于我们不必存储“alerts_data”两次,所以相较于不聚合,聚合后数据规模可以缩小一半。

    现在我们试试对聚合表进行查询:

    SELECT count(), sum(cityHash64(*)) data FROM (
       SELECT tenant_id, alert_id, timestamp, 
              max(alert_data) alert_data, 
              max(acked) acked, 
              max(ack_time) ack_time,
              max(ack_user) ack_user
         FROM alerts_amt_max
       GROUP BY tenant_id, alert_id, timestamp
    ) 
    WHERE tenant_id=451 AND NOT acked;
    
    ┌─count()─┬─────────────────data─┐
    │      90 │ 18441617166277032220 │
    └─────────┴──────────────────────┘
    
    1 rows in set. Elapsed: 0.036 sec. Processed 73.73 thousand rows, 40.75 MB (2.04 million rows/s., 1.13 GB/s.)

    多亏了 AggregatingMergeTree,我们处理的数据更少(之前是 82MB,现在是 40MB),效率更高。

    实现更新

    ClickHouse 会尽最大努力在后台合并数据,从而删除重复的行并执行聚合。然而,有时强制合并是有意义的,例如为了释放磁盘空间。这可以通过 OPTIMIZE FINAL 语句来实现。OPTIMIZE 操作速度慢、代价高,因此不能频繁执行。让我们看看它对查询性能有什么影响。

    OPTIMIZE TABLE alerts FINAL
    Ok.
    0 rows in set. Elapsed: 105.675 sec.
    
    OPTIMIZE TABLE alerts_amt_max FINAL
    Ok.
    0 rows in set. Elapsed: 70.121 sec.

    执行 OPTIMIZE FINAL 后,两个表的行数相同,数据也相同。

    ┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬────────────ratio─┐
    │ alerts         │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
    │ alerts_amt_max │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
    └────────────────┴──────────┴─────────────┴─────────────┴──────────────────┘

    不同方法之间的性能差异变得不那么明显了。汇总表如下:

    结论

    ClickHouse 提供了丰富的工具集来处理实时更新,如 ReplacingMergeTree、CollapsingMergeTree(本文未提及)、AggregatingMergeTree 和聚合函数。所有这些方法都具有以下三个共性:

    • 通过插入新版本来“修改”数据。ClickHouse 中的插入速度非常快。
    • 有一些有效的方法来模拟类似于 OLTP 数据库的更新语义。
    • 然而,实际的修改并不会立即发生。

    具体方法的选择取决于应用程序的用例。对用户来说,ReplacingMergeTree 是直截了当的,也是最方便的方法,但只适用于中小型的表,或者数据总是按主键查询的情况。使用聚合函数可以提供更高的灵活性和性能,但需要大量的查询重写。最后,AggregatingMergeTree 可以节约存储空间,只保留修改过的列。这些都是 ClickHouse DB 设计人员的好工具,可根据具体需要来应用。

    后续

    您已经了解了在 ClickHouse 中处理实时更新相关内容,本系列还包括其他内容:

    • 使用新的 TTL move,将数据存储在合适的地方
    • 在 ClickHouse 物化视图中使用 Join
    • ClickHouse 聚合函数和聚合状态
    • ClickHouse 中的嵌套数据结构

    原文链接
    本文为阿里云原创内容,未经允许不得转载。

  • 相关阅读:
    Fidder4 顶部提示 “The system proxy was changed,click to reenable fiddler capture”。
    redis 哨兵 sentinel master slave 连接建立过程
    虚拟点赞浏览功能的大数据量测试
    python基础练习题(题目 字母识词)
    python基础练习题(题目 回文数)
    python基础练习题(题目 递归求等差数列)
    python基础练习题(题目 递归输出)
    python基础练习题(题目 递归求阶乘)
    python基础练习题(题目 阶乘求和)
    python基础练习题(题目 斐波那契数列II)
  • 原文地址:https://www.cnblogs.com/yunqishequ/p/15672270.html
Copyright © 2011-2022 走看看