zoukankan      html  css  js  c++  java
  • Flink基础(二十五):FLINK-SQL语法 (一)DQL(一)查询语句(一)基本查询

    来源:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html

    0 简介

    SELECT 语句和 VALUES 语句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定。这个方法会以 Table 的形式返回 SELECT (或 VALUE)的查询结果。Table 可以被用于 随后的SQL 与 Table API 查询 、 转换为 DataSet 或 DataStream 或 输出到 TableSink 。SQL 与 Table API 的查询可以进行无缝融合、整体优化并翻译为单一的程序。

    为了可以在 SQL 查询中访问到表,你需要先 在 TableEnvironment 中注册表 。表可以通过 TableSource、 TableCREATE TABLE 语句、 DataStream 或 DataSet 注册。 用户也可以通过 向 TableEnvironment 中注册 catalog 的方式指定数据源的位置。

    为方便起见 Table.toString() 将会在其 TableEnvironment 中自动使用一个唯一的名字注册表并返回表名。 因此, Table 对象可以如下文所示样例,直接内联到 SQL 语句中。

    注意: 查询若包括了不支持的 SQL 特性,将会抛出 TableException。批处理和流处理所支持的 SQL 特性将会在下述章节中列出。

    1 指定查询

    以下示例显示如何在已注册和内联表上指定 SQL 查询。

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    
    //  从外部数据源读取 DataStream 
    val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
    
    // 使用 SQL 查询内联的(未注册的)表
    val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
    val result = tableEnv.sqlQuery(
      s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
    
    // 使用名称 "Orders" 注册一个 DataStream 
    tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
    // 在表上执行 SQL 查询并得到以新表返回的结果
    val result2 = tableEnv.sqlQuery(
      "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
    
    // 创建并注册一个 TableSink
    val schema = new Schema()
        .field("product", DataTypes.STRING())
        .field("amount", DataTypes.INT())
    
    tableEnv.connect(new FileSystem().path("/path/to/file"))
        .withFormat(...)
        .withSchema(schema)
        .createTemporaryTable("RubberOrders")
    
    // 在表上执行插入操作,并把结果发出到 TableSink
    tableEnv.executeSql(
      "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

    2 执行查询

    SELECT 语句或者 VALUES 语句可以通过 TableEnvironment.executeSql() 方法来执行,将选择的结果收集到本地。该方法返回 TableResult 对象用于包装查询的结果。和 SELECT 语句很像,一个 Table 对象可以通过 Table.execute() 方法执行从而将 Table 的内容收集到本地客户端。 TableResult.collect() 方法返回一个可以关闭的行迭代器。除非所有的数据都被收集到本地,否则一个查询作业永远不会结束。所以我们应该通过 CloseableIterator#close() 方法主动地关闭作业以防止资源泄露。 我们还可以通过 TableResult.print() 方法将查询结果打印到本地控制台。TableResult 中的结果数据只能被访问一次,因此一个 TableResult 实例中,collect() 方法和 print() 方法不能被同时使用。

    对于流模式,TableResult.collect() 方法或者 TableResult.print 方法保证端到端精确一次的数据交付。这就要求开启 checkpointing。默认情况下 checkpointing 是禁止的,我们可以通过 TableConfig 设置 checkpointing 相关属性(请参考 checkpointing 配置)来开启 checkpointing。 因此一条结果数据只有在其对应的 checkpointing 完成后才能在客户端被访问。

    注意: 对于流模式,当前仅支持追加模式的查询语句,并且应该开启 checkpoint。因为一条结果只有在其对应的 checkpoint 完成之后才能被客户端访问到。

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // enable checkpointing
    tableEnv.getConfig.getConfiguration.set(
      ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    tableEnv.getConfig.getConfiguration.set(
      ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))
    
    tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
    
    // execute SELECT statement
    val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
    val it = tableResult1.collect()
    try while (it.hasNext) {
      val row = it.next
      // handle row
    }
    finally it.close() // close the iterator to avoid resource leak
    
    // execute Table
    val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
    tableResult2.print()

    3 语法

    Flink 通过支持标准 ANSI SQL的 Apache Calcite 解析 SQL。

    以下 BNF-语法 描述了批处理和流处理查询中所支持的 SQL 特性的超集。其中 操作符 章节展示了所支持的特性的样例,并指明了哪些特性仅适用于批处理或流处理。

    query:
      values
      | {
          select
          | selectWithoutFrom
          | query UNION [ ALL ] query
          | query EXCEPT query
          | query INTERSECT query
        }
        [ ORDER BY orderItem [, orderItem ]* ]
        [ LIMIT { count | ALL } ]
        [ OFFSET start { ROW | ROWS } ]
        [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
    
    orderItem:
      expression [ ASC | DESC ]
    
    select:
      SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
      FROM tableExpression
      [ WHERE booleanExpression ]
      [ GROUP BY { groupItem [, groupItem ]* } ]
      [ HAVING booleanExpression ]
      [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
    
    selectWithoutFrom:
      SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
    
    projectItem:
      expression [ [ AS ] columnAlias ]
      | tableAlias . *
    
    tableExpression:
      tableReference [, tableReference ]*
      | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
    
    joinCondition:
      ON booleanExpression
      | USING '(' column [, column ]* ')'
    
    tableReference:
      tablePrimary
      [ matchRecognize ]
      [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
    
    tablePrimary:
      [ TABLE ] [ [ catalogName . ] schemaName . ] tableName [ dynamicTableOptions ]
      | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
      | UNNEST '(' expression ')'
    
    dynamicTableOptions:
      /*+ OPTIONS(key=val [, key=val]*) */
    
    key:
      stringLiteral
    
    val:
      stringLiteral
    
    values:
      VALUES expression [, expression ]*
    
    groupItem:
      expression
      | '(' ')'
      | '(' expression [, expression ]* ')'
      | CUBE '(' expression [, expression ]* ')'
      | ROLLUP '(' expression [, expression ]* ')'
      | GROUPING SETS '(' groupItem [, groupItem ]* ')'
    
    windowRef:
        windowName
      | windowSpec
    
    windowSpec:
        [ windowName ]
        '('
        [ ORDER BY orderItem [, orderItem ]* ]
        [ PARTITION BY expression [, expression ]* ]
        [
            RANGE numericOrIntervalExpression {PRECEDING}
          | ROWS numericExpression {PRECEDING}
        ]
        ')'
    
    matchRecognize:
          MATCH_RECOGNIZE '('
          [ PARTITION BY expression [, expression ]* ]
          [ ORDER BY orderItem [, orderItem ]* ]
          [ MEASURES measureColumn [, measureColumn ]* ]
          [ ONE ROW PER MATCH ]
          [ AFTER MATCH
                ( SKIP TO NEXT ROW
                | SKIP PAST LAST ROW
                | SKIP TO FIRST variable
                | SKIP TO LAST variable
                | SKIP TO variable )
          ]
          PATTERN '(' pattern ')'
          [ WITHIN intervalLiteral ]
          DEFINE variable AS condition [, variable AS condition ]*
          ')'
    
    measureColumn:
          expression AS alias
    
    pattern:
          patternTerm [ '|' patternTerm ]*
    
    patternTerm:
          patternFactor [ patternFactor ]*
    
    patternFactor:
          variable [ patternQuantifier ]
    
    patternQuantifier:
          '*'
      |   '*?'
      |   '+'
      |   '+?'
      |   '?'
      |   '??'
      |   '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
      |   '{' repeat '}'

    Flink SQL 对于标识符(表、属性、函数名)有类似于 Java 的词法约定:

    • 不管是否引用标识符,都保留标识符的大小写。
    • 且标识符需区分大小写。
    • 与 Java 不一样的地方在于,通过反引号,可以允许标识符带有非字母的字符(如:"SELECT a AS `my field` FROM t")。

    字符串文本常量需要被单引号包起来(如 SELECT 'Hello World' )。两个单引号表示转移(如 SELECT 'It''s me.')。字符串文本常量支持 Unicode 字符,如需明确使用 Unicode 编码,请使用以下语法:

    • 使用反斜杠()作为转义字符(默认):SELECT U&'263A'
    • 使用自定义的转义字符: SELECT U&'#263A' UESCAPE '#'

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/14053065.html

  • 相关阅读:
    静态方法中访问类的实例成员
    静态初始化块
    Java字段初始化的规律
    java中函数重载
    哈姆雷特观后感 一把辛酸泪
    枚举
    验证码
    四则运算
    JAVA输出中+号的作用以及如何使用
    dev控件ASPxComboBox设置ReadOnly="true"后
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14053065.html
Copyright © 2011-2022 走看看