相关文章链接
Flink之TableAPI和SQL(2):表和外部系统的连接方式
Flink之TableAPI和SQL(3):通过TableAPI和SQL表的一些操作(包括查询,过滤,聚集等)
Flink之TableAPI和SQL(4):表的Sink实现
具体实现如下代码所示:
// 1、创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) // 2、从流中获取表 val sensorStream: DataStream[SensorReading] = env .readTextFile("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt") .map(new MyMapToSensorReading) val sensorTable: Table = tableEnv.fromDataStream(sensorStream) // 将流转换成表时可以指定获取字段,或给字段修改别名 val sensorTable_tmp: Table = tableEnv.fromDataStream(sensorStream, 'id, 'temperature as 'temp) // 使用流创建临时视图时,可以指定获取字段,或给字段修改别名 tableEnv.createTemporaryView("sensorTable_tmp", sensorStream, 'id, 'temperature as 'temp) // 3、使用TableAPI对表进行操作 // 查询过滤操作 val resultApiTable_1: Table = sensorTable .select('id, 'temperature) .filter('id === "sensor_1") // 分组聚合操作 val resultApiTable_2: Table = sensorTable .groupBy('id) .select('id, 'id.count as 'cnt) // 4、使用sql对表进行操作(需先使用临时视图将表注册到catalog中) tableEnv.createTemporaryView("sensorTable", sensorTable) // 查询过滤操作 val resultSqlTable_1: Table = tableEnv.sqlQuery( """ |select id, temperature |from sensorTable |where id = 'sensor_1' |""".stripMargin) // 分组聚合操作 val resultSqlTable_2: Table = tableEnv.sqlQuery( """ |select id, count(id) as cnt |from sensorTable |group by id |""".stripMargin) // 5、打印表 // resultSqlTable_1.printSchema() // resultSqlTable_1.toAppendStream[Row].print() resultSqlTable_2.printSchema() resultSqlTable_2.toRetractStream[Row].print() // 启动执行器,执行任务 env.execute("OperationTableDemo")