zoukankan      html  css  js  c++  java
  • Flink之Table初探

    知识点

    Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么 几步:首先创建执行环境,然后定义 source、transform 和 sink。

    1、依赖:Table API 和 SQL 需要引入的依赖

     <!-- old planner flink table-->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_2.12</artifactId>
          <version>1.10.1</version>
        </dependency>
        <!--new planner-->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-blink_2.12</artifactId>
          <version>1.10.1</version>
        </dependency>

    2、代码案例

    package table
    
    import com.yangwj.api.SensorReading
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.Table
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api.scala._
    /**
     * @author yangwj
     * @date 2021/1/12 21:17
     * @version 1.0
     */
    object TableExample {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val inputFile:String = "G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"
        val input: DataStream[String] = env.readTextFile(inputFile)
    
        val dataStream = input.map(data => {
          val arr: Array[String] = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        })
    
        val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    
        //1、基于流创建表
        val table: Table = tableEnv.fromDataStream(dataStream)
    
        //2、调用table api进行转换
        val result: Table = table.select("id,temperature").filter("id == 'sensor_1'")
        result.toAppendStream[(String, Double)].print("result")
    
        //2、sql实现
        tableEnv.createTemporaryView("tabel",table)
        val sql = "select id, temperature from tabel where id = 'sensor_1'"
    
        val sqlResult: Table = tableEnv.sqlQuery(sql)
        sqlResult.toAppendStream[(String, Double)].print("sqlResult")
        env.execute("table api")
      }
    }
  • 相关阅读:
    二分、冒泡、选择、插入排序
    15行python代码,帮你理解令牌桶算法
    mybatis 的排序方式用参数传入 但是无法正常排序
    js事件篇
    ajax详解
    kafka概要设计
    HttpClient简述
    双十一问题:在洪峰数据来临的瞬间,redis出现连接超时异常
    双十一问题:kafka消费能力低下原因思考
    Timer类注意事项
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14269738.html
Copyright © 2011-2022 走看看