zoukankan      html  css  js  c++  java
  • flink Sink file

    import it.bigdata.flink.study.SensorReding
    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.scala._
    
    /**
     * Demo Flink job: reads comma-separated sensor records from a text file,
     * converts each line into the string form of a `SensorReding`, writes the
     * result through a row-format `StreamingFileSink`, and also prints it.
     */
    object FileSink {

      /**
       * Job entry point.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {
        // Create the execution environment; parallelism 1 keeps the output in a single task.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Read the input file line by line.
        // Backslashes must be escaped inside an ordinary Scala string literal;
        // sequences such as "\i" or "\m" are invalid escapes and do not compile.
        val inputPath = "D:\\ideaDemo\\maven_flink\\src\\main\\resources\\sensor.txt"
        val inputStream = env.readTextFile(inputPath)

        // Simple transformation: each line is expected to hold three comma-separated
        // fields, which are passed to SensorReding in order (String, Long, Double).
        // Fields are trimmed so that whitespace around a comma does not break the
        // numeric conversions.
        val dataStream = inputStream.map { line =>
          val fields = line.split(",").map(_.trim)
          SensorReding(fields(0), fields(1).toLong, fields(2).toDouble).toString
        }

        // Sink to a file (older alternative, kept for reference):
        //    dataStream.writeAsCsv("D:\\ideaDemo\\maven_flink\\src\\main\\resources\\out.txt")

        // Row-format streaming file sink.
        // NOTE(review): per the Flink documentation the path passed to forRowFormat is
        // a base directory, so "out1.txt" becomes a directory holding part files, and
        // part files are only finalized on successful checkpoints — checkpointing is
        // not enabled in this job; confirm that is acceptable.
        dataStream.addSink(
          StreamingFileSink.forRowFormat(
            new Path("D:\\ideaDemo\\maven_flink\\src\\main\\resources\\out1.txt"),
            new SimpleStringEncoder[String]()
          ).build()
        )

        // Also echo every record to stdout.
        dataStream.print()

        // Trigger execution of the job graph built above.
        env.execute("file sink test")

      }
    }
    author@nohert
  • 相关阅读:
    聊一聊Java泛型的擦除
    微信退款通知信息解密
    Spring Boot 初识
    shiro初识
    Redis 初探
    Java Json库的选取准则
    JAVA 几款Json library的比较
    FUSE简介
    Lab 2 源码分析
    Lab2
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928233.html
Copyright © 2011-2022 走看看