  • Flink file source

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

    object FlinkDemo04_CommonSource_fromFile {
        // One record per CSV line; every field is kept as a raw String.
        case class Flight(avgTicketPrice: String, cancelled: String, carrier: String, dest: String, destAirportID: String, origin: String, originAirportID: String)

        def main(args: Array[String]): Unit = {

            // 1. Create the execution environment
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            // 2. Read the file as a DataStream of lines
            //    (backslashes must be escaped inside a regular Scala string literal)
            val dStream: DataStream[String] = env.readTextFile("I:\\projectImplement\\dataWareHouse\\test-es\\data\\630data.csv")
            // 3. Transform: parse each line into a Flight
            //    (this import brings the implicit TypeInformation required by map into scope)
            import org.apache.flink.api.scala._
            dStream.map { line =>
                val vals = line.split(",")
                Flight(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6))
            }.print()
            // 4. Execute the job
            env.execute("stream job")
        }
    }
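
The map step above indexes vals(0) through vals(6) directly, so a line with fewer than seven fields would throw an ArrayIndexOutOfBoundsException. A minimal sketch of a more defensive parser follows. It is plain Scala with no Flink dependency. The object FlightParsing and the helper parseFlight are hypothetical names introduced here for illustration and are not part of the original post. The sketch also assumes that no field contains a quoted comma.

```scala
object FlightParsing {
  // Same fields as the Flight case class in the job above.
  case class Flight(avgTicketPrice: String, cancelled: String, carrier: String,
                    dest: String, destAirportID: String,
                    origin: String, originAirportID: String)

  // Hypothetical helper (an assumption, not from the original post):
  // returns None for lines with fewer than seven comma-separated fields.
  def parseFlight(line: String): Option[Flight] = {
    // A limit of -1 keeps trailing empty fields, which split(",") would drop.
    val vals = line.split(",", -1).map(_.trim)
    if (vals.length < 7) None
    else Some(Flight(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)))
  }
}
```

In the job, one way to use it would be dStream.flatMap(line => FlightParsing.parseFlight(line).toList), which silently skips malformed lines instead of failing the job. Whether skipping is the right behavior depends on the data.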
  • Original article: https://www.cnblogs.com/xiefeichn/p/13174974.html