zoukankan      html  css  js  c++  java
  • flink 读取 CSV 文件,并将 DataStream 转 Table 对象

    package com.myflink
    
    import java.lang.reflect.Field
    import java.util
    
    import org.apache.flink.api.common.typeinfo._
    import org.apache.flink.api.java.io.{PojoCsvInputFormat, RowCsvInputFormat}
    import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.Tumble
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row
    
    object Main {
    
      /**
       * Reads a CSV file of sensor readings into a DataStream[SenSor],
       * assigns event-time watermarks, converts the stream to a Table and
       * runs a 10-second tumbling-window aggregation per sensor id,
       * printing the retract stream.
       */
      def main(args: Array[String]): Unit = {
        import org.apache.flink.streaming.api.scala._
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        // Build the PojoTypeInfo field list via reflection from a sample instance.
        val senSor = SenSor("a", 0L, 0.2)
        val pojoFields = new util.ArrayList[PojoField]()
        for (field <- senSor.getClass.getDeclaredFields) {
          pojoFields.add(new PojoField(field, TypeInformation.of(field.getType)))
        }
    
        // Use forward slashes: "\a", "\f", ... are invalid escapes in a Scala
        // string literal and corrupt the original "D:\allspace\..." path.
        val path = new Path("D:/allspace/flink0906/src/main/resources/input/test.csv")
    
        // PojoTypeInfo sorts fields alphabetically by name, which would
        // otherwise mismatch the CSV column order. Pass the explicit field
        // order so columns map as (id, timestamp, uempearture).
        val csvFieldOrder = Array("id", "timestamp", "uempearture")
        val ds: DataStream[SenSor] = env
          .createInput(new PojoCsvInputFormat(
            path,
            new PojoTypeInfo(classOf[SenSor], pojoFields),
            csvFieldOrder))
          .assignTimestampsAndWatermarks(
            // Tolerate events arriving up to 10 seconds out of order.
            new BoundedOutOfOrdernessTimestampExtractor[SenSor](Time.seconds(10)) {
              // CSV timestamps are epoch seconds; Flink expects milliseconds.
              override def extractTimestamp(element: SenSor): Long = element.timestamp * 1000
            })
    
        val tableEnv = StreamTableEnvironment.create(env)
        // Expose the record timestamp as the table's rowtime attribute.
        val table = tableEnv.fromDataStream(ds, 'id, 'timestamp.rowtime, 'uempearture)
        table.window(Tumble over 10.seconds on 'timestamp as 'tw)
          .groupBy('id, 'tw)
          .select('id, 'id.count, 'uempearture.avg, 'tw.end)
          .toRetractStream[Row].print()
    
        env.execute()
      }
    
    }
    
    case class SenSor(var id: String, var timestamp: Long, var uempearture: Double) {};
    

      这里面坑:

    SenSor 对象的属性,在构造 PojoTypeInfo 时会按照名字重新排序,这直接造成对 csv 文件解析出错。解决办法是使用 PojoCsvInputFormat 带 fieldNames 参数的构造函数,显式指定与 csv 列一致的字段顺序。

    csv文件内容:
    sensor_1,1547718199,35.8
    sensor_6,1547718201,15.4

    flink版本为 1.10.1




  • 相关阅读:
    uoj #2 【NOI2014】起床困难综合症 贪心+位运算
    codeforces 620E. New Year Tree dfs序+线段树+bitset
    leetcode 29. Divide Two Integers
    leetcode 15. 3Sum 双指针
    leetcode 211. Add and Search Word
    codeforces 464C. Substitutes in Number
    在线CDN代码-jq jquery
    渐变色--浏览器兼容性
    URL编码表%20Base64编码表%20HTTP消息含义
    《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南
  • 原文地址:https://www.cnblogs.com/wudeyun/p/13688745.html
Copyright © 2011-2022 走看看