zoukankan      html  css  js  c++  java
  • Flink 基本算子map、keyBy、sum、reduce

    核心代码:

    object TransformTest {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val streamFromFile = env.readTextFile("C:\Users\Mi\Documents\project\idea\FlinkTitorial\src\main\resources\sensor.txt")
    
        //------------map--------------
        val dataStream1: DataStream[SensorReading] = streamFromFile.map(d => {
          val arr = d.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
        })
    
        //------------keyBY--------------
        //此时key是一个元组
        val dataStream2: KeyedStream[SensorReading, Tuple] = dataStream1.keyBy(0)
        val dataStream3: KeyedStream[SensorReading, Tuple] = dataStream1.keyBy("id")
        //此时key是字段的类型
        val dataStream4: KeyedStream[SensorReading, String] = dataStream1.keyBy(_.id)
    
        //------------sum--------------
        val dataStream5 = dataStream4.sum(2)
        val dataStream6 = dataStream4.sum("temperature")
    
        //------------reduce-----------
        //温度求和。sr1是上一次的结果,sr2是遍历到的当前的SensorReading对象
        val dataStream7 = dataStream4.reduce((sr1, sr2) => SensorReading(sr2.id, sr2.timestamp, sr1.temperature + sr2.temperature))
    
        dataStream7.print
    
        env.execute("transform test")
      }
    
    }
    
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    sensor.txt文件内容:

    sensor_1, 1547718199, 35.80018327300259
    sensor_6, 1547718201, 15.402984393403084
    sensor_7, 1547718202, 6.720945201171228
    sensor_10, 1547718205, 38.101067604893444
    sensor_1, 1547718200, 30.8
    sensor_1, 1547718201, 40.8
    

    输出结果:

    SensorReading(sensor_1,1547718199,35.80018327300259)
    SensorReading(sensor_6,1547718201,15.402984393403084)
    SensorReading(sensor_7,1547718202,6.720945201171228)
    SensorReading(sensor_10,1547718205,38.101067604893444)
    SensorReading(sensor_1,1547718200,66.60018327300259)
    SensorReading(sensor_1,1547718201,107.40018327300258)

      

  • 相关阅读:
    [日常] Go-逐行读取文本信息
    [日常] nginx的错误日志error_log设置
    [日常] nginx记录post数据
    [PHP] PHP在CLI环境下的错误日志
    [PHP] 2018年终总结
    [MySQL] INFORMATION_SCHEMA 数据库包含所有表的字段
    前端吐槽的后端接口那些事
    读《猫力乱步》 | 如果你走得够远,你也能有那么多故事
    js获取隐藏元素宽高的方法
    RequireJS使用注意地方
  • 原文地址:https://www.cnblogs.com/noyouth/p/12735072.html
Copyright © 2011-2022 走看看