zoukankan      html  css  js  c++  java
  • Flink基础(二十一):Table API 和 Flink SQL(六)Flink和Hive集成

    Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。

    Flink 与 Hive 的集成包含两个层面。

    一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

    二是利用 Flink 来读写 Hive 的表。

    HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

    Maven依赖

    主要包含三部分的依赖:flink和hive的连接器,hive的依赖和hadoop的依赖。

    <!-- Flink Dependency -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_2.11</artifactId>
      <version>1.11.0</version>
      <scope>provided</scope>
    </dependency>
    
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>1.11.0</version>
      <scope>provided</scope>
    </dependency>
    
    <!-- Hive Dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <!--            <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    View Code

    示例程序

    先在hive中新建数据库和表

    create database mydb;
    use mydb;
    create table if not exists t_user(id string, name string);
    insert into table t_user values ('1','huangbo'), ('2','xuzheng'),('3','wangbaoqiang');

    然后编写程序,将数据流写入到hive中

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.catalog.hive.HiveCatalog
    
    object TestHiveStreaming {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = StreamTableEnvironment.create(env)
    
        val stream = env
          .fromElements(
            ("10", "haha"),
            ("11", "hehe")
          )
    
        val name            = "myhive"
        val defaultDatabase = "mydb"
        val hiveConfDir     = "/Users/yuanzuo/Downloads/apache-hive-3.1.2-bin/conf" // a local path
        val version         = "3.1.2"
    
        val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
        tableEnv.registerCatalog("myhive", hive)
    
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive")
        tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
        tableEnv.useDatabase("mydb")
    
        tableEnv.createTemporaryView("users", stream, 'id, 'name)
    
        tableEnv.executeSql("insert into t_user select id, name from users")
        tableEnv.executeSql("select * from t_user")
      }
    }

    一个复杂一点的程序

    import java.sql.Timestamp
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.catalog.hive.HiveCatalog
    
    object TestHiveStreaming {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = StreamTableEnvironment.create(env)
    
        val stream = env.fromElements(
          ("1", 1000, new Timestamp(1000L)),
          ("2", 2000, new Timestamp(2000L)),
          ("3", 3000, new Timestamp(3000L))
        )
    
        val name            = "myhive"
        val defaultDatabase = "mydb"
        val hiveConfDir     = "/Users/yuanzuo/Downloads/apache-hive-3.1.2-bin/conf" // a local path
        val version         = "3.1.2"
    
        val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
        tableEnv.registerCatalog("myhive", hive)
    
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive")
        tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
        tableEnv.useDatabase("mydb")
    
        tableEnv.createTemporaryView("users", stream, 'userId, 'amount, 'ts)
    
        val hiveSql = "CREATE external TABLE fs_table (
    " +
                         "  user_id STRING,
    " +
                         "  order_amount DOUBLE" +
                         ") partitioned by (dt string,h string,m string) " +
                         "stored as ORC " +
                         "TBLPROPERTIES (
    " +
                         "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
    " +
                         "  'sink.partition-commit.delay'='0s',
    " +
                         "  'sink.partition-commit.trigger'='partition-time',
    " +
                         "  'sink.partition-commit.policy.kind'='metastore'" +
                         ")"
    
        tableEnv.executeSql(hiveSql)
    
        val insertSql = "insert into fs_table SELECT userId, amount, " +
          " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users"
        tableEnv.executeSql(insertSql)
      }
    }

    彻底重置hadoop和hive的方法

    stop-all.sh
    hadoop namenode -format
    # 在mysql中删除hive的元数据库
    start-all.sh
    hadoop fs -mkdir /tmp
    hadoop fs -mkdir -p /user/hive/warehouse
    hadoop fs -chmod g+w /tmp
    hadoop fs -chmod g+w /user/hive/warehouse
    schematool -dbType mysql -initSchema
    hive --service metastore
    hive

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13764972.html

  • 相关阅读:
    input 只能输入数字
    “学生宿舍管理系统”主要内容及特点
    web_03Java ee实现定时跳转,使用C3P0,DBUtils类重构数据库操作
    DBUtils工具类的使用
    C3P0连接池
    java ee 中 Jsp 页面的定时的跳转(数字倒数)
    JSP中实现网页访问统计的方法【转】
    Java web验证码
    web_02Java ee实现验证码,网站访问次数功能
    web_01Java ee实现登陆注册功能
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13764972.html
Copyright © 2011-2022 走看看