zoukankan      html  css  js  c++  java
  • Flink--将表转换为DataStream或DataSet

    A Table可以转换成a DataStream或DataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义的DataStream或DataSet程序

    将表转换为DataStream

    有两种模式可以将 Table转换为DataStream:

    1:Append Mode

    将一个表附加到流上

    2:Retract Mode

    将表转换为流

    语法格式:

     

    // get TableEnvironment. 
    // registration of a DataSet is equivalent
    // ge val tableEnv = TableEnvironment.getTableEnvironment(env)
    
    // Table with two fields (String name, Integer age)
    val table: Table = ...
    
    // convert the Table into an append DataStream of Row
    val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
    
    // convert the Table into an append DataStream of Tuple2[String, Int]
    val dsTuple: DataStream[(String, Int)] dsTuple = 
      tableEnv.toAppendStream[(String, Int)](table)
    
    // convert the Table into a retract DataStream of Row.
    //   A retract stream of type X is a DataStream[(Boolean, X)]. 
    //   The boolean field indicates the type of the change. 
    //   True is INSERT, false is DELETE.
    val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

    例子:

    object TableTODataSet_DataStream {
      def main(args: Array[String]): Unit = {
        //构造数据,转换为table
        val data = List(
          Peoject(1L, 1, "Hello"),
          Peoject(2L, 2, "Hello"),
          Peoject(3L, 3, "Hello"),
          Peoject(4L, 4, "Hello"),
          Peoject(5L, 5, "Hello"),
          Peoject(6L, 6, "Hello"),
          Peoject(7L, 7, "Hello World"),
          Peoject(8L, 8, "Hello World"),
          Peoject(8L, 8, "Hello World"),
          Peoject(20L, 20, "Hello World"))
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val tEnv = TableEnvironment.getTableEnvironment(env)
        val stream = env.fromCollection(data)
        val table: Table = tEnv.fromDataStream(stream)
        //TODO 将table转换为DataStream----将一个表附加到流上Append Mode
        val appendStream: DataStream[Peoject] = tEnv.toAppendStream[Peoject](table)
        //TODO 将表转换为流Retract Mode true代表添加消息,false代表撤销消息
        val retractStream: DataStream[(Boolean, Peoject)] = tEnv.toRetractStream[Peoject](table)
        retractStream.print()
        env.execute()
    
      }
    }
    
    case class Peoject(user: Long, index: Int, content: String)

    将表转换为DataSet

    语法格式:

    // get TableEnvironment 
    // registration of a DataSet is equivalent
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    
    // Table with two fields (String name, Integer age)
    val table: Table = ...
    
    // convert the Table into a DataSet of Row
    val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
    
    // convert the Table into a DataSet of Tuple2[String, Int]
    val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

    例子:

    case class Peoject(user: Long, index: Int, content: String)
    
    object TableTODataSet{
      def main(args: Array[String]): Unit = {
    
        //构造数据,转换为table
        val data = List(
          Peoject(1L, 1, "Hello"),
          Peoject(2L, 2, "Hello"),
          Peoject(3L, 3, "Hello"),
          Peoject(4L, 4, "Hello"),
          Peoject(5L, 5, "Hello"),
          Peoject(6L, 6, "Hello"),
          Peoject(7L, 7, "Hello World"),
          Peoject(8L, 8, "Hello World"),
          Peoject(8L, 8, "Hello World"),
          Peoject(20L, 20, "Hello World"))
        //初始化环境,加载table数据
        val env = ExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val tableEnvironment = TableEnvironment.getTableEnvironment(env)
        val collection: DataSet[Peoject] = env.fromCollection(data)
        val table: Table = tableEnvironment.fromDataSet(collection)
        //TODO 将table转换为dataSet
        val toDataSet: DataSet[Peoject] = tableEnvironment.toDataSet[Peoject](table)
        toDataSet.print()
    //    env.execute()
      }
    }
  • 相关阅读:
    JSON学习笔记
    Java面试题之对static的理解
    【知了堂学习笔记】java基础知识之继承
    【知了堂学习笔记】多态基本知识
    Final关键字
    子父类构造函数特点
    原来学编程这么简单,如何理解程序的本质(今天听了【遇见狂神说】发布的《从HelloWorld到程序本质的思考》这个视频,有了自己的一些感悟,在这里和大家做一个分享)
    浅谈c3p0连接池和dbutils工具类的使用
    Mysql数据库重要知识点
    Express安装与调试
  • 原文地址:https://www.cnblogs.com/niutao/p/10548703.html
Copyright © 2011-2022 走看看