zoukankan      html  css  js  c++  java
  • flink 读取 CSV 文件,并将 DataStream 转 Table 对象

    package com.myflink
    
    import java.lang.reflect.Field
    import java.util
    
    import org.apache.flink.api.common.typeinfo._
    import org.apache.flink.api.java.io.{PojoCsvInputFormat, RowCsvInputFormat}
    import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.Tumble
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row
    
    /**
     * Reads sensor readings from a CSV file into a `DataStream[SenSor]`, converts the stream
     * to a Table with an event-time attribute, and prints per-sensor aggregates over
     * 10-second tumbling windows.
     */
    object Main {

      /**
       * Builds and runs the job.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {
        import org.apache.flink.streaming.api.scala._

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Describe every declared field of SenSor; no throw-away instance is needed for that.
        val declaredFields: Array[Field] = classOf[SenSor].getDeclaredFields
        val pojoFields = declaredFields.map { field =>
          new PojoField(field, TypeInformation.of(field.getType))
        }
        val pojoType = new PojoTypeInfo[SenSor](classOf[SenSor], util.Arrays.asList(pojoFields: _*))

        // PojoTypeInfo re-sorts its fields by name. Spell out the CSV column order explicitly so
        // that column-to-field binding does not depend on how the field names happen to sort.
        val csvColumns = Array("id", "timestamp", "uempearture")

        // Backslashes must be escaped inside a regular Scala string literal.
        val path = new Path("D:\\allspace\\flink0906\\src\\main\\resources\\input\\test.csv")

        val ds: DataStream[SenSor] = env
          .createInput(new PojoCsvInputFormat[SenSor](path, pojoType, csvColumns))
          .assignTimestampsAndWatermarks(
            // Allow events to arrive up to 10 seconds out of order.
            new BoundedOutOfOrdernessTimestampExtractor[SenSor](Time.seconds(10)) {
              // The timestamp is scaled by 1000 (presumably epoch seconds -> milliseconds).
              override def extractTimestamp(element: SenSor): Long = element.timestamp * 1000
            })

        val tableEnv = StreamTableEnvironment.create(env)
        // 'timestamp.rowtime turns the timestamp column into the event-time attribute.
        val table = tableEnv.fromDataStream(ds, 'id, 'timestamp.rowtime, 'uempearture)

        // Per sensor id and 10-second tumbling window: row count, average reading, window end.
        table
          .window(Tumble over 10.seconds on 'timestamp as 'tw)
          .groupBy('id, 'tw)
          .select('id, 'id.count, 'uempearture.avg, 'tw.end)
          .toRetractStream[Row]
          .print()

        env.execute()
      }

    }
    
    /**
     * One sensor reading, i.e. one line of the input CSV.
     *
     * NOTE(review): the fields are declared `var`, presumably so the CSV input format can
     * populate an instance field by field; confirm before changing them to `val`.
     * NOTE(review): `uempearture` looks like a deliberate spelling so that the field names sort
     * alphabetically in the same order as the CSV columns; confirm before renaming.
     *
     * @param id          sensor identifier
     * @param timestamp   reading time; Main multiplies it by 1000 to obtain the event timestamp
     * @param uempearture measured value
     */
    case class SenSor(
      var id: String,
      var timestamp: Long,
      var uempearture: Double
    )
    

      这里面有一个坑:

    SenSor 对象的属性,在构造 PojoTypeInfo 时会按照属性名的字典序重新排序,而 PojoCsvInputFormat 默认按排序后的属性顺序去对应 csv 的各列;如果属性名的字典序与 csv 的列顺序不一致,就会直接造成对 csv 文件解析出错。

    csv文件内容:
    sensor_1,1547718199,35.8
    sensor_6,1547718201,15.4

    flink版本为 1.10.1




  • 相关阅读:
    WHERE col1=val1 AND col2=val2;index exists on col1 and col2, the appropriate rows can be fetched directly
    MySQL 交集 实现方法
    MBProgressHUD的使用
    Xcode4 使用 Organizer 分析 Crash logs(转)
    SimpleXML 使用详细例子
    PHP的XML Parser(转)
    iPhone,iPhone4,iPad程序启动画面的总结 (转)
    Pop3得到的Email 信件格式介绍
    yii总结
    隐藏Tabbar的一些方法
  • 原文地址:https://www.cnblogs.com/wudeyun/p/13688745.html
Copyright © 2011-2022 走看看