zoukankan      html  css  js  c++  java
  • Flink之Table初探

    知识点

    Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么 几步:首先创建执行环境,然后定义 source、transform 和 sink。

    1、依赖:Table API 和 SQL 需要引入的依赖

     <!-- old planner flink table-->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_2.12</artifactId>
          <version>1.10.1</version>
        </dependency>
        <!--new planner-->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-blink_2.12</artifactId>
          <version>1.10.1</version>
        </dependency>

    2、代码案例

    package table
    
    import com.yangwj.api.SensorReading
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.Table
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api.scala._
    /**
     * @author yangwj
     * @date 2021/1/12 21:17
     * @version 1.0
     */
    object TableExample {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val inputFile:String = "G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"
        val input: DataStream[String] = env.readTextFile(inputFile)
    
        val dataStream = input.map(data => {
          val arr: Array[String] = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        })
    
        val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    
        //1、基于流创建表
        val table: Table = tableEnv.fromDataStream(dataStream)
    
        //2、调用table api进行转换
        val result: Table = table.select("id,temperature").filter("id == 'sensor_1'")
        result.toAppendStream[(String, Double)].print("result")
    
        //2、sql实现
        tableEnv.createTemporaryView("tabel",table)
        val sql = "select id, temperature from tabel where id = 'sensor_1'"
    
        val sqlResult: Table = tableEnv.sqlQuery(sql)
        sqlResult.toAppendStream[(String, Double)].print("sqlResult")
        env.execute("table api")
      }
    }
  • 相关阅读:
    Dart语言概览
    Flutter开发环境配置(MAC版)
    Jetpack系列:Paging组件帮你解决分页加载实现的痛苦
    Jetpack系列:应用内导航的正确使用方法
    Jetpack系列:LiveData入门级使用方法
    在Android平台使用SNPE应链接libc++库
    Android binder流程简图
    使用Visual Studio Code进行远程开发
    用clock()函数计时的坑
    OpenCV Mat格式存储YUV图像
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14269738.html
Copyright © 2011-2022 走看看