zoukankan      html  css  js  c++  java
  • FlinkSQL写入hive

    配置1:vim flink-conf.yaml
    流式写入hive需要配置检查点
    # state.backend: filesystem
    state.backend: filesystem
    # 取消的时候保存检查点
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    # 60s 一次检查点
    execution.checkpointing.interval: 60s
    # 检查点语意
    execution.checkpointing.mode: EXACTLY_ONCE
    
    
    # Directory for checkpoints filesystem, when using any of the default bundled
    # state backends.
    #
    # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
    state.checkpoints.dir: file:///tmp/flink12-checkpoints
    # Default target directory for savepoints, optional.
    #
    # state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
    state.savepoints.dir: file:///tmp/flink12-savepoints
    
    配置2,使用FlinkSQL-client需要配置
    vim  sql-client-defaults.yaml
    
    catalogs:  #[] # empty list
    # A typical catalog definition looks like:
      - name: uathive
        type: hive
        hive-conf-dir: /etc/hive/conf
        default-database: temp

    写sql作业

    set execution.type=streaming;
    -- Switch to the Hive dialect so the CREATE TABLE below is parsed as Hive DDL.
    SET table.sql-dialect=hive;
    -- Hive table partitioned by day (dt), hour (hr) and minute (mi).
    DROP TABLE IF EXISTS ods_hive_t_area;
    CREATE TABLE ods_hive_t_area (
      `id` int COMMENT '代号',
      `name` string COMMENT '姓名',
      `area_id` int COMMENT '区域',
      `money` int COMMENT '销售额'
    ) PARTITIONED BY (dt string, hr string, mi string)
      STORED AS parquet
      TBLPROPERTIES (
        -- How to rebuild a timestamp from the partition values.
        'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00',
        -- Commit partitions based on processing time, 1 minute after creation.
        'sink.partition-commit.trigger' = 'process-time',
        'sink.partition-commit.delay' = '1 min',
        -- FIX: include 'metastore' so committed partitions are registered in the
        -- Hive metastore. With 'success-file' alone the files land on HDFS but
        -- SELECT returns nothing until a manual `msck repair table` — exactly
        -- the problem described later in this post.
        'sink.partition-commit.policy.kind' = 'metastore,success-file'
      );
    -- Kafka source carrying Debezium-style change records (before/after/op).
    DROP TABLE IF EXISTS ods_source_2hive_kafka_t_area;
    CREATE TABLE ods_source_2hive_kafka_t_area (
      `before` row(id int, name string, area_id int, money int),
      `after`  row(id int, name string, area_id int, money int),
      op string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'ods_t_area1',
      'properties.bootstrap.servers' = '10.16.74.34:9092',
      'properties.group.id' = 'ods_t_area1_group2hive',
      -- Valid values: latest-offset | earliest-offset
      'scan.startup.mode' = 'earliest-offset',
      -- 'format' describes the message value (equivalent to 'value.format' here);
      -- with JSON, a field whose type does not match is decoded as NULL.
      'format' = 'json',
      -- true: malformed JSON records are tolerated (fields become NULL);
      -- false: a parse error aborts the read. Applies to the JSON format only.
      'json.ignore-parse-errors' = 'true'
    );
    -- Stream the change records from Kafka into the Hive table.
    -- For delete ops (op = 'd') `after` is NULL, so the id is taken from
    -- `before` and the remaining payload columns are written as NULL.
    INSERT INTO ods_hive_t_area
    SELECT
      CASE WHEN op = 'd' AND `after` IS NULL THEN `before`.id ELSE `after`.id END,
      CASE WHEN op = 'd' AND `after` IS NULL THEN CAST(NULL AS string) ELSE `after`.name END,
      CASE WHEN op = 'd' AND `after` IS NULL THEN CAST(NULL AS int) ELSE `after`.area_id END,
      CASE WHEN op = 'd' AND `after` IS NULL THEN CAST(NULL AS int) ELSE `after`.money END,
      -- FIX: the sink is PARTITIONED BY (dt, hr, mi) — three partition columns —
      -- but the original SELECT supplied only the minute value, a column-count
      -- mismatch with the DDL above. Supply all three partition values.
      DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'),
      DATE_FORMAT(LOCALTIMESTAMP, 'HH'),
      DATE_FORMAT(LOCALTIMESTAMP, 'mm')
    FROM ods_source_2hive_kafka_t_area;

    遇到的问题:

    [hive@m764 lib]$ hadoop fs -ls -R /user/hive/warehouse/temp.db/ods_hive_t_area/
    drwxrwxr-x   - hive hive          0 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26
    -rw-r--r--   1 hive hive          0 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26/_SUCCESS
    -rw-r--r--   1 hive hive       1156 2021-05-06 17:26 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26/part-f35d61fa-6a8d-4a51-a59f-83c597c6c42c-0-0
    drwxrwxr-x   - hive hive          0 2021-05-06 17:29 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27
    -rw-r--r--   1 hive hive          0 2021-05-06 17:29 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27/_SUCCESS
    -rw-r--r--   1 hive hive        541 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27/part-f35d61fa-6a8d-4a51-a59f-83c597c6c42c-0-1

    显示成功写入hive,有_SUCCESS文件,但是select 不到数据

    解决:刷新一下元数据

    msck repair table  ods_hive_t_area;

    然后可以查到hive中的数据了

  • 相关阅读:
    产品小细节中的大体验
    产品经理的四点思考:不该简单满足用户需求
    产品经理的十大顶级错误
    SQL Server数据库大型应用解决方案总结
    java中public static void main(String[] args)中String[] args代表什么意思?
    异常处理1
    java中的String
    华为2013年西安java机试题目:如何过滤掉数组中的非法字符。
    2用java代码实现冒泡排序算法(转载)
    1用java实现冒泡排序算法以及解决的几个小问题。
  • 原文地址:https://www.cnblogs.com/yoyowin/p/14736186.html
Copyright © 2011-2022 走看看