  • Flink Table environment setup, reading external data (File, Kafka), and query transformations

    Key points

    Flink Table workflow:
      // 1. Create the table execution environment
      val tableEnv = ...
      // 2. Register a table for reading input data
      tableEnv.connect(...).createTemporaryTable("inputTable")
      // 3.1 Query with the Table API to get a result table
      val result = tableEnv.from("inputTable").select(...)
      // 3.2 Or run a SQL query to get a result table
      val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")
      // 4. Register a table for emitting the computed results
      tableEnv.connect(...).createTemporaryTable("outputTable")
      // 5. Write the result table into the output table (a concrete sketch follows the code example in section 2)
      result.insertInto("outputTable")

    1. Maven dependencies (CSV format and table planners)

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-csv</artifactId>
          <version>1.10.1</version>
        </dependency>

        <!-- old planner flink table -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_2.12</artifactId>
          <version>1.10.1</version>
        </dependency>

        <!-- new planner (blink) -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-blink_2.12</artifactId>
          <version>1.10.1</version>
        </dependency>
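
    The code in section 2 also assumes the Table API Scala bridge module is on the classpath, since it uses StreamTableEnvironment and the toAppendStream conversion from the Scala bridge (an assumption based on the imports used; match the Scala suffix to your build):

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
          <version>1.10.1</version>
        </dependency>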

    2. Code example

    package table
    
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table, TableEnvironment}
    import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
    import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, OldCsv, Schema}
    import org.apache.flink.table.api.scala._
    import org.apache.flink.streaming.api.scala._
    /**
     * @author yangwj
     * @date 2021/1/12 21:53
     * @version 1.0
     */
    object TableApiTest {
      def main(args: Array[String]): Unit = {
        // 1. Create the table execution environment (it is built on top of the streaming environment)
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
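        // Note: in Flink 1.10, create(env) without explicit settings uses the
        // old planner in streaming mode; the commented-out variants below show
        // how to select a planner and mode explicitly.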
    /**
        // 1.1 Old-planner streaming environment
        val settings = EnvironmentSettings.newInstance()
          .useOldPlanner()
          .inStreamingMode()
          .build()
        val oldStreamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

        // 1.2 Old-planner batch environment
        val batchEnv = ExecutionEnvironment.getExecutionEnvironment
        val oldBatchTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(batchEnv)

        // 1.3 Blink-planner streaming environment
        val blinkStreamSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inStreamingMode()
          .build()
        val blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)

        // 1.4 Blink-planner batch environment
        val blinkBatchSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inBatchMode()
          .build()
        val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)
    **/
        // 2. Connect to external systems, read data, and register tables
        // 2.1 Read from a file (backslashes in the Windows path must be escaped)
        val inputFile: String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
        tableEnv.connect(new FileSystem().path(inputFile))
          // new OldCsv() would be the legacy, non-standard format descriptor;
          // Csv() from flink-csv is the RFC 4180 compliant one
          .withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("temperature", DataTypes.DOUBLE())
          )
          .createTemporaryTable("inputTable")
    
        val inputTable: Table = tableEnv.from("inputTable")
        inputTable.toAppendStream[(String,Long,Double)].print("result")
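
        // For reference, sensor.txt is assumed to contain comma-separated records
        // matching the schema above, e.g. (hypothetical sample lines):
        //   sensor_1,1547718199,35.8
        //   sensor_6,1547718201,15.4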
    
    
        // 2.2 Read from Kafka
        tableEnv.connect(new Kafka()
            .version("0.11")
            .topic("Demo")
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
          )
          .withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("temperature", DataTypes.DOUBLE())
          )
          .createTemporaryTable("kafkaTable")
    
        val kafkaTable: Table = tableEnv.from("kafkaTable")
        kafkaTable.toAppendStream[(String,Long,Double)].print("kafkaResult")
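
        // Note: the Kafka table above also needs the matching connector jar on the
        // classpath, e.g. flink-connector-kafka-0.11_2.12 (1.10.1) for version "0.11".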
    
    
        // 3. Query transformations
        // 3.1 Table API
        val sensorTable: Table = tableEnv.from("inputTable")
        val apiResult: Table = sensorTable
          .select('id, 'temperature)
          .filter('id === "sensor_1")

        // 3.2 SQL
        val sqlResult: Table = tableEnv.sqlQuery(
          """
            |select id, temperature
            |from inputTable
            |where id = 'sensor_1'
          """.stripMargin)
    
        apiResult.toAppendStream[(String, Double)].print("apiResult")
        sqlResult.toAppendStream[(String, Double)].print("sqlResult")
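
        // An aggregating query updates previously emitted rows, so it cannot use
        // toAppendStream; a sketch of a count per sensor id converted with
        // toRetractStream (the Boolean field flags insert (true) vs. retract (false)):
        val aggResult: Table = sensorTable
          .groupBy('id)
          .select('id, 'id.count as 'cnt)
        aggResult.toRetractStream[(String, Long)].print("aggResult")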
        env.execute("table api test")
      }
    
    }
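
    The example above only prints results. For completeness, steps 4 and 5 of the workflow (registering an output table and writing into it) could look like the sketch below; the output path is hypothetical, and a CSV file sink is append-only, which suits the filtered apiResult:

        val outputFile: String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\out.txt" // hypothetical path
        tableEnv.connect(new FileSystem().path(outputFile))
          .withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("temperature", DataTypes.DOUBLE())
          )
          .createTemporaryTable("outputTable")
        apiResult.insertInto("outputTable")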
  • Original post: https://www.cnblogs.com/ywjfx/p/14269770.html