将DataStream或DataSet转换为表格
在上面的例子讲解中,直接使用的是:registerTableSource注册表
对于flink来说,还有更灵活的方式:比如直接注册DataStream或者DataSet转换为一张表。
然后DataStream或者DataSet就相当于表,这样可以继续使用SQL来操作流或者批次的数据
语法:
// get TableEnvironment // registration of a DataSet is equivalent Env:DataStream val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[(Long, String)] = ... // register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream)
object SQLToDataSetAndStreamSet { def main(args: Array[String]): Unit = { // set up execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) //构造数据 val orderA: DataStream[Order] = env.fromCollection(Seq( Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))) val orderB: DataStream[Order] = env.fromCollection(Seq( Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1))) // 根据数据注册表 tEnv.registerDataStream("OrderA", orderA) tEnv.registerDataStream("OrderB", orderB) // union the two tables val result = tEnv.sqlQuery( "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " + "SELECT * FROM OrderB WHERE amount < 2") result.writeToSink(new CsvTableSink("ccc" , "," , 1 , FileSystem.WriteMode.OVERWRITE)) env.execute() } } case class Order(user: Long, product: String, amount: Int)