  • Flink multi-stream transformation operators

    1. Splitting a stream (split / select)

    Code snippet:

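    import org.apache.flink.streaming.api.scala._

    // One reading per input line: id, timestamp, temperature
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Backslashes must be escaped in a Scala string literal (\U alone is an invalid escape)
    val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")

    // Parse each comma-separated line into a SensorReading
    val stream1: DataStream[SensorReading] = streamFromFile.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // Split the stream: readings above 30 degrees are tagged "high", the rest "low"
    val splitStream: SplitStream[SensorReading] = stream1.split(data => if (data.temperature > 30) Seq("high") else Seq("low"))
    val highStream = splitStream.select("high")
    val lowStream = splitStream.select("low")
    // Selecting both tags yields all readings again
    val allStream = splitStream.select("high", "low")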
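    split/select (together with the SplitStream type) were deprecated in later Flink releases and have since been removed; side outputs are the suggested replacement. The following is a minimal sketch, assuming the Flink Scala DataStream API is on the classpath and the same SensorReading case class as above. The object SideOutputSplit, the helper isHigh, the class SplitTempProcessor, the tag names and the in-memory sample readings are all illustrative, not taken from the original post.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SideOutputSplit {
  // Hypothetical tags, one per target stream (replacing the "high"/"low" split names)
  val highTag: OutputTag[SensorReading] = new OutputTag[SensorReading]("high")
  val lowTag: OutputTag[SensorReading] = new OutputTag[SensorReading]("low")

  // Same routing rule as the split() example: above 30 degrees counts as "high"
  def isHigh(r: SensorReading): Boolean = r.temperature > 30

  // Sends every reading to exactly one side output; the main output stays empty
  class SplitTempProcessor extends ProcessFunction[SensorReading, SensorReading] {
    override def processElement(
        value: SensorReading,
        ctx: ProcessFunction[SensorReading, SensorReading]#Context,
        out: Collector[SensorReading]): Unit =
      if (isHigh(value)) ctx.output(highTag, value) else ctx.output(lowTag, value)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // In-memory sample data instead of sensor.txt (hypothetical readings)
    val readings: DataStream[SensorReading] = env.fromElements(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_6", 1547718201L, 15.4))

    val processed = readings.process(new SplitTempProcessor)
    val highStream: DataStream[SensorReading] = processed.getSideOutput(highTag)
    val lowStream: DataStream[SensorReading] = processed.getSideOutput(lowTag)

    highStream.print("high")
    lowStream.print("low")
    env.execute("side output split")
  }
}
```

    The main output of processed is left empty here; additionally calling out.collect(value) in processElement would give the equivalent of allStream.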
    

    2. Merging streams with connect

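    connect joins exactly two streams, and their element types may differ. The result is a ConnectedStreams in which each input is still processed by its own function.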

    Code snippet:

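    // Merge streams: first map the high-temperature stream to (id, temperature) tuples
    val warning: DataStream[(String, Double)] = highStream.map(data => (data.id, data.temperature))
    // connect() merges two streams whose element types may differ
    val connectedStream: ConnectedStreams[(String, Double), SensorReading] = warning.connect(lowStream)
    // map on ConnectedStreams takes one mapping function per input stream;
    // the two functions return different tuple types, so the result is typed as their common supertype Product
    val coMapStream: DataStream[Product] = connectedStream.map(
      data1 => (data1._1, data1._2, "high temperature warning"),
      data2 => (data2.id, "healthy")
    )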
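    The two lambdas passed to connectedStream.map can also be written as an explicit CoMapFunction, where map1 handles elements of the first stream and map2 elements of the second; both must return the same output type. A minimal sketch, assuming the Flink Scala API; the class WarningCoMap, the object ConnectExample, the sample inputs and the choice of (String, String) as the common output type (instead of Product) are illustrative. A concrete tuple type presumably lets Flink derive a tuple serializer rather than treating Product as a generic type.

```scala
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

// map1 handles the (id, temperature) warning stream, map2 the raw low-temperature readings
class WarningCoMap extends CoMapFunction[(String, Double), SensorReading, (String, String)] {
  override def map1(value: (String, Double)): (String, String) =
    (value._1, s"high temperature warning: ${value._2}")

  override def map2(value: SensorReading): (String, String) =
    (value.id, "healthy")
}

object ConnectExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Hypothetical inputs: the two streams have different element types
    val warning: DataStream[(String, Double)] = env.fromElements(("sensor_1", 35.8))
    val lowStream: DataStream[SensorReading] =
      env.fromElements(SensorReading("sensor_6", 1547718201L, 15.4))

    // connect keeps both types; the CoMapFunction maps each side to (String, String)
    val coMapStream: DataStream[(String, String)] =
      warning.connect(lowStream).map(new WarningCoMap)

    coMapStream.print()
    env.execute("connect example")
  }
}
```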
    

    3. Merging streams with union

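    union accepts several streams at once, but unlike connect, all of them must have the same element type as the stream it is called on. Because it returns an ordinary DataStream, calls can be chained.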

    Code snippet:

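    // highStream and lowStream each appear twice here, so every reading is emitted twice in unionStream
    val unionStream: DataStream[SensorReading] = highStream.union(lowStream, highStream).union(lowStream)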
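    When the inputs do not share an element type, one option is to project them onto a common type before calling union. A minimal sketch, assuming the Flink Scala API; the object UnionExample, the helper toIdTemp, the (String, Double) target type and the sample data are illustrative.

```scala
import org.apache.flink.streaming.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object UnionExample {
  // Hypothetical projection onto the element type shared by all union inputs
  def toIdTemp(r: SensorReading): (String, Double) = (r.id, r.temperature)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val readings: DataStream[SensorReading] = env.fromElements(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_6", 1547718201L, 15.4))
    // A second input with a different element type: (id, temperature) tuples
    val manual: DataStream[(String, Double)] = env.fromElements(("sensor_7", 6.7))

    // readings.union(manual) would not compile (different element types),
    // so map readings to the same tuple type first
    val all: DataStream[(String, Double)] = readings.map(r => toIdTemp(r)).union(manual)

    all.print()
    env.execute("union example")
  }
}
```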

      

  • Original article: https://www.cnblogs.com/noyouth/p/12741181.html