  • Using the Flink API (2): Transform Operators

    Related articles

    Using the Flink API (1): Sinks

    Using the Flink API (2): Transform Operators

    Using the Flink API (3): Sources

    The implementation is shown below:

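    import org.apache.flink.streaming.api.scala._

    // Sensor record used throughout (assumed definition, matching how its fields are used below)
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    object TransformDemo {
        def main(args: Array[String]): Unit = {
            // Execution environment
            val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
            env.setParallelism(2)

            // Read the data and turn it into a stream (backslashes must be escaped in a Scala string literal)
            val fileStream: DataStream[String] = env.readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt")

            // 1. map: parse each "id,timestamp,temperature" line into a SensorReading
            val mapStream: DataStream[SensorReading] = fileStream.map(data => {
                val fields: Array[String] = data.split(",")
                SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
            })

            // 2. filter: keep only readings from sensor_1
            val filterStream: DataStream[SensorReading] = mapStream.filter(_.id == "sensor_1")

            // 3. flatMap: split each line into its individual fields
            val flatMapStream: DataStream[String] = fileStream.flatMap(_.split(","))

            // 4. keyBy: partition the stream by sensor id
            val keyByStream: KeyedStream[SensorReading, String] = mapStream.keyBy(_.id)

            // 5. Rolling aggregation (sum, min, max, minBy, maxBy): the stream must be keyed with keyBy first,
            //    and an updated result is emitted for every incoming record of a key
            val sumStream: DataStream[SensorReading] = keyByStream.sum("temperature")
            val maxStream: DataStream[SensorReading] = keyByStream.max(2) // field position 2 = temperature

            // 6. reduce: keep the id, take the latest timestamp, add up the temperatures
            val reduceStream: DataStream[SensorReading] = keyByStream.reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

            // 7. split and select (deprecated, and removed in Flink 1.12; side outputs replace them)
            val splitStream: SplitStream[SensorReading] = mapStream.split(sensorReading => {
                if (sensorReading.temperature < 30) Seq("low") else Seq("high")
            })
            val lowStream: DataStream[SensorReading] = splitStream.select("low")
            val highStream: DataStream[SensorReading] = splitStream.select("high")
            val allStream: DataStream[SensorReading] = splitStream.select("low", "high")

            // 8. connect and CoMap: the two inputs may have different types; map takes one function per input
            val warning: DataStream[(String, Double)] = highStream.map(sensorData => (sensorData.id, sensorData.temperature))
            val connected: ConnectedStreams[(String, Double), SensorReading] = warning.connect(lowStream)
            // The two branches return different tuple types, so their common supertype Product is used
            val connectedResult: DataStream[Product] = connected.map(
                warningData => (warningData._1, warningData._2, "warning"),
                lowData => (lowData.id, "healthy")
            )

            // 9. union: merge streams of the same type
            val unionStream: DataStream[SensorReading] = lowStream.union(highStream)

            // Print the data
            unionStream.print()

            // Start the execution environment and run the job
            env.execute("TransformDemo")
        }
    }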
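    The rolling aggregations and reduce in steps 5 and 6 do not produce one final value per key; every incoming record updates that key's accumulator and the updated value is emitted. The plain-Scala sketch below only mimics these semantics on an in-memory sequence (no Flink dependency), replaying a small hypothetical input in order. It also shows the difference between max and maxBy: per the Flink documentation, max returns the maximum value while maxBy returns the element holding it; the sketch assumes that max keeps the other fields of the key's first record. rollingReduce, rollingMax, rollingMaxBy, rollingSum and the sample data are illustrative names, not Flink API.

```scala
// Plain-Scala sketch (no Flink dependency) of keyed rolling aggregation.
// Assumption: records are processed in list order, one at a time, and every
// record produces one output: the updated accumulator for its key.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)

object RollingAggSketch {
  // Generic keyed rolling reduce, mimicking keyBy(_.id).reduce(f).
  def rollingReduce(in: Seq[SensorReading])(
      f: (SensorReading, SensorReading) => SensorReading): Seq[SensorReading] =
    in.foldLeft((Map.empty[String, SensorReading], Vector.empty[SensorReading])) {
      case ((state, out), r) =>
        // The first record of a key is emitted unchanged; later ones are combined.
        val updated = state.get(r.id).fold(r)(acc => f(acc, r))
        (state.updated(r.id, updated), out :+ updated)
    }._2

  // Mimics max("temperature"): only the temperature changes; the other fields
  // stay those of the key's first record (assumed behaviour).
  def rollingMax(in: Seq[SensorReading]): Seq[SensorReading] =
    rollingReduce(in)((acc, r) => acc.copy(temperature = math.max(acc.temperature, r.temperature)))

  // Mimics maxBy("temperature"): keeps the whole record with the highest
  // temperature; on ties the earlier record wins.
  def rollingMaxBy(in: Seq[SensorReading]): Seq[SensorReading] =
    rollingReduce(in)((acc, r) => if (r.temperature > acc.temperature) r else acc)

  // Mimics sum("temperature").
  def rollingSum(in: Seq[SensorReading]): Seq[SensorReading] =
    rollingReduce(in)((acc, r) => acc.copy(temperature = acc.temperature + r.temperature))

  // Hypothetical sample input (not the article's source.txt).
  val sample: Seq[SensorReading] = Seq(
    SensorReading("sensor_1", 1L, 35.0),
    SensorReading("sensor_6", 2L, 15.5),
    SensorReading("sensor_1", 3L, 36.5),
    SensorReading("sensor_1", 4L, 32.0)
  )

  def main(args: Array[String]): Unit = {
    println(rollingMax(sample).last)
    println(rollingMaxBy(sample).last)
    // The reduce function from step 6: latest timestamp, summed temperature.
    println(rollingReduce(sample)((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature)).last)
  }
}
```

    With this sample, the last rollingMax result is SensorReading("sensor_1", 1, 36.5) while the last rollingMaxBy result is SensorReading("sensor_1", 3, 36.5): only maxBy carries the timestamp of the record that actually had the maximum temperature.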
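    Conceptually, split in step 7 attaches one or more output names to each record, and select keeps the records carrying any of the requested names, so select("low", "high") returns every record. union in step 9 then merges streams of the same type without removing duplicates; in Flink the order across the merged inputs is not deterministic. The plain-Scala sketch below mimics this on an in-memory sequence; split, select, union and the sample data here are illustrative stand-ins, not the Flink API.

```scala
// Plain-Scala sketch of split/select and union; no Flink dependency.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SplitSelectSketch {
  // Same rule as the article: below 30 is "low", everything else is "high".
  def names(r: SensorReading): Seq[String] =
    if (r.temperature < 30) Seq("low") else Seq("high")

  // Mimics split: pair every record with its output names.
  def split(in: Seq[SensorReading]): Seq[(SensorReading, Seq[String])] =
    in.map(r => (r, names(r)))

  // Mimics select: keep records tagged with at least one requested name.
  def select(tagged: Seq[(SensorReading, Seq[String])], wanted: String*): Seq[SensorReading] =
    tagged.collect { case (r, tags) if tags.exists(t => wanted.contains(t)) => r }

  // Mimics union of same-typed streams: plain concatenation, duplicates kept.
  def union(streams: Seq[SensorReading]*): Seq[SensorReading] =
    streams.flatten

  // Hypothetical sample input.
  val sample: Seq[SensorReading] = Seq(
    SensorReading("sensor_1", 1L, 35.0),
    SensorReading("sensor_6", 2L, 15.5),
    SensorReading("sensor_7", 3L, 6.5)
  )

  def main(args: Array[String]): Unit = {
    val tagged = split(sample)
    val low = select(tagged, "low")
    val high = select(tagged, "high")
    println(low.map(_.id))
    println(high.map(_.id))
    println(union(low, high).map(_.id))
  }
}
```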
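    Unlike union, connect in step 8 joins exactly two streams whose element types may differ (here (String, Double) and SensorReading), and the resulting ConnectedStreams takes one function per input in map. The sketch below models a connected stream as a sequence of Either values (Left for the first input, Right for the second); this encoding is an assumption made for illustration, and in Flink the interleaving of the two inputs is not fixed. coMap and warningOrHealthy are hypothetical names.

```scala
// Plain-Scala sketch of connect + CoMap; no Flink dependency.
// Assumption: Left holds elements of the first input, Right of the second.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ConnectCoMapSketch {
  // Applies f1 to elements of the first input and f2 to those of the second,
  // mirroring ConnectedStreams.map(fun1, fun2).
  def coMap[A, B, R](in: Seq[Either[A, B]])(f1: A => R, f2: B => R): Seq[R] =
    in.map {
      case Left(a)  => f1(a)
      case Right(b) => f2(b)
    }

  // The two functions from step 8; their common result type is Product.
  def warningOrHealthy(in: Seq[Either[(String, Double), SensorReading]]): Seq[Product] =
    coMap[(String, Double), SensorReading, Product](in)(
      warningData => (warningData._1, warningData._2, "warning"),
      lowData => (lowData.id, "healthy")
    )

  def main(args: Array[String]): Unit = {
    // Hypothetical interleaving of one high-temperature tuple and one low reading.
    val in: Seq[Either[(String, Double), SensorReading]] =
      Seq(Left(("sensor_1", 35.0)), Right(SensorReading("sensor_6", 2L, 15.5)))
    warningOrHealthy(in).foreach(println)
  }
}
```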
  • Original article: https://www.cnblogs.com/yangshibiao/p/14133307.html