zoukankan      html  css  js  c++  java
  • Flink集成Iceberg简介

    1. 概述

    Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

    官方的定义,iceberg是一种表格式。我们可以简单理解为他是基于计算层(flink、spark)和存储层(orc、parqurt)的一个中间层,我们可以把它定义成一种“数据组织格式”,Iceberg将其称之为“表格式”也是表达类似的含义。他与底层的存储格式(比如ORC、Parquet之类的列式存储格式)最大的区别是,它并不定义数据存储方式,而是定义了数据、元数据的组织方式,向上提供统一的“表”的语义。它构建在数据存储格式之上,其底层的数据存储仍然使用Parquet、ORC等进行存储。在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark、flink、presto等。

     

    Iceberg的架构和实现并未绑定于某一特定引擎,它实现了通用的数据组织格式,利用此格式可以方便地与不同引擎(如Flink、Hive、Spark)对接。

    2. Iceberg优势

    • 增量读取处理能力:Iceberg支持通过流式方式读取增量数据,支持Structed Streaming以及Flink table Source;
    • 支持事务(ACID),上游数据写入即可见,不影响当前数据处理任务,简化ETL;提供upsert和merge into能力,可以极大地缩小数据入库延迟;
    • 可扩展的元数据,快照隔离以及对于文件列表的所有修改都是原子操作;
    • 同时支持流批处理、支持多种存储格式和灵活的文件组织:提供了基于流式的增量计算模型和基于批处理的全量表计算模型。批处理和流任务可以使用相同的存储模型,数据不再孤立;Iceberg支持隐藏分区和分区进化,方便业务进行数据分区策略更新。支持Parquet、Avro以及ORC等存储格式。
    • 支持多种计算引擎,优秀的内核抽象使之不绑定特定的计算引擎,目前Iceberg支持的计算引擎有Spark、Flink、Presto以及Hive。

    3. Flink+ Iceberg搭建使用

    Apache Iceberg支持Apache Flink的DataStream Api和Table Api写记录进iceberg表。当前,我们只集成Iceberg和apache flink 1.11.x。

      

    3.1. 准备

    为了在flink中创建iceberg表,我们要求使用flink SQL client,因为这对使用者们来说更容易去理解概念。

    准备两个jar包:

    启动flink sql client,不带hive connector jar包,可以创建hadoop catalog如下:

    ./bin/sql-client.sh embedded 
        -j /data/flink-1.11.2/otherlib/iceberg-flink-runtime-0.10.0.jar 
        shell

    启动flink sql client,带hive connector jar包,可以创建hadoop catalog和hive catalog如下:

    ./bin/sql-client.sh embedded 
        -j /data/flink-1.11.2/otherlib/iceberg-flink-runtime-0.10.0.jar 
        -j /data/flink-1.11.2/otherlib/flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar 
        shell

      注:

        iceberg-flink-runtime-0.10.0.jar和flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar不能放在flink安装目录的lib下,需要另外放置在别的目录

    3.2. 创建catalogs和使用catalogs

    Flink1.11支持通过flink sql创建catalogs。catalog是Iceberg对表进行管理(create、drop、rename等)的一个组件。目前Iceberg主要支持HiveCatalog和HadoopCatalog两种Catalog。其中HiveCatalog将当前表metadata文件路径存储在hive Metastore,这个表metadata文件是所有读写Iceberg表的入口,所以每次读写Iceberg表都需要先从hive Metastore中取出对应的表metadata文件路径,然后再解析这个Metadata文件进行接下来的操作。而HadoopCatalog将当前表metadata文件路径记录在一个文件目录下,因此不需要连接hive Metastore。

    3.2.1 Hive catalog

    创建一个名为hive_catalog的 iceberg catalog ,用来从 hive metastore 中加载表。

    CREATE CATALOG hive_catalog WITH (
      'type'='iceberg',
      'catalog-type'='hive',
      'uri'='thrift://localhost:9083',
      'clients'='5',
      'property-version'='1',
      'warehouse'='hdfs://nn:8020/warehouse/path'
    );
    • type: 只能使用iceberg,用于 iceberg 表格式。(必须)
    • catalog-type: Iceberg 当前支持hive或hadoopcatalog 类型。(必须)
    • uri: Hive metastore 的 thrift URI。 (必须)
    • clients: Hive metastore 客户端池大小,默认值为 2。 (可选)
    • property-version: 版本号来描述属性版本。此属性可用于在属性格式发生更改时进行向后兼容。当前的属性版本是 1。(可选)
    • warehouse: Hive 仓库位置, 如果既不将 hive-conf-dir 设置为指定包含 hive-site.xml 配置文件的位置,也不将正确的 hive-site.xml 添加到类路径,则用户应指定此路径。
    • hive-conf-dir: 包含 Hive-site.xml 配置文件的目录的路径,该配置文件将用于提供自定义的 Hive 配置值。 如果在创建 iceberg catalog 时同时设置 hive-conf-dir 和 warehouse,那么将使用 warehouse 值覆盖 < hive-conf-dir >/hive-site.xml (或者 classpath 中的 hive 配置文件)中的 hive.metastore.warehouse.dir 的值。

    3.2.2 Hadoop catalog

    Iceberg 还支持 HDFS 中基于目录的 catalog ,可以使用’catalog-type’='hadoop’进行配置:

    CREATE CATALOG hadoop_catalog WITH (
      'type'='iceberg',
      'catalog-type'='hadoop',
      'warehouse'='hdfs://nn:8020/warehouse/path',
      'property-version'='1'
    );
    • warehouse:hdfs目录存储元数据文件和数据文件。(必须)

    我们可以执行sql命令USE CATALOG hive_catalog来设置当前的catalog。

    3.2.3 Custom catalog

    Flink也支持通过指定catalog-impl属性来加载自定义的Iceberg catalog接口。当catalog-impl设置了,catalog-type的值可以忽略,这里有个例子:

    CREATE CATALOG my_catalog WITH (
      'type'='iceberg',
      'catalog-impl'='com.my.custom.CatalogImpl',
      'my-additional-catalog-config'='my-value'
    );

    3.2.4 Create through YAML config

    在启动SQL客户端之前,Catalogs可以通过在sql-client-defaults.yaml文件中注册。这里有个例子:

    catalogs: 
      - name: my_catalog
        type: iceberg
        catalog-type: hadoop
        warehouse: hdfs://nn:8020/warehouse/path

    3.2.5 Iceberg表的目录组织形式

    1. HiveCatalog

    hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs
    Found 2 items
    drwxrwxrwx   - hadoop supergroup          0 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/data
    drwxrwxrwx   - hadoop supergroup          0 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata

    其中data目录下存储数据文件,metadata目录下存储元数据文件。

    2. metadata目录

    hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata
    Found 4 items
    -rw-r--r--   1 hadoop supergroup       1448 2020-06-08 11:31 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/00000-e7c1e6ce-8eb9-4faf-a176-bd94dec3c0e4.metadata.json
    -rw-r--r--   1 hadoop supergroup       2217 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/00001-62ade8ab-c1cf-40d3-bc21-fd5027bc3a55.metadata.json
    -rw-r--r--   1 hadoop supergroup       5040 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/bb641961-162a-49a8-b567-885430d4e799-m0.avro
    -rw-r--r--   1 hadoop supergroup       2567 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/snap-6771375506965563160-1-bb641961-162a-49a8-b567-885430d4e799.avro

    其中00001-62ade8ab-c1cf-40d3-bc21-fd5027bc3a55.metadata.json中存储表的shcema、partition spec以及当前snapshot manifests文件路径。snap-6771375506965563160-1-bb641961-162a-49a8-b567-885430d4e799.avro存储manifest文件路径。bb641961-162a-49a8-b567-885430d4e799-m0.avro记录本次提交的文件以及文件级别元数据。

    3. data目录

    hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=click
    Found 1 items
    -rw-r--r--   1 hadoop supergroup       1425 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=click/00015-47-a9f5ce8f-ee6f-4748-9f49-0f94761859bc-00000.parquet

    4. HadoopCatalog

    Hadoopcatalog与Hivecatalog的data目录完全相同,metadata目录下文件稍有不同,HadoopCatalog管理的metadata目录如下所示:

    hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata
    Found 5 items
    -rw-r--r--   1 hadoop supergroup       5064 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/b222d277-2692-4e35-9327-3716dec9f070-m0.avro
    -rw-r--r--   1 hadoop supergroup       2591 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/snap-3124052841098464551-1-b222d277-2692-4e35-9327-3716dec9f070.avro
    -rw-r--r--   1 hadoop supergroup       1476 2020-06-08 17:23 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/v1.metadata.json
    -rw-r--r--   1 hadoop supergroup       2261 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/v2.metadata.json
    -rw-r--r--   1 hadoop supergroup          1 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/version-hint.text
     其中文件version-hint.text中存储当前iceberg表的最新snapshot_id,如下所示:
    hadoop@xxx:~$ hdfs dfs -cat /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/version-hint.text 2

    说明该表的最新snapshot_id是2,即对应的snapshot元数据文件是v2.metadata.json,解析v2.metadata.json可以获取到该表当前最新snapshot对应的scheme、partition spec、父snapshot以及该snapshot对应的manifestList文件路径等,因此version-hint.text是HadoopCatalog获取当前snapshot版本的入口。

    HiveCatalog的metadata目录下并没有version-hint.text文件,那它获取当前snapshot版本的入口在哪里呢?它的入口在Metastore中的schema里面,可以在HiveCatalog建表schema中的TBPROPERTIES中有个key是“metadata_location”,对应的value就是当前最新的snapshot文件。因此,有两点需要说明:

    • HiveCatalog创建的表,每次提交写入文件生成新的snapshot后都需要更新Metastore中的metadata_location字段。
    • HiveCatalog和HadoopCatalog不能混用。即使用HiveCatalog创建的表,再使用HadoopCatalog是不能正常加载的,反之亦然。

    3.2.6 为什么选择HadoopCatalog

    上面说到Iceberg目前支持两种Catalog,而且两种Catalog相互不兼容。那这里有两个问题:

    1. 社区是出于什么考虑实现两种不兼容的Catalog?

    2. 因为两者不兼容,必须选择其一作为系统唯一的Catalog,那是选择HiveCatalog还是HadoopCatalog,为什么?

    先回答第一个问题。社区是出于什么考虑实现两种不兼容的Catalog?

    在回答这个问题之前,首先回顾一下上一篇文章中介绍到的基于HadoopCatalog,Iceberg实现数据写入提交的ACID机制,最终的结论是使用了乐观锁机制和HDFS rename的原子性一起保障写入提交的ACID。如果某些文件系统比如S3不支持rename的原子性呢?那就需要另外一种机制保障写入提交的ACID,HiveCatalog就是另一种不依赖文件系统支持,但是可以提供ACID支持的方案,它在每次提交的时候都更新MySQL中同一行记录,这样的更新MySQL本身是可以保证ACID的。这就是社区为什么会支持两种不兼容Catalog的本质原因。

    再来回答第二个问题。HadoopCatalog依赖于HDFS提供的rename原子性语义,而HiveCatalog不依赖于任何文件系统的rename原子性语义支持,因此基于HiveCatalog的表不仅可以支持HDFS,而且可以支持s3、oss等其他文件系统。但是HadoopCatalog可以认为只支持HDFS表,比较难以迁移到其他文件系统。但是HadoopCatalog写入提交的过程只依赖HDFS,不和Metastore/MySQL交互,而HiveCatalog每次提交都需要和Metastore/MySQL交互,可以认为是强依赖于Metastore,如果Metastore有异常,基于HiveCatalog的Iceberg表的写入和查询会有问题。相反,HadoopCatalog并不依赖于Metastore,即使Metastore有异常,也不影响Iceberg表的写入和查询。

    考虑到我们目前主要还是依赖HDFS,同时不想强依赖于Metastore,所以我们选择HadoopCatalog作为我们系统唯一的Catalog。即使有一天,想要把HDFS上的表迁移到S3上去,也是可以办到的,大家想想,无论是HadoopCatalog还是HiveCatalog,数据文件和元数据文件本身都是相同的,只是标记当前最新的snapshot的入口不一样,那只需要简单的手动变换一下入口就可以实现Catalog的切换,切换到HiveCatalog上之后,就可以摆脱HDFS的依赖,问题并不大。

    3.3. DDL命令

    • 创建数据库

    默认的,iceberg将会在flink中使用default数据库。如果我们不想在default数据库下面创建表,可以使用下面的例子去创建别的数据库。

    CREATE DATABASE iceberg_db;
    USE iceberg_db;
    • 创建表
    CREATE TABLE hive_catalog.default.sample (
        id BIGINT COMMENT 'unique id',
        data STRING
    );

    注:default为数据库

    表创建命令支持最常用的 flink create 子句,包括:

    PARTITION BY (column1, column2, ...) 配置分区,apache flik 还不支持隐藏分区。

    COMMENT 'table document'设置一个表描述。

    WITH ('key'='value', ...)设置将存储在 apache iceberg 表属性中的表配置。

    目前,它不支持计算列、主键和水印定义等。

    • PARTITIONED BY 分区

    要创建分区表,使用 PARTITIONED BY:

    CREATE TABLE hive_catalog.default.sample (
        id BIGINT COMMENT 'unique id',
        data STRING
    ) PARTITIONED BY (data);

    Apache Iceberg支持隐藏分区但apache flink不支持在列上按照函数分区,因此我们现在没有途径在flink DDL上支持隐藏分区,我们在未来将会改善flink DDL。

    • CREATE TABLE LIKE

    为了创建和另一张表具有相同结构、分区和表属性的一张表,使用CREATE TAABLE LIKE。

    CREATE TABLE hive_catalog.default.sample (
        id BIGINT COMMENT 'unique id',
        data STRING
    );
    
    CREATE TABLE  hive_catalog.default.sample_like LIKE hive_catalog.default.sample;

    为了更详细,可以查看Flink CREATE TABLE documentation

    • ALTER TABLE 更改表

    Iceberg 现在只支持在 flink 1.11中修改表属性。

    ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro')
    • ALTER TABLE .. RENAME TO
    ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;
    • DROP TABLE 删除表
    DROP TABLE hive_catalog.default.sample;

    3.4. sql读写

    3.4.1.  Sql查询

    Iceberg现在支持flink流式读和批量读。我们可以执行下面sql命令去把执行类型流式模式切换为批处理模式,如下:

    -- Execute the flink job in streaming mode for current session context
    SET execution.type = streaming
    
    -- Execute the flink job in batch mode for current session context
    SET execution.type = batch

    3.4.1.1. Flink批量读

    如果在提交flink批处理作业时想要检查iceberg表中所有的记录,你可以执行下面的句子:

    -- Execute the flink job in streaming mode for current session context
    SET execution.type = batch ;
    SELECT * FROM sample;

    3.4.1.2. Flink流式读

    Iceberg支持处理flink流式作业中的增量数据,该数据从历史快照ID开始:

    -- Submit the flink job in streaming mode for current session.
    SET execution.type = streaming ;
    
    -- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
    SET table.dynamic-table-options.enabled=true;
    
    -- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
    SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
    
    -- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
    SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;

    这些是可以在flink SQL提示选项中为流作业设置的选项:

    • monitor-interval:连续监视新提交的数据文件的时间间隔(默认值:1s)
    • start-snapshot-id:流式作业开始的快照id

    3.4.2. Sql写入

    现在Iceberg支持在flink1.11中使用insert into和insert overwrite。

    • INSERT INTO

    flink 流作业将新数据追加到表中,使用 INSERT INTO:

    INSERT INTO hive_catalog.default.sample VALUES (1, 'a');
    INSERT INTO hive_catalog.default.sample SELECT id, data from other_kafka_table;
    • INSERT OVERWRITE

    要使用查询结果替换表中的数据,请在批作业中使用 INSERT OVERWRITE (flink 流作业不支持 INSERT OVERWRITE)。覆盖是 Iceberg 表的原子操作。

    具有由 SELECT 查询生成的行的分区将被替换,例如:

    INSERT OVERWRITE sample VALUES (1, 'a');

    Iceberg 还支持通过 select 值覆盖给定的分区:

    INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;

    对于分区的Iceberg表,当在PARTITION子句中为所有分区设置值时,它将插入到静态分区中;否则,如果在PARTITON子句中将部分分区列(所有分区列的前缀部分)设置为值,则将查询结果写入动态分区。对于未分区的Iceberg表,其数据将被INSERT OVERWRITE完全覆盖。

    3.5. DataStream读写数据(Java API)

    3.5.1.  DataStream读数据

    Iceberg现在支持使用Java API流式或者批量读取。

    3.5.1.1. 批量读

    这个例子从Iceberg表读取所有记录,然后在flink批处理作业中打印到stdout控制台。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
    DataStream<RowData> batch = FlinkSource.forRowData()
         .env(env)
         .tableLoader(loader)
         .streaming(false)
         .build();
    
    // Print all records to stdout.
    batch.print();
    
    // Submit and execute this batch read job.
    env.execute("Test Iceberg Batch Read");

    3.5.1.2. 流式读

    这个例子将会读取从快照id‘3821550127947089987’开始的增量记录,然后在flink流式作业中打印到stdout控制台中。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
    DataStream<RowData> stream = FlinkSource.forRowData()
         .env(env)
         .tableLoader(loader)
         .streaming(true)
         .startSnapshotId(3821550127947089987)
         .build();
    
    // Print all records to stdout.
    stream.print();
    
    // Submit and execute this streaming read job.
    env.execute("Test Iceberg streaming Read");

    还有其他选项可以通过Java Api设置,详情请看FlinkSource#Builder.

    3.5.2. DataStream写数据

    Iceberg 支持从不同的 DataStream 输入写入 Iceberg 表。

    • Appending data 追加数据

    我们支持在本地编写 DataStream < rowdata > 和 DataStream < Row> 到 sink iceberg 表。

    StreamExecutionEnvironment env = ...;
    DataStream<RowData> input = ... ;
    Configuration hadoopConf = new Configuration();
    TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
    FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .hadoopConf(hadoopConf)
        .build();
    env.execute("Test Iceberg DataStream");
    • Overwrite data 重写数据

    为了动态覆盖现有 Iceberg 表中的数据,我们可以在FlinkSink构建器中设置overwrite标志。

    StreamExecutionEnvironment env = ...;
    DataStream<RowData> input = ... ;
    Configuration hadoopConf = new Configuration();
    TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
    FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .overwrite(true)
        .hadoopConf(hadoopConf)
        .build();
    env.execute("Test Iceberg DataStream");

    3.6. 检查表

    现在Iceberg不支持在flink Sql中检查表,我们需要使用 iceberg’s Java API 去读取Iceberg来得到这些表信息。

    3.7. 重写文件操作

    Iceberg可以通过提交flink批作业去提供API重写小文件变为大文件。flink操作表现与spark的rewriteDataFiles.一样。

    import org.apache.iceberg.flink.actions.Actions;
    
    TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
    Table table = tableLoader.loadTable();
    RewriteDataFilesActionResult result = Actions.forTable(table)
            .rewriteDataFiles()
            .execute();

    更多的重写文件操作选项文档,请看RewriteDataFilesAction

    3.8. 将来提升

    当前flink iceberg整合工作还有下面的特性不支持:

    • 不支持创建带有隐藏分区的Iceberg表;
    • 不支持创建带有计算列的Iceberg表;
    • 不支持创建带有水印的Iceberg表;
    • 不支持添加列,删除列,重命名列,修改列;

    4. Iceberg实例

    4.1. 使用编程SQL方式读写Iceberg表

    4.1.1.  添加依赖

    <dependency>
                <groupId>org.apache.iceberg</groupId>
                <artifactId>iceberg-flink-runtime</artifactId>
                <version>0.10.0</version>
    </dependency>

    4.1.2.  部分代码实现

    // 使用table api 创建 hadoop catalog
     TableResult tableResult = tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (
    " +
                    "  'type'='iceberg',
    " +
                    "  'catalog-type'='hadoop',
    " +
                    "  'warehouse'='hdfs://nameservice1/tmp',
    " +
                    "  'property-version'='1'
    " +
                    ")");
     
            // 使用catalog
            tenv.useCatalog("hadoop_catalog");
            // 创建库
            tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");
            tenv.useDatabase("iceberg_hadoop_db");
     
         
            // 创建iceberg 结果表
            tenv.executeSql("drop table hadoop_catalog.iceberg_hadoop_db.iceberg_001");
            tenv.executeSql("CREATE TABLE  hadoop_catalog.iceberg_hadoop_db.iceberg_001 (
    " +
                    "    id BIGINT COMMENT 'unique id',
    " +
                    "    data STRING
    " +
                    ")");
     
            // 测试写入
            tenv.executeSql("insert into hadoop_catalog.iceberg_hadoop_db.iceberg_001 select 100,'abc'");

    4.1.3. 创建hive的外部表来实时查询iceberg表

    hive> add jar /tmp/iceberg-hive-runtime-0.10.0.jar;
     
    hive> CREATE EXTERNAL TABLE tmp.iceberg_001(id bigint,data string)
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
    LOCATION '/tmp/iceberg_hadoop_db/iceberg_001';
     
    hive> select * from tmp.iceberg_001;
    OK
    100        abc
    1001    abcd
    Time taken: 0.535 seconds, Fetched: 2 row(s)

    4.2. Flink结合Kafka实时写入Iceberg实践笔记

    4.2.1. 创建Hadoop Catalog的Iceberg 表

    // create hadoop catalog
            tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (
    " +
                    "  'type'='iceberg',
    " +
                    "  'catalog-type'='hadoop',
    " +
                    "  'warehouse'='hdfs://nameservice1/tmp',
    " +
                    "  'property-version'='1'
    " +
                    ")");
     
            // change catalog
            tenv.useCatalog("hadoop_catalog");
            tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");
            tenv.useDatabase("iceberg_hadoop_db");
            // create iceberg result table
            tenv.executeSql("drop table hadoop_catalog.iceberg_hadoop_db.iceberg_002"); 
            tenv.executeSql("CREATE TABLE  hadoop_catalog.iceberg_hadoop_db.iceberg_002 (
    " +
                    "    user_id STRING COMMENT 'user_id',
    " +
                    "    order_amount DOUBLE COMMENT 'order_amount',
    " +
                    "    log_ts STRING
    " +
                    ")");

    4.2.2. 使用Hive Catalog创建Kafka流表

      String HIVE_CATALOG = "myhive";
            String DEFAULT_DATABASE = "tmp";
            String HIVE_CONF_DIR = "/xx/resources";
            Catalog catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR);
            tenv.registerCatalog(HIVE_CATALOG, catalog);
            tenv.useCatalog("myhive");
            // create kafka stream table
            tenv.executeSql("DROP TABLE IF EXISTS ods_k_2_iceberg");
            tenv.executeSql(
                    "CREATE TABLE ods_k_2_iceberg (
    " +
                            " user_id STRING,
    " +
                            " order_amount DOUBLE,
    " +
                            " log_ts TIMESTAMP(3),
    " +
                            " WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
    " +
                            ") WITH (
    " +
                            "  'connector'='kafka',
    " +
                            "  'topic'='t_kafka_03',
    " +
                            "  'scan.startup.mode'='latest-offset',
    " +
                            "  'properties.bootstrap.servers'='xx:9092',
    " +
                            "  'properties.group.id' = 'testGroup_01',
    " +
                            "  'format'='json'
    " +
                            ")");

    4.2.3. 使用SQL连接kafka流表和iceberg 目标表

     System.out.println("---> 3. insert into iceberg  table from kafka stream table .... ");
            tenv.executeSql(
                    "INSERT INTO  hadoop_catalog.iceberg_hadoop_db.iceberg_002 " +
                            " SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.tmp.ods_k_2_iceberg");

    4.2.4. 数据验证

    bin/kafka-console-producer.sh --broker-list xx:9092 --topic t_kafka_03
    {"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
    {"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
    {"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
    {"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
    {"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
    {"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}
     
    hive> add jar /home/zmbigdata/iceberg-hive-runtime-0.10.0.jar;
    hive> CREATE EXTERNAL TABLE tmp.iceberg_002(user_id STRING,order_amount DOUBLE,log_ts STRING)
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
    LOCATION '/tmp/iceberg_hadoop_db/iceberg_002';
    hive> select * from tmp.iceberg_002  limit 5;
    a1111    11.0    2020-06-29
    a1111    11.0    2020-06-29
    a1111    11.0    2020-06-29
    a1111    11.0    2020-06-29
    a1111    13.0    2020-06-29
    Time taken: 0.108 seconds, Fetched: 5 row(s)

    参考资料:

    https://blog.csdn.net/u010834071/article/details/112507474  Flink结合Iceberg的一种实现方式笔记

    https://zhengqiang.blog.csdn.net/article/details/112850376  Flink结合Kafka实时写入Iceberg实践笔记

  • 相关阅读:
    正态分布与中心极限定理
    超几何分布与二项分布及其期望
    cf492E. Vanya and Field(扩展欧几里得)
    ZR#317.【18 提高 2】A(计算几何 二分)
    小米OJ刷题日志
    cf519D. A and B and Interesting Substrings(前缀和)
    cf519C. A and B and Team Training(找规律)
    BZOJ2118: 墨墨的等式(最短路 数论)
    Service生命周期图
    python2.7中使用mysql (windows XP)
  • 原文地址:https://www.cnblogs.com/swordfall/p/14548574.html
Copyright © 2011-2022 走看看