zoukankan      html  css  js  c++  java
  • Flink学习(七) 多流转换算子 拆分合并流

    一、Split 和 Select (使用split切分过的流是不能被二次切分的)

     DataStream --> SplitStream : 根据特征把一个DataSteam 拆分成两个或者多个DataStream.

     SplitStream --> DataStream:从一个SplitStream中获取一个或者多个DataStream。

    二、Connect 和 CoMap / CoFlatMap

     DataStream,DataStream --> ConnectedStream:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持着各自的数据和形式,不发生变化,两个流相互独立。

     ConnectedStream --> DataStream:作用与 ConnectedStream上,功能与map和Flatmap一样,对 ConnectedStream中的每一个Stream分别进行map和flatmap处理。

    三、Union

     DataStream --> DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream

    注意:Connect 与 Union区别:

    1、Union之前两个流的类型必须是一样的,Conect可以不一样,并且Connect之后进行coMap中调整为一样的。

    2、Connect只能操作两个流,Union可以操作多个。

    综合代码:(可直接运行,数据在注释中)

    package com.wyh.streamingApi.Transform
    
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.streaming.api.scala._
    
    
    //温度传感器读数样例类
    case class SensorReading(id: String, timestamp: Long, temperature: Double)
    
    object TransformTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        /**
          * sensor_1,1547718199,35.80018327300259
          * sensor_6,1547718201,15.402984393403084
          * sensor_7,1547718202,6.720945201171228
          * sensor_10,1547718205,38.1010676048934444
          * sensor_1,1547718199,35.1
          * sensor_1,1547718199,31.0
          * sensor_1,1547718199,39
          */
        val streamFromFile = env.readTextFile("F:\flink-study\wyhFlinkSD\data\sensor.txt")
    
    
        //基本转换算子和滚动聚合算子=======================================================================================
        /**
          * map keyBy sum
          */
        val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })
    
        //    dataStream.keyBy(0).sum(2).printToErr("keyBy test")
    
        //scala强类型语言 只有_.id 可以指定返回类型
        val aggStream: KeyedStream[SensorReading, String] = dataStream.keyBy(_.id)
        val stream1: DataStream[SensorReading] = aggStream.sum("temperature")
        //    stream1.printToErr("scala强类型语言")
    
    
        /**
          * reduce
          *
          * 输出当前传感器最新的温度要加10,时间戳是上一次数据的时间加1
          */
        aggStream.reduce(new ReduceFunction[SensorReading] {
          override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
            SensorReading(t.id, t.timestamp + 1, t1.temperature + 10)
          }
        }) //.printToErr("reduce test")
    
    
        //多流转换算子====================================================================================================
        /**
          * 分流
          * split select
          * DataStream --> SplitStream --> DataStream
          *
          * 需求:传感器数据按照温度高低(以30度为界),拆分成两个流
          */
        val splitStream = dataStream.split(data => {
          //盖上戳 后面进行分拣
          if (data.temperature > 30) {
            Seq("high")
          } else if (data.temperature < 10) {
            Seq("low")
          } else {
            Seq("health")
          }
        })
    
        //根据戳进行分拣
        val highStream = splitStream.select("high")
        val lowStream = splitStream.select("low")
        val healthStream = splitStream.select("health")
    
        //可以传多个参数,一起分拣出来
        val allStream = splitStream.select("high", "low")
    
    
        //    highStream.printToErr("high")
        //    lowStream.printToErr("low")
        //    allStream.printToErr("all")
        //    healthStream.printToErr("healthStream")
    
        /**
          * 合并      注意: Connect 只能进行两条流进行合并,但是比较灵活,不同流的数据结构可以不一样
          * Connect CoMap/CoFlatMap
          *
          * DataStream --> ConnectedStream --> DataStream
          */
        val warningStream = highStream.map(data => (data.id, data.temperature))
        val connectedStream = warningStream.connect(lowStream)
    
        val coMapDataStream = connectedStream.map(
          warningData => (warningData._1, warningData._2, "温度过高报警!!"),
          lowData => (lowData.id, lowData.temperature, "温度过低报警===")
        )
    
        //    coMapDataStream.printToErr("合并流")
    
        /**
          * 合并多条流  注意: 要求数据结构必须要一致,一样
          *
          * Union   DataStream --> DataSteam    就没有一个中间转换操作了
          *
          */
    
        val highS = highStream.map(h => (h.id, h.timestamp, h.temperature, "温度过高报警!!"))
        val lowS = lowStream.map(l => (l.id, l.timestamp, l.temperature, "温度过低报警==="))
        val healthS = healthStream.map(l => (l.id, l.timestamp, l.temperature, "健康"))
    
        val unionStream = highS.union(lowS).union(healthS)
    
        unionStream.printToErr("union合并")
    
    
        env.execute("transform test")
      }
    
    }
  • 相关阅读:
    WPF中C#代码触发鼠标点击事件
    PHP代码实现强制换行
    C#中判断系统的架构(32位,还是64位)
    WPF的System.Windows.Threading.DispatcherTimer的使用(每隔一定的时间重复做某事)
    LINQ to Objects系列(2)两种查询语法介绍
    LINQ to Objects系列(1)相关技术准备
    常用技术社区和网站总结
    .net项目技术选型总结
    java与.net比较学习系列(7) 属性
    java与.net比较学习系列(6) 数组
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12892277.html
Copyright © 2011-2022 走看看