zoukankan      html  css  js  c++  java
  • flink分流合流

    数据:

    sensor_1,1547718101,35.8
    sensor_1,1547718102,22.2
    sensor_1,1547718101,55.3
    sensor_1,1547718102,24.1
    sensor_1,1547718103,57
    sensor_1,1547718103,58
    sensor_1,1547718103,59
    sensor_6,1547718101,15.4
    sensor_7,1547718102,6.7
    sensor_10,1547718205,38.1
    

      

    代码

    import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichMapFunction}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala._
    
    object TransformTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        //0.从文件中读取数据
        val inputPath = "D:\ideaDemo\maven_flink\src\main\resources\sensor.txt";
        val inputStream = env.readTextFile(inputPath)
    
        //1.先转换成样例类类型(简单转换操作)
        val dataStream = inputStream.map(data => {
          val arr = data.split(",")
          SensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
        })
    
    
    //4.多流转换
        //4.1分流了,将传感器温度数据分成低温、高温两条流
        val splitSteam = dataStream
          .split(data => {
            if (data.temperature > 30.0) Seq("high") else Seq("low")
          })
        val highTempSteam = splitSteam.select("high")
        val lowTempSteam = splitSteam.select("low")
        val allTempSteam = splitSteam.select("high", "low")
    
    //    highTempSteam.print()
    //    lowTempSteam.print()
    //    allTempSteam.print()
    
        //4.2合流 connect
        val warningSteam = highTempSteam.map(data => (data.id, data.temperature))
        val connectedSteams = warningSteam.connect(lowTempSteam)
    
        //用coMap队数据进行分别处理
        val coMapResultStream: DataStream[Product with Serializable] = connectedSteams
          .map(
            waringData => (waringData._1, waringData._2, "warning"),
            lowTempData => (lowTempData.id, "healthy")
          )
    //    coMapResultStream.print()
    //
    //    //4.3union合流
        val unionStream = highTempSteam.union(lowTempSteam)
    
    
        env.execute()
      }
    author@nohert
  • 相关阅读:
    在线|九月月考选填题
    函数$f(x)=e^xpm e^{-x}$相关
    偶函数性质的推广
    2020年全国卷Ⅱ卷文科数学选填题解析版
    2020年全国卷Ⅱ卷文科数学解答题解析版
    待定系数法
    特殊方法求函数解析式
    phd文献阅读日志-4.1
    phd文献阅读日志-1.2~3.2(1.2,2.1,2.2,3.1,3.2)
    完美解决linux下vim在终端不能用鼠标复制的问题
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928190.html
Copyright © 2011-2022 走看看