zoukankan      html  css  js  c++  java
  • Flink 多流转换算子

    1.拆分流

    代码片段:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    val streamFromFile = env.readTextFile("C:\Users\Mi\Documents\project\idea\FlinkTitorial\src\main\resources\sensor.txt")
    
    val stream1: DataStream[SensorReading] = streamFromFile.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    })
    
    //拆分流
    val splitStream: SplitStream[SensorReading] = stream1.split(data => if (data.temperature > 30) Seq("high") else Seq("low"))
    val highStream = splitStream.select("high")
    val lowStream = splitStream.select("low")
    val allStream = splitStream.select("high", "low")
    

    2.collect合并流

    collect合并的两个流,数据类型可以不一样。

    代码片段:

    //合并流
    val warning: DataStream[(String, Double)] = highStream.map(data => (data.id, data.temperature))
    //connect()方法合并流
    val connectedStream: ConnectedStreams[(String, Double), SensorReading] = warning.connect(lowStream)
    //合并流的map操作,为每个流分别传入一个映射函数
    val coMapStream: DataStream[Product] = connectedStream.map(
      data1 => (data1._1, data1._2, "high temperature warning"),
      data2 => (data2.id, "healthy")
    )
    

    3.union合并流

    union合并操作可以传多个流参数,并且因为返回类型为DataStream,支持链式操作。

    代码片段:

    val unionStream: DataStream[SensorReading] = highStream.union(lowStream,highStream).union(lowStream)

      

  • 相关阅读:
    小程序自定义组件(3)子向父传参
    postgresql插件安装
    二进制减法的实现
    mysql锁表问题
    mysql查看修改参数
    众数问题-找出超过一半的数
    只出现一次的数
    元素最大间距离
    第一个缺失数字
    局部最小值位置
  • 原文地址:https://www.cnblogs.com/noyouth/p/12741181.html
Copyright © 2011-2022 走看看