  • Using Flink's API (2): Transform Operators

    Related articles

    Using Flink's API (1): Sinks

    Using Flink's API (2): Transform Operators

    Using Flink's API (3): Sources

    The complete code (Scala DataStream API) is as follows:

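    import org.apache.flink.streaming.api.scala._

    // One sensor record: id, timestamp, temperature (parsed from a comma-separated line)
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    object TransformDemo {
      def main(args: Array[String]): Unit = {
        // Execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(2)

        // Read the file as a stream of lines (forward slashes, because "\P", "\I" ... are
        // not valid escape sequences in a Scala string literal)
        val fileStream: DataStream[String] = env.readTextFile("D:/Project/IDEA/bigdata-study/flink-demo/src/main/resources/source.txt")

        // 1. map: parse each "id, timestamp, temperature" line into a SensorReading
        val mapStream: DataStream[SensorReading] = fileStream.map(data => {
          val fields: Array[String] = data.split(",")
          SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
        })

        // 2. filter: keep only the readings of sensor_1
        val filterStream: DataStream[SensorReading] = mapStream.filter(_.id == "sensor_1")

        // 3. flatMap: split each line into its individual fields
        val flatMapStream: DataStream[String] = fileStream.flatMap(_.split(","))

        // 4. keyBy: partition the stream by sensor id
        val keyByStream: KeyedStream[SensorReading, String] = mapStream.keyBy(_.id)

        // 5. Rolling aggregation (sum, min, max, minBy, maxBy); only available after keyBy
        val sumStream: DataStream[SensorReading] = keyByStream.sum("temperature")
        val maxStream: DataStream[SensorReading] = keyByStream.max(2) // field position 2 (0-based) = temperature

        // 6. reduce: combine each new reading with the previous result for the same id
        //    (keeps the id, takes the newer timestamp, adds up the temperatures)
        val reduceStream: DataStream[SensorReading] =
          keyByStream.reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

        // 7. split and select: tag each reading "low" or "high", then select streams by tag
        //    (split/select is deprecated and was removed in Flink 1.12 in favor of side outputs)
        val splitStream: SplitStream[SensorReading] = mapStream.split(sensorReading => {
          if (sensorReading.temperature < 30) Seq("low") else Seq("high")
        })
        val lowStream: DataStream[SensorReading] = splitStream.select("low")
        val highStream: DataStream[SensorReading] = splitStream.select("high")
        val allStream: DataStream[SensorReading] = splitStream.select("low", "high")

        // 8. connect and CoMap: join two streams of different types, one map function per input;
        //    the two functions return different tuple types, so the common result type is Product
        val warning: DataStream[(String, Double)] = highStream.map(sensorData => (sensorData.id, sensorData.temperature))
        val connected: ConnectedStreams[(String, Double), SensorReading] = warning.connect(lowStream)
        val connectedResult: DataStream[Product] = connected.map(
          warningData => (warningData._1, warningData._2, "warning"),
          lowData => (lowData.id, "healthy")
        )

        // 9. union: merge streams of the same type
        val unionStream: DataStream[SensorReading] = lowStream.union(highStream)

        // Print the data (only unionStream is printed; call print() on another stream to inspect it)
        unionStream.print()

        // Start the job
        env.execute("TransformDemo")
      }
    }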
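map, filter and flatMap differ only in how many outputs each input produces: exactly one, zero or one, and zero or more. As a rough sketch, the snippet below applies the same three functions to a plain Scala `List` instead of a `DataStream`, so it runs without Flink. The object name `MapFilterFlatMapModel` and the sample lines are hypothetical (the real contents of `source.txt` are not shown in the post).

```scala
// Plain-Scala model (no Flink dependency) of what map, filter and flatMap emit;
// Flink applies the same functions record by record on a DataStream.
object MapFilterFlatMapModel {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Same parsing logic as the map step: "id, timestamp, temperature" -> SensorReading
  def parse(line: String): SensorReading = {
    val fields = line.split(",")
    SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
  }

  // Hypothetical sample lines in the same format as source.txt
  val lines: List[String] = List(
    "sensor_1, 1547718199, 35.8",
    "sensor_6, 1547718201, 15.4",
    "sensor_1, 1547718207, 27.2")

  // map: exactly one output per input line
  val mapped: List[SensorReading] = lines.map(parse)
  // filter: each reading is either kept or dropped
  val filtered: List[SensorReading] = mapped.filter(_.id == "sensor_1")
  // flatMap: each line becomes several outputs (fields are not trimmed, as in the post)
  val flatMapped: List[String] = lines.flatMap(_.split(","))

  def main(args: Array[String]): Unit = {
    filtered.foreach(println)
    flatMapped.foreach(println)
  }
}
```

Three lines turn into three readings after map, two after filter, and nine strings after flatMap.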
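Rolling aggregations on a KeyedStream emit an updated result for every incoming record, not one final value per key. The sketch below models that with plain Scala collections (no Flink), keeping one accumulated record per id. It also illustrates the difference between `max` and `maxBy`: `max` only updates the aggregated field, while `maxBy` returns the whole record holding the maximum. The claim that `sum` and `max` keep the other fields (here the timestamp) from the first record of the key reflects my understanding of the Flink 1.x aggregators and should be checked against your version; the object name and sample readings are hypothetical.

```scala
// Plain-Scala model of Flink's rolling aggregations (sum / max / maxBy) on a keyed stream.
object RollingAggregationModel {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Per-key rolling combine: keeps one accumulated record per id and emits the updated
  // record for every input, the way a KeyedStream aggregation emits one result per element.
  def rolling(records: List[SensorReading])(
      combine: (SensorReading, SensorReading) => SensorReading): List[SensorReading] =
    records
      .foldLeft((Map.empty[String, SensorReading], Vector.empty[SensorReading])) {
        case ((state, emitted), r) =>
          val updated = state.get(r.id).fold(r)(acc => combine(acc, r))
          (state.updated(r.id, updated), emitted :+ updated)
      }
      ._2
      .toList

  // sum("temperature"): adds up the field; other fields stay as in the key's first record
  def sum(rs: List[SensorReading]): List[SensorReading] =
    rolling(rs)((acc, r) => acc.copy(temperature = acc.temperature + r.temperature))

  // max(2) / max("temperature"): only the temperature field is raised to the maximum
  def max(rs: List[SensorReading]): List[SensorReading] =
    rolling(rs)((acc, r) => acc.copy(temperature = math.max(acc.temperature, r.temperature)))

  // maxBy("temperature"): the whole record with the maximum (the earlier one wins on ties)
  def maxBy(rs: List[SensorReading]): List[SensorReading] =
    rolling(rs)((acc, r) => if (r.temperature > acc.temperature) r else acc)

  // Hypothetical readings for a single sensor
  val sample: List[SensorReading] = List(
    SensorReading("sensor_1", 1L, 30.0),
    SensorReading("sensor_1", 2L, 35.0),
    SensorReading("sensor_1", 3L, 32.0))

  def main(args: Array[String]): Unit = {
    println(sum(sample))
    println(max(sample))
    println(maxBy(sample))
  }
}
```

For these three readings, `max` reports 35.0 with the timestamp of the first reading, whereas `maxBy` reports the reading that actually carried 35.0.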
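`reduce` on a KeyedStream works like the aggregations but with a user-supplied function: the first record of a key is emitted as-is, and each later record is combined with the previous result for that key. The following is a minimal plain-Scala model (no Flink) of that behavior, reusing the exact reduce function from the post; `ReduceModel`, `rollingReduce` and the sample data are hypothetical names for illustration.

```scala
// Plain-Scala model of a rolling reduce on a keyed stream.
object ReduceModel {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Emits one result per input: the record itself if its key is new, otherwise
  // f(previous result for the key, new record).
  def rollingReduce(records: List[SensorReading])(
      f: (SensorReading, SensorReading) => SensorReading): List[SensorReading] = {
    var state = Map.empty[String, SensorReading] // last emitted result per id
    records.map { r =>
      val out = state.get(r.id).fold(r)(prev => f(prev, r))
      state = state.updated(r.id, out)
      out
    }
  }

  // The reduce function from the post: keep the id, take the newer timestamp,
  // add up the temperatures
  val articleReduce: (SensorReading, SensorReading) => SensorReading =
    (x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature)

  // Hypothetical readings from two sensors, interleaved
  val sample: List[SensorReading] = List(
    SensorReading("sensor_1", 1L, 10.0),
    SensorReading("sensor_2", 2L, 5.0),
    SensorReading("sensor_1", 3L, 20.0))
}
```

Keys are independent: the `sensor_2` reading does not affect the running result of `sensor_1`, which becomes (`sensor_1`, 3, 30.0) after its second reading.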
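`split` attaches one or more string tags to each record, and `select` picks out the records carrying any of the requested tags, so `select("low", "high")` returns everything. The sketch below models those semantics with plain Scala collections; it is not Flink code. As far as I know, `split`/`select` was removed in Flink 1.12, where the usual replacement is side outputs (an `OutputTag`, `ctx.output(...)` inside a `ProcessFunction`, and `getSideOutput(...)` on the result). `SplitSelectModel`, `tags`, `select` and the sample data here are hypothetical.

```scala
// Plain-Scala model of split/select: tag each record, then select by tag.
object SplitSelectModel {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Same selector as the split step in the post: below 30 -> "low", otherwise "high"
  def tags(r: SensorReading): Seq[String] =
    if (r.temperature < 30) Seq("low") else Seq("high")

  // select(names*): keep every record tagged with at least one of the requested names
  def select(records: List[SensorReading], names: String*): List[SensorReading] =
    records.filter(r => tags(r).exists(t => names.contains(t)))

  // Hypothetical readings
  val sample: List[SensorReading] = List(
    SensorReading("sensor_1", 1L, 35.8),
    SensorReading("sensor_6", 2L, 15.4),
    SensorReading("sensor_7", 3L, 6.7))
}
```

Because the selector returns a `Seq`, a record could carry several tags and then show up in several selected streams; the post's selector always returns exactly one tag.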
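A connected stream keeps the element types of its two inputs separate, and a CoMap supplies one function per input; both functions must return a common type, which is why the post declares `DataStream[Product]`. As a rough model (no Flink), the snippet below represents the connected stream as a list of `Either` values, Left for the warning tuples and Right for the low readings. The ordering in the list is just one possible interleaving, since Flink does not order elements across the two inputs; the object name and sample values are hypothetical.

```scala
// Plain-Scala model of connect + CoMap using Either for the two input types.
object ConnectCoMapModel {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Left = element from the first input (warning tuple), Right = element from the second input
  type Connected = Either[(String, Double), SensorReading]

  // One function per input, as in connected.map(fun1, fun2); both results widen to Product
  def coMap(connected: List[Connected]): List[Product] =
    connected.map {
      case Left(warningData) => (warningData._1, warningData._2, "warning")
      case Right(lowData)    => (lowData.id, "healthy")
    }

  // Hypothetical interleaving of one high-temperature warning and one low reading
  val sample: List[Connected] =
    List(Left(("sensor_1", 35.8)), Right(SensorReading("sensor_6", 2L, 15.4)))
}
```

The two outputs have different shapes (a 3-tuple and a 2-tuple), which `connect` allows and `union` does not.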
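`union` merges streams that all have the same element type (Flink's `DataStream.union` accepts several streams at once), whereas `connect` joins exactly two streams whose types may differ. The following plain-Scala sketch (no Flink) models union as concatenation: every element of every input appears once, but since Flink gives no ordering guarantee across inputs, only the contents should be compared, not the order. `UnionModel` and its sample lists are hypothetical.

```scala
// Plain-Scala model of union over streams of the same element type.
object UnionModel {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Accepts any number of inputs; concatenation is one valid interleaving of their elements
  def union(streams: List[SensorReading]*): List[SensorReading] = streams.toList.flatten

  // Hypothetical contents of lowStream and highStream
  val low: List[SensorReading] =
    List(SensorReading("sensor_6", 2L, 15.4), SensorReading("sensor_7", 3L, 6.7))
  val high: List[SensorReading] = List(SensorReading("sensor_1", 1L, 35.8))
}
```

Union does not deduplicate: passing the same stream twice yields its elements twice.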
  • Original article: https://www.cnblogs.com/yangshibiao/p/14133307.html