zoukankan      html  css  js  c++  java
  • Flink之TableAPI和SQL(4):表的Sink实现

    相关文章链接

    Flink之TableAPI和SQL(1):基本功能描述

    Flink之TableAPI和SQL(2):表和外部系统的连接方式

    Flink之TableAPI和SQL(3):通过TableAPI和SQL表的一些操作(包括查询,过滤,聚集等)

    Flink之TableAPI和SQL(4):表的Sink实现

    Flink之TableAPI和SQL(5):表的时间特性

    flink中表的输出Sink可以分为3种:

    1、追加模式(Append Mode):

      在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。

    2、撤回模式(Retract Mode):

      在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。

    3、Upsert(更新插入)模式:

      在Upsert模式下,动态表和外部连接器交换Upsert和Delete消息。

    具体实现如下代码所示:

    // 创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    
    // 获取输入流,并转换为表
    val sensorStream: DataStream[SensorReading] = env
        .readTextFile("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt")
        .map(new MyMapToSensorReading)
    val sensorTable: Table = tableEnv.fromDataStream(sensorStream)
    
    // 1、输出到文件
    tableEnv
        .connect(new FileSystem().path("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\out.txt"))
        .withFormat(new Csv())
        .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("temperature", DataTypes.DOUBLE())
        )
        .createTemporaryTable("outputTable")
    sensorTable.insertInto("outputTable")
    
    // 2、输出到Kafka
    tableEnv
        .connect(new Kafka()
            .version("0.11")
            .topic("flinkTestTopic")
            .property("zookeeper.connect", "cdh1:2181")
            .property("bootstrap.servers", "cdh1:9092")
        )
        .withFormat(new Csv())
        .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("temperature", DataTypes.DOUBLE())
        )
        .createTemporaryTable("kafkaOutputTable")
    sensorTable.insertInto("kafkaOutputTable")
    
    // 3、输出到es
    tableEnv
        .connect(new Elasticsearch()
            .version("7")
            .host("cdh1", 9200, "http")
            .index("sensor")
            .documentType("temp")
        )
        .inUpsertMode()                                         // 指定为Upsert模式(es支持更新模式)
        .withFormat(new Json())                                 // es只支持json,flink下的json格式
        .withSchema( new Schema()
            .field("id", DataTypes.STRING())
            .field("count", DataTypes.BIGINT())
        )
        .createTemporaryTable("esOutputTable")
    sensorTable.insertInto("esOutputTable")
    
    // 输出到 Mysql
    val sinkDDL: String =
        """
          |create table jdbcOutputTable (
          |  id varchar(20) not null,
          |  cnt bigint not null
          |) with (
          |  'connector.type' = 'jdbc',
          |  'connector.url' = 'jdbc:mysql://localhost:3306/test',
          |  'connector.table' = 'sensor_count',
          |  'connector.driver' = 'com.mysql.jdbc.Driver',
          |  'connector.username' = 'root',
          |  'connector.password' = '123456'
          |""".stripMargin
    tableEnv.sqlUpdate(sinkDDL)
    sensorTable.insertInto("jdbcOutputTable")
    
    // 启动执行器,执行任务
    env.execute("OutputTableDemo")
  • 相关阅读:
    Linux删除文件相关命令
    Bing语句
    VS2013配置Winpcap
    node10-mongoose
    node09-cookie
    node08-express
    node07-http
    node06-path
    node05-fs
    node04-buffer
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14074262.html
Copyright © 2011-2022 走看看