zoukankan      html  css  js  c++  java
  • flink Sink file

    import it.bigdata.flink.study.SensorReding
    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.scala._
    
    object FileSink {
      def main(args: Array[String]): Unit = {
        //创建环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        //读取文件数据
        val inputPath = "D:\ideaDemo\maven_flink\src\main\resources\sensor.txt"
        val inputSteam = env.readTextFile(inputPath)
    
        //简单的转换
        val dataSteam = inputSteam.map(data => {
          val arr = data.split(",")
          SensorReding(arr(0), arr(1).toLong, arr(2).toDouble).toString
        })
    
        //sink到文件
    //    dataSteam.writeAsCsv("D:\ideaDemo\maven_flink\src\main\resources\out.txt")
    
    
        dataSteam.addSink(
          StreamingFileSink.forRowFormat(
            new Path("D:\ideaDemo\maven_flink\src\main\resources\out1.txt"),
            new SimpleStringEncoder[String]()
          ).build()
        )
    
        dataSteam.print()
    
        //调用执行环境
        env.execute("file sink test")
    
      }
    }
    author@nohert
  • 相关阅读:
    软件工程课程建议
    结对编程2
    结对编程---《四则运算》
    AVAudioPlayer播放音乐
    《问吧》需求分析
    有关结对编程的感想
    UItabBarController
    ViewController 视图控制器的常用方法
    <问吧>调查问卷心得体会
    UINavigationController导航控制器
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928233.html
Copyright © 2011-2022 走看看