Flink Basics (21): Table API and Flink SQL (6): Integrating Flink with Hive

    Apache Hive has become a cornerstone of the data warehouse ecosystem. It is not only a SQL engine for big data analytics and ETL; it is also a data management platform for discovering, defining, and evolving data.

    Flink's integration with Hive covers two aspects.

    First, Flink can use Hive's Metastore as a persistent catalog: with HiveCatalog, Flink metadata from different sessions can be stored in the Hive Metastore. For example, a user can store Kafka or Elasticsearch table definitions in the Hive Metastore through HiveCatalog and reuse them in later SQL queries.
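
    As a quick illustration of this first aspect: once a HiveCatalog is the current catalog (as in the example program below), an ordinary connector table DDL is persisted in the Hive Metastore and survives across sessions. This is only a minimal sketch; the topic name, broker address, and columns are hypothetical:

    // Illustrative sketch: with a HiveCatalog as the current catalog (and the
    // default SQL dialect), this Kafka table definition is stored in the Hive
    // Metastore and can be reused from later sessions.
    // Topic name, broker address, and columns are hypothetical.
    tableEnv.executeSql(
      """CREATE TABLE kafka_events (
        |  user_id STRING,
        |  amount DOUBLE
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'events',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'json'
        |)""".stripMargin)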

    Second, Flink can be used to read and write Hive tables.

    HiveCatalog is designed for good compatibility with Hive, so users can access an existing Hive data warehouse "out of the box". You do not need to modify the existing Hive Metastore, nor change the data location or partitioning of any table.

    Maven Dependencies

    Three groups of dependencies are required: the Flink-Hive connector, the Hive dependency, and the Hadoop dependencies.

    <!-- Flink Dependency -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_2.11</artifactId>
      <version>1.11.0</version>
      <scope>provided</scope>
    </dependency>
    
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>1.11.0</version>
      <scope>provided</scope>
    </dependency>
    
    <!-- Hive Dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
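
    The POM above references ${hive.version} and ${hadoop.version} without defining them; a minimal properties block might look like this (the version numbers are illustrative and should match your cluster):

    <!-- Illustrative: define the version properties referenced above -->
    <properties>
        <hive.version>3.1.2</hive.version>
        <hadoop.version>2.7.7</hadoop.version>
    </properties>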

    Example Program

    First, create a database and a table in Hive:

    create database mydb;
    use mydb;
    create table if not exists t_user(id string, name string);
    insert into table t_user values ('1','huangbo'), ('2','xuzheng'),('3','wangbaoqiang');

    Then write a program that writes a data stream into Hive:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.catalog.hive.HiveCatalog
    
    object TestHiveStreaming {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = StreamTableEnvironment.create(env)
    
        val stream = env
          .fromElements(
            ("10", "haha"),
            ("11", "hehe")
          )
    
        val name            = "myhive"
        val defaultDatabase = "mydb"
        val hiveConfDir     = "/Users/yuanzuo/Downloads/apache-hive-3.1.2-bin/conf" // a local path
        val version         = "3.1.2"
    
        val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
        tableEnv.registerCatalog("myhive", hive)
    
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive")
        tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
        tableEnv.useDatabase("mydb")
    
        tableEnv.createTemporaryView("users", stream, 'id, 'name)
    
        tableEnv.executeSql("insert into t_user select id, name from users")
        tableEnv.executeSql("select * from t_user")
      }
    }
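
    After the job finishes, the two new rows should also be visible from the Hive CLI (the expected result is sketched in the comment):

    select * from t_user;
    -- expected: the original three rows plus ('10','haha') and ('11','hehe')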

    A slightly more complex program

    import java.sql.Timestamp
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.catalog.hive.HiveCatalog
    
    object TestHiveStreaming {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = StreamTableEnvironment.create(env)
    
        val stream = env.fromElements(
          ("1", 1000, new Timestamp(1000L)),
          ("2", 2000, new Timestamp(2000L)),
          ("3", 3000, new Timestamp(3000L))
        )
    
        val name            = "myhive"
        val defaultDatabase = "mydb"
        val hiveConfDir     = "/Users/yuanzuo/Downloads/apache-hive-3.1.2-bin/conf" // a local path
        val version         = "3.1.2"
    
        val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
        tableEnv.registerCatalog("myhive", hive)
    
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive")
        tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
        tableEnv.useDatabase("mydb")
    
        tableEnv.createTemporaryView("users", stream, 'userId, 'amount, 'ts)
    
        val hiveSql = "CREATE external TABLE fs_table (
    " +
                         "  user_id STRING,
    " +
                         "  order_amount DOUBLE" +
                         ") partitioned by (dt string,h string,m string) " +
                         "stored as ORC " +
                         "TBLPROPERTIES (
    " +
                         "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
    " +
                         "  'sink.partition-commit.delay'='0s',
    " +
                         "  'sink.partition-commit.trigger'='partition-time',
    " +
                         "  'sink.partition-commit.policy.kind'='metastore'" +
                         ")"
    
        tableEnv.executeSql(hiveSql)
    
        val insertSql = "insert into fs_table SELECT userId, amount, " +
          " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users"
        tableEnv.executeSql(insertSql)
      }
    }
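
    Once partitions are committed to the metastore, the same table can be read back, either in batch mode or as a stream. Below is a minimal sketch of Flink 1.11's Hive streaming source; the hint values are illustrative:

    // Illustrative: continuously monitor fs_table for newly committed partitions.
    // SQL hints require table.dynamic-table-options.enabled to be true, and the
    // query uses the default dialect rather than the Hive dialect.
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    tableEnv.getConfig.getConfiguration
      .setBoolean("table.dynamic-table-options.enabled", true)

    val streamingRead = tableEnv.sqlQuery(
      """SELECT * FROM fs_table
        |/*+ OPTIONS('streaming-source.enable'='true',
        |            'streaming-source.monitor-interval'='1 min') */""".stripMargin)
    // e.g. streamingRead.execute().print()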

    Completely resetting Hadoop and Hive

    stop-all.sh
    hadoop namenode -format
    # drop Hive's metastore database in MySQL
    start-all.sh
    hadoop fs -mkdir /tmp
    hadoop fs -mkdir -p /user/hive/warehouse
    hadoop fs -chmod g+w /tmp
    hadoop fs -chmod g+w /user/hive/warehouse
    schematool -dbType mysql -initSchema
    hive --service metastore
    hive
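
    A couple of optional sanity checks after the reset (illustrative):

    schematool -dbType mysql -info          # confirm the metastore schema was initialized
    hadoop fs -ls /user/hive/warehouse      # the warehouse directory should exist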

    This article is from cnblogs (博客园), author: 秋华. When reposting, please cite the original link: https://www.cnblogs.com/qiu-hua/p/13764972.html
