zoukankan      html  css  js  c++  java
  • Flink--Table和DataStream和DataSet的集成

    将DataStream或DataSet转换为表格

    在上面的例子讲解中,直接使用的是:registerTableSource注册表

    对于flink来说,还有更灵活的方式:比如直接注册DataStream或者DataSet转换为一张表。

    然后DataStream或者DataSet就相当于表,这样可以继续使用SQL来操作流或者批次的数据

    语法:

    // get TableEnvironment 
    // registration of a DataSet is equivalent
    Env:DataStream
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    
    val stream: DataStream[(Long, String)] = ...
    
    // register the DataStream as Table "myTable" with fields "f0", "f1"
    tableEnv.registerDataStream("myTable", stream)
    object SQLToDataSetAndStreamSet {
      def main(args: Array[String]): Unit = {
    
        // set up execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)
        //构造数据
        val orderA: DataStream[Order] = env.fromCollection(Seq(
          Order(1L, "beer", 3),
          Order(1L, "diaper", 4),
          Order(3L, "rubber", 2)))
        val orderB: DataStream[Order] = env.fromCollection(Seq(
          Order(2L, "pen", 3),
          Order(2L, "rubber", 3),
          Order(4L, "beer", 1)))
        // 根据数据注册表
        tEnv.registerDataStream("OrderA", orderA)
        tEnv.registerDataStream("OrderB", orderB)
        // union the two tables
        val result = tEnv.sqlQuery(
          "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
            "SELECT * FROM OrderB WHERE amount < 2")
        result.writeToSink(new CsvTableSink("ccc" , "," , 1 , FileSystem.WriteMode.OVERWRITE))
        env.execute()
      }
    }
    case class Order(user: Long, product: String, amount: Int)
  • 相关阅读:
    html的基本框架和常用标签
    防火墙
    Zenmap
    每日一招:熟练掌握变盘方向
    每日一招:赚钱最快的选股策略
    操盘策略:黄金做单时间
    每日一招:坚守六大方式选出优质股
    如何保卫你的牛市胜利果实?
    名家看后市:长阴破位不必慌
    每日一招:补仓需遵守的技巧
  • 原文地址:https://www.cnblogs.com/niutao/p/10548690.html
Copyright © 2011-2022 走看看