zoukankan      html  css  js  c++  java
  • Flink之Table初探

    知识点

    Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么 几步:首先创建执行环境,然后定义 source、transform 和 sink。

    1、依赖:Table API 和 SQL 需要引入的依赖

     <!-- old planner flink table-->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_2.12</artifactId>
          <version>1.10.1</version>
        </dependency>
        <!--new planner-->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-blink_2.12</artifactId>
          <version>1.10.1</version>
        </dependency>

    2、代码案例

    package table
    
    import com.yangwj.api.SensorReading
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.Table
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api.scala._
    /**
     * @author yangwj
     * @date 2021/1/12 21:17
     * @version 1.0
     */
    object TableExample {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val inputFile:String = "G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"
        val input: DataStream[String] = env.readTextFile(inputFile)
    
        val dataStream = input.map(data => {
          val arr: Array[String] = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        })
    
        val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    
        //1、基于流创建表
        val table: Table = tableEnv.fromDataStream(dataStream)
    
        //2、调用table api进行转换
        val result: Table = table.select("id,temperature").filter("id == 'sensor_1'")
        result.toAppendStream[(String, Double)].print("result")
    
        //2、sql实现
        tableEnv.createTemporaryView("tabel",table)
        val sql = "select id, temperature from tabel where id = 'sensor_1'"
    
        val sqlResult: Table = tableEnv.sqlQuery(sql)
        sqlResult.toAppendStream[(String, Double)].print("sqlResult")
        env.execute("table api")
      }
    }
  • 相关阅读:
    斗鱼扩展--localStorage备份与导出(九)
    斗鱼扩展--管理移除房间(八)
    斗鱼扩展--让你看到更多内容(七)
    Ubuntu18.04 安装水星1300M无线网卡
    Course1_Week1_ProgrammingHomeWork
    找出3个数中不为-1的最小数
    马拉车算法
    偏差-方差分解
    决策树如何防止过拟合
    可视化数据集两个类别变量的关系
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14269738.html
Copyright © 2011-2022 走看看