zoukankan      html  css  js  c++  java
  • Flink实战(110):FLINK-SQL应用场景(11)connector(十九)Flink 与 hive 结合使用(七) Flink Hive Connector 使用

    来源:https://www.yuque.com/docs/share/14a7a0e8-37d1-4142-8962-48dcf3761f7e?#

    Flink 1.12 版本

    1. Hive 建表

    //1、创建 Hive 数据库
    create database zhisheng;
    
    //2、查看创建的数据库
    show databases;
    
    //3、使用创建的数据库
    use zhisheng;
    
    //4、在该库下创建 Hive 表
    CREATE TABLE IF NOT EXISTS flink ( 
      appid int, 
      message String
    ) ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '	'
    LINES TERMINATED BY '
    '
    STORED AS TEXTFILE;
    
    //5、往该表插入一条数据
    insert into flink values(11111, '233sadadadwqqdq');

    2.Flink 读取 Hive 已经存在的表数据

    //1、创建 Hive CATALOG,Flink 通过 catalog 不仅可以将自己的表写入 Hive 的 metastore,也能读写 Hive 的表
    CREATE CATALOG flinkHiveCatalog WITH (
        'type' = 'hive',
        'default-database' = 'zhisheng',
        'hive-conf-dir' = '/app/apache-hive-2.1.1-bin/conf'
    );
    
    //2、使用该 Catalog
    USE CATALOG flinkHiveCatalog;
    
    //3、因为刚才已经写入了一条数据到 Hive 表(flink) 
    select * from flink;

     

    3.Flink 往 Hive 中已经存在的表写数据

    //1、创建 Source 表
    CREATE TABLE yarn_log_datagen_test_hive_sink (
     appid INT,
     message STRING
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='10',
     'fields.appid.kind'='random',
     'fields.appid.min'='1',
     'fields.appid.max'='1000',
     'fields.message.length'='100'
    );
    
    //2、将数据写入到 Hive 表
    insert into flink select * from yarn_log_datagen_test_hive_sink;

     

    //再次查询 Hive 表里面的数据
    select * from flink;

     直接在 Hive 利用命令查询:

    4 .完整 Example

    CREATE CATALOG flinkHiveCatalog WITH (
    'type' = 'hive',
    'default-database' = 'zhisheng',
    'hive-conf-dir' = '/app/apache-hive-2.1.1-bin/conf'
    );
    
    USE CATALOG flinkHiveCatalog;
    
    SET table.sql-dialect=hive;   -- 创建 Hive 表要指定 sql-dialect 为 Hive,否则创建的时候识别不了下面的 DDL 语句
    CREATE TABLE yarn_logs (
      appid INT,
      message STRING
    ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
      'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
      'sink.partition-commit.trigger'='partition-time',
      'sink.partition-commit.delay'='1 h',
      'sink.partition-commit.policy.kind'='metastore,success-file',
      'sink.parallelism'='2' -- 该参数内部才支持设置并行度
    );
    
    SET table.sql-dialect=default;  -- 创建 Flink 表又要换回默认的 sql-dialect,Flink 支持在同一个 SQL 里面设置多个 sql-dialect
    CREATE TABLE yarn_log_datagen_test (
      appid INT,
      message STRING,
      log_ts TIMESTAMP(3),
      WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.appid.kind' = 'random',
    'fields.appid.min' = '1',
    'fields.appid.max' = '1000',
    'fields.message.length' = '100'
    );
    
    
    -- streaming sql, insert into hive table
    INSERT INTO yarn_logs 
    SELECT appid, message, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
    FROM yarn_log_datagen_test;
    
    -- batch sql, select with partition pruning
    SELECT * FROM yarn_logs WHERE dt='2020-12-16' and hr='12';

    查看 table 的存储路径

    show create table yarn_logs;

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/14180795.html

  • 相关阅读:
    (教程)怎么避免拼多多比价订单行为
    Ubuntu下搭建apache2+GGI环境
    搭建k8s
    我的2021年总结
    工作三年的一些感悟
    xshell6+xftp6免安装破解版
    centos7启动docker容器时提示Error response from daemon: Unknown runtime specified dockerrunc
    解决一个C#中定时任务被阻塞问题
    工程中实际问题解决两例——基于C#
    解决一次gitlab因异常关机导致启动失败
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14180795.html
Copyright © 2011-2022 走看看