zoukankan      html  css  js  c++  java
  • flink-SQL

    Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table_2.11</artifactId>
      <version>1.5.0</version>
    </dependency>

    另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>1.5.0</version>
    </dependency>

    Table API和SQL程序的结构

    Flink的批处理和流处理的Table API和SQL程序遵循相同的模式;

    所以我们只需要使用一种来演示即可

    要想执行flink的SQL语句,首先需要获取SQL的执行环境:

    两种方式(batch和streaming):

    // ***************
    // STREAMING QUERY
    // ***************
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // create a TableEnvironment for streaming queries
    val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
    
    // ***********
    // BATCH QUERY
    // ***********
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    // create a TableEnvironment for batch queries
    val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

    通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责:

    - 在内部目录中注册一个表
    - 注册外部目录
    - 执行SQL查询
    - 注册用户定义的(标量,表格或聚合)函数
    - 转换DataStream或DataSet成Table
    - 持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment

    在内部目录中注册一个表

    TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格输出表格

    输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统

    输入表可以从各种来源注册:

    - 现有`Table`对象,通常是表API或SQL查询的结果。
    - `TableSource`,它访问外部数据,例如文件,数据库或消息传递系统。
    - `DataStream`或`DataSet`来自DataStream或DataSet程序。

    输出表可以使用注册TableSink

    注册一个表

    // get a TableEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    
    // register the Table projTable as table "projectedX"
    tableEnv.registerTable("projectedTable", projTable)
    
    // Table is the result of a simple projection query 
    val projTable: Table = tableEnv.scan("projectedTable ").select(...)

    注册一个tableSource

    TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],...)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,...)

    // get a TableEnvironment 
    val tableEnv = TableEnvironment.getTableEnvironment(env) 
    // create a TableSource
     val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
     // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource)

    注册一个tableSink

    注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],...)

    // get a TableEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    
    // create a TableSink
    val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
    
    // define the field names and types
    val fieldNames: Array[String] = Array("a", "b", "c")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
    
    // register the TableSink as table "CsvSinkTable"
    tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

    例子:

     //创建batch执行环境
        val env = ExecutionEnvironment.getExecutionEnvironment
        //创建table环境用于batch查询
        val tableEnvironment = TableEnvironment.getTableEnvironment(env)
        //加载外部数据
        val csvTableSource = CsvTableSource.builder()
          .path("data1.csv")//文件路径
          .field("id" , Types.INT)//第一列数据
          .field("name" , Types.STRING)//第二列数据
          .field("age" , Types.INT)//第三列数据
          .fieldDelimiter(",")//列分隔符,默认是","
          .lineDelimiter("
    ")//换行符
          .ignoreFirstLine()//忽略第一行
          .ignoreParseErrors()//忽略解析错误
          .build()
        //将外部数据构建成表
        tableEnvironment.registerTableSource("tableA" , csvTableSource)
        //TODO 1:使用table方式查询数据
        val table = tableEnvironment.scan("tableA").select("id , name , age").filter("name == 'lisi'")
        //将数据写出去
        table.writeToSink(new CsvTableSink("bbb" , "," , 1 , FileSystem.WriteMode.OVERWRITE))
        //TODO 2:使用sql方式
        //    val sqlResult = tableEnvironment.sqlQuery("select id,name,age from tableA where id > 0 order by id limit 2")
    ////    //将数据写出去
    //    sqlResult.writeToSink(new CsvTableSink("aaaaaa.csv", ",", 1, FileSystem.WriteMode.OVERWRITE))
        env.execute()
  • 相关阅读:
    ORACLE SQL性能优化系列 (十一)
    ORACLE SQL性能优化系列 (七)
    ORACLE SQL性能优化系列 (十三)
    Oracle绑定变量
    ORACLE SQL性能优化系列 (九)
    C#中&与&&的区别
    简单代码生成器原理剖析
    C#线程系列讲座(1):BeginInvoke和EndInvoke方法
    ClearCanvas DICOM 开发系列 一
    C# winform 获取当前路径
  • 原文地址:https://www.cnblogs.com/niutao/p/10548674.html
Copyright © 2011-2022 走看看