zoukankan      html  css  js  c++  java
  • 【ClickHouse 技术系列】 在 ClickHouse 中处理实时更新

    简介:本文翻译自 Altinity 针对 ClickHouse 的系列技术文章。面向联机分析处理(OLAP)的开源分析引擎 ClickHouse,因其优良的查询性能,PB级的数据规模,简单的架构,被国内外公司广泛采用。本系列技术文章,将详细展开介绍 ClickHouse。

    前言

    本文翻译自 Altinity 针对 ClickHouse 的系列技术文章。面向联机分析处理(OLAP)的开源分析引擎 ClickHouse,因其优良的查询性能,PB 级的数据规模,简单的架构,被国内外公司广泛采用。

    阿里云 EMR-OLAP 团队,基于开源 ClickHouse 进行了系列优化,提供了开源 OLAP 分析引擎 ClickHouse 的云上托管服务。EMR ClickHouse 完全兼容开源版本的产品特性,同时提供集群快速部署、集群管理、扩容、缩容和监控告警等云上产品功能,并且在开源的基础上优化了 ClickHouse 的读写性能,提升了 ClickHouse 与 EMR 其他组件快速集成的能力。访问 ClickHouse - E-MapReduce - 阿里云 了解详情。

    译者:何源(荆杭),阿里云计算平台事业部高级产品专家

    image.png

    (图源Altinity,侵删)

    在 ClickHouse 中处理实时更新

    目录

    • ClickHouse 更新的简短历史
    • 用例
    • 实现更新
    • 结论
    • 后续

    在 OLAP 数据库中,可变数据通常不受欢迎。ClickHouse 也不欢迎可变数据。像其他一些 OLAP 产品一样,ClickHouse 最初甚至不支持更新。后来添加了更新功能,但是像其他许多功能一样,都是以“ClickHouse 方式”添加的。

    即使是现在,ClickHouse 更新也是异步的,因此很难在交互式应用程序中使用。尽管如此,在许多用例中,用户需要对现有数据进行修改,并期望立即看到效果。ClickHouse 能做到吗?当然可以。

    ClickHouse 更新的简短历史

    早在 2016 年,ClickHouse 团队就发布了一篇题为“如何在 ClickHouse 中更新数据”的文章。当时 ClickHouse 并不支持数据修改,只能使用特殊的插入结构来模拟更新,并且数据必须按分区丢弃。

    为满足 GDPR 的要求,ClickHouse 团队在 2018 年提供了 UPDATE 和 DELETE。后续文章ClickHouse 中的更新和删除目前仍然是 Altinity 博客中阅读量最多的文章之一。这种异步、非原子性的更新以 ALTER TABLE UPDATE 语句的形式实现,并且可能会打乱大量数据。这对于批量操作和不频繁的更新是很有用的,因为它们不需要即时的结果。尽管“正常”的 SQL 更新每年都妥妥地出现在路线图中,但依然没能在 ClickHouse 中实现。如果需要实时更新行为,我们必须使用其他方法。让我们考虑一个实际的用例,并比较在 ClickHouse 中的不同实现方法。

    用例

    考虑一个生成各种报警的系统。用户或机器学习算法会不时查询数据库,以查看新的报警并进行确认。确认操作需要修改数据库中的报警记录。一旦得到确认,报警将从用户的视图中消失。这看起来像是一个 OLTP 操作,与 ClickHouse 格格不入。

    由于我们无法使用更新,因此只能转而插入修改后的记录。一旦数据库中有两条记录,我们就需要一种有效的方法来获取最新的记录。为此,我们将尝试 3 种不同的方法:

    • ReplacingMergeTree
    • 聚合函数
    • AggregatingMergeTree

    ReplacingMergeTree

    我们首先创建一个用来存储报警的表。

    CREATE TABLE alerts(
      tenant_id     UInt32,
      alert_id      String,
      timestamp     DateTime Codec(Delta, LZ4),
      alert_data    String,
      acked         UInt8 DEFAULT 0,
      ack_time      DateTime DEFAULT toDateTime(0),
      ack_user      LowCardinality(String) DEFAULT ''
    )
    ENGINE = ReplacingMergeTree(ack_time)
    PARTITION BY tuple()
    ORDER BY (tenant_id, timestamp, alert_id);

    为简单起见,将所有报警特定列都打包到一个通用的“alert_data”列中。但是可以想象到,报警可能包含数十甚至数百列。此外,在我们的示例中,“alert_id”是一个随机字符串。

    请注意 ReplacingMergeTree 引擎。ReplacingMergeTee 是一个特殊的表引擎,它借助 ORDER BY  语句按主键替换数据——具有相同键值的新版本行将替换旧版本行。在我们的用例中,“行数据的新旧程度”由“ack_time”列确定。替换是在后台合并操作中进行的,它不会立即发生,也不能保证会发生,因此查询结果的一致性是个问题。不过,ClickHouse 有一种特殊的语法来处理这样的表,我们在下面的查询中就会用到该语法。

    在运行查询之前,我们先用一些数据填充这个表。我们为 1000 个租户生成 1000 万个报警:

    INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
    SELECT
      toUInt32(rand(1)%1000+1) AS tenant_id,
      randomPrintableASCII(64) as alert_id,
      toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
      randomPrintableASCII(1024) as alert_data
    FROM numbers(10000000);

    接下来,我们确认 99% 的报警,为“acked”、“ack_user”和“ack_time”列提供新值。我们只是插入一个新行,而不是更新。

    INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
    SELECT tenant_id, alert_id, timestamp, alert_data, 
      1 as acked, 
      concat('user', toString(rand()%1000)) as ack_user,       now() as ack_time
    FROM alerts WHERE cityHash64(alert_id) % 99 != 0;

    如果我们现在查询这个表,会看到如下结果:

    SELECT count() FROM alerts
    
    ┌──count()─┐
    │ 19898060 │
    └──────────┘
    
    1 rows in set. Elapsed: 0.008 sec. 

    表中显然既有已确认的行,也有未确认的行。所以替换还没有发生。为了查看“真实”数据,我们必须添加 FINAL 关键字。

    SELECT count() FROM alerts FINAL
    
    ┌──count()─┐
    │ 10000000 │
    └──────────┘
    
    1 rows in set. Elapsed: 3.693 sec. Processed 19.90 million rows, 1.71 GB (5.39 million rows/s., 463.39 MB/s.) 

    现在计数是正确了,但是看看查询时间增加了多少!使用 FINAL 后,ClickHouse 执行查询时必须扫描所有的行,并按主键合并它们。这样能得到正确答案,但造成了大量开销。让我们看看,只筛选未确认的行会不会有更好的效果。

    SELECT count() FROM alerts FINAL WHERE NOT acked
    
    ┌─count()─┐
    │  101940 │
    └─────────┘
    
    1 rows in set. Elapsed: 3.570 sec. Processed 19.07 million rows, 1.64 GB (5.34 million rows/s., 459.38 MB/s.) 

    尽管计数显著减少,但查询时间和处理的数据量还是一样。筛选无助于加快查询速度。随着表增大,成本可能会更加巨大。它不能扩展。

    注:为了提高可读性,所有查询和查询时间都像在“clickhouse-client”中运行一样显示。实际上,我们尝试了多次查询,以确保结果一致,并使用“clickhouse-benchmark”实用程序进行确认。

    好吧,查询整个表没什么帮助。我们的用例还能使用 ReplacingMergeTree 吗?让我们随机选择一个 tenant_id,然后选择所有未确认的记录——想象用户正在查看监控视图。我喜欢 Ray Bradbury,那就选 451 好了。由于“alert_data”的值只是随机生成的,因此我们将计算一个校验和,用来确认多种方法的结果相同:

    SELECT 
      count(), 
      sum(cityHash64(*)) AS data
    FROM alerts FINAL
    WHERE (tenant_id = 451) AND (NOT acked)
    
    ┌─count()─┬─────────────────data─┐
    │      90 │ 18441617166277032220 │
    └─────────┴──────────────────────┘
    
    1 rows in set. Elapsed: 0.278 sec. Processed 106.50 thousand rows, 119.52 MB (383.45 thousand rows/s., 430.33 MB/s.)

    太快了!我们只用了 278 毫秒就查询了所有未确认的数据。为什么这次很快?区别就在于筛选条件。“tenant_id”是某个主键的一部分,所以 ClickHouse 可以在 FINAL 之前筛选数据。在这种情况下,ReplacingMergeTree 就变得高效了。

    我们也试试用户筛选器,并查询由特定用户确认的报警数量。列的基数是相同的——我们有 1000 个用户,可以试试 user451。

    SELECT count() FROM alerts FINAL
    WHERE (ack_user = 'user451') AND acked
    
    ┌─count()─┐
    │    9725 │
    └─────────┘
    
    1 rows in set. Elapsed: 4.778 sec. Processed 19.04 million rows, 1.69 GB (3.98 million rows/s., 353.21 MB/s.)

    这个速度非常慢,因为没有使用索引。ClickHouse 扫描了全部 1904 万行。请注意,我们不能将“ack_user”添加到索引,因为它将破坏 ReplacingMergeTree 语义。不过,我们可以用 PREWHERE 进行一个巧妙的处理:

    SELECT count() FROM alerts FINAL
    PREWHERE (ack_user = 'user451') AND acked
    
    ┌─count()─┐
    │    9725 │
    └─────────┘
    
    1 rows in set. Elapsed: 0.639 sec. Processed 19.04 million rows, 942.40 MB (29.80 million rows/s., 1.48 GB/s.)

    PREWHERE 是一个特别的妙招,能让 ClickHouse 以不同方式应用筛选器。通常情况下 ClickHouse 是足够智能的,可以自动将条件移动到 PREWHERE,因此用户不必在意。这次没有发生,幸好我们检查过了。

    聚合函数

    ClickHouse 因支持各种聚合函数而闻名,最新版本可支持 100 多种。结合 9 个聚合函数组合子(参见 Combinators | ClickHouse Documentation),这为有经验的用户提供了很高的灵活性。对于此用例,我们不需要任何高级函数,仅使用以下 3 个函数:“argMax”、“max”和“any”。

    可以使用“argMax”聚合函数执行针对第 451 个租户的相同查询,如下所示:

    SELECT count(), sum(cityHash64(*)) data FROM (
      SELECT tenant_id, alert_id, timestamp, 
             argMax(alert_data, ack_time) alert_data, 
             argMax(acked, ack_time) acked,
             max(ack_time) ack_time_,
             argMax(ack_user, ack_time) ack_user
      FROM alerts 
      GROUP BY tenant_id, alert_id, timestamp
    ) 
    WHERE tenant_id=451 AND NOT acked;
    
    ┌─count()─┬─────────────────data─┐
    │      90 │ 18441617166277032220 │
    └─────────┴──────────────────────┘
    
    1 rows in set. Elapsed: 0.059 sec. Processed 73.73 thousand rows, 82.74 MB (1.25 million rows/s., 1.40 GB/s.)

    同样的结果,同样的行数,但性能是之前的 4 倍!这就是 ClickHouse 聚合的效率。缺点在于,查询变得更加复杂。但是我们可以让它变得更简单。

    请注意,当确认报警时,我们只更新以下 3 列:

    • acked: 0 => 1
    • ack_time: 0 => now()
    • ack_user: ‘’ => ‘user1’

    在所有 3 种情况下,列值都会增加!因此,我们可以使用“max”代替略显臃肿的“argMax”。由于我们不更改“alert_data”,因此不需要对此列进行任何实际聚合。ClickHouse 有一个很好用的“any”聚合函数,可以实现这一点。它可以在没有额外开销的情况下选取任何值:

    SELECT count(), sum(cityHash64(*)) data FROM (
      SELECT tenant_id, alert_id, timestamp, 
        any(alert_data) alert_data, 
        max(acked) acked, 
        max(ack_time) ack_time,
        max(ack_user) ack_user
      FROM alerts
      GROUP BY tenant_id, alert_id, timestamp
    ) 
    WHERE tenant_id=451 AND NOT acked;
    
    ┌─count()─┬─────────────────data─┐
    │      90 │ 18441617166277032220 │
    └─────────┴──────────────────────┘
    
    1 rows in set. Elapsed: 0.055 sec. Processed 73.73 thousand rows, 82.74 MB (1.34 million rows/s., 1.50 GB/s.)

    查询变简单了,而且更快了一点!原因就在于使用“any”函数后,ClickHouse 不需要对“alert_data”列计算“max”!

    AggregatingMergeTree

    AggregatingMergeTree 是 ClickHouse 最强大的功能之一。与物化视图结合使用时,它可以实现实时数据聚合。既然我们在之前的方法中使用了聚合函数,那么能否用 AggregatingMergeTree 使其更加完善呢?实际上,这并没有什么改善。

    我们一次只更新一行,所以一个组只有两行要聚合。对于这种情况,AggregatingMergeTree 不是最好的选择。不过我们有个小技巧。我们知道,报警总是先以非确认状态插入,然后再变成确认状态。用户确认报警后,只有 3 列需要修改。如果我们不重复其他列的数据,可以节省磁盘空间并提高性能吗?

    让我们创建一个使用“max”聚合函数来实现聚合的表。我们也可以用“any”代替“max”,但列必须是可以设置为空的——“any”会选择一个非空值。

    DROP TABLE alerts_amt_max;
    
    CREATE TABLE alerts_amt_max (
      tenant_id     UInt32,
      alert_id      String,
      timestamp     DateTime Codec(Delta, LZ4),
      alert_data    SimpleAggregateFunction(max, String),
      acked         SimpleAggregateFunction(max, UInt8),
      ack_time      SimpleAggregateFunction(max, DateTime),
      ack_user      SimpleAggregateFunction(max, LowCardinality(String))
    )
    Engine = AggregatingMergeTree()
    ORDER BY (tenant_id, timestamp, alert_id);

    由于原始数据是随机的,因此我们将使用“alerts”中的现有数据填充新表。我们将像之前一样分两次插入,一次是未确认的报警,另一次是已确认的报警:

    INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked;
    
    INSERT INTO alerts_amt_max 
    SELECT tenant_id, alert_id, timestamp,
      '' as alert_data, 
      acked, ack_time, ack_user 
    FROM alerts WHERE acked;

    请注意,对于已确认的事件,我们会插入一个空字符串,而不是“alert_data”。我们知道数据不会改变,我们只能存储一次!聚合函数将填补空白。在实际应用中,我们可以跳过所有不变的列,让它们获得默认值。

    有了数据后,我们先检查数据大小:

    SELECT 
        table, 
        sum(rows) AS r, 
        sum(data_compressed_bytes) AS c, 
        sum(data_uncompressed_bytes) AS uc, 
        uc / c AS ratio
    FROM system.parts
    WHERE active AND (database = 'last_state')
    GROUP BY table
    
    ┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬──────────────ratio─┐
    │ alerts         │ 19039439 │ 20926009562 │ 21049307710 │ 1.0058921003373666 │
    │ alerts_amt_max │ 19039439 │ 10723636061 │ 10902048178 │ 1.0166372782501314 │
    └────────────────┴──────────┴─────────────┴─────────────┴────────────────────┘

    好吧,由于有随机字符串,我们几乎没有压缩。但是,由于我们不必存储“alerts_data”两次,所以相较于不聚合,聚合后数据规模可以缩小一半。

    现在我们试试对聚合表进行查询:

    SELECT count(), sum(cityHash64(*)) data FROM (
       SELECT tenant_id, alert_id, timestamp, 
              max(alert_data) alert_data, 
              max(acked) acked, 
              max(ack_time) ack_time,
              max(ack_user) ack_user
         FROM alerts_amt_max
       GROUP BY tenant_id, alert_id, timestamp
    ) 
    WHERE tenant_id=451 AND NOT acked;
    
    ┌─count()─┬─────────────────data─┐
    │      90 │ 18441617166277032220 │
    └─────────┴──────────────────────┘
    
    1 rows in set. Elapsed: 0.036 sec. Processed 73.73 thousand rows, 40.75 MB (2.04 million rows/s., 1.13 GB/s.)

    多亏了 AggregatingMergeTree,我们处理的数据更少(之前是 82MB,现在是 40MB),效率更高。

    实现更新

    ClickHouse 会尽最大努力在后台合并数据,从而删除重复的行并执行聚合。然而,有时强制合并是有意义的,例如为了释放磁盘空间。这可以通过 OPTIMIZE FINAL 语句来实现。OPTIMIZE 操作速度慢、代价高,因此不能频繁执行。让我们看看它对查询性能有什么影响。

    OPTIMIZE TABLE alerts FINAL
    Ok.
    0 rows in set. Elapsed: 105.675 sec.
    
    OPTIMIZE TABLE alerts_amt_max FINAL
    Ok.
    0 rows in set. Elapsed: 70.121 sec.

    执行 OPTIMIZE FINAL 后,两个表的行数相同,数据也相同。

    ┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬────────────ratio─┐
    │ alerts         │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
    │ alerts_amt_max │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
    └────────────────┴──────────┴─────────────┴─────────────┴──────────────────┘

    不同方法之间的性能差异变得不那么明显了。汇总表如下:

    结论

    ClickHouse 提供了丰富的工具集来处理实时更新,如 ReplacingMergeTree、CollapsingMergeTree(本文未提及)、AggregatingMergeTree 和聚合函数。所有这些方法都具有以下三个共性:

    • 通过插入新版本来“修改”数据。ClickHouse 中的插入速度非常快。
    • 有一些有效的方法来模拟类似于 OLTP 数据库的更新语义。
    • 然而,实际的修改并不会立即发生。

    具体方法的选择取决于应用程序的用例。对用户来说,ReplacingMergeTree 是直截了当的,也是最方便的方法,但只适用于中小型的表,或者数据总是按主键查询的情况。使用聚合函数可以提供更高的灵活性和性能,但需要大量的查询重写。最后,AggregatingMergeTree 可以节约存储空间,只保留修改过的列。这些都是 ClickHouse DB 设计人员的好工具,可根据具体需要来应用。

    后续

    您已经了解了在 ClickHouse 中处理实时更新相关内容,本系列还包括其他内容:

    • 使用新的 TTL move,将数据存储在合适的地方
    • 在 ClickHouse 物化视图中使用 Join
    • ClickHouse 聚合函数和聚合状态
    • ClickHouse 中的嵌套数据结构

    原文链接
    本文为阿里云原创内容,未经允许不得转载。

  • 相关阅读:
    比官方文档更易懂的Vue.js教程!包你学会!
    张腾:腾讯云融合通信应用场景及案例分享
    你大概走了假敏捷:《手绘敏捷宝典》在此,还不来收!
    这个PHP无解深坑,你能解出来吗?(听说能解出来的都很秀)
    从小数学就不及格的我,竟然用极坐标系表白了我的女神!(附代码)
    epoll
    hash_map
    BloomFilter
    Wrapper模式(Decorator模式)
    setvbuf
  • 原文地址:https://www.cnblogs.com/yunqishequ/p/15672270.html
Copyright © 2011-2022 走看看