zoukankan      html  css  js  c++  java
  • flink分组取最小值

    import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichMapFunction}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala._
    object TransformTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        //0.从文件中读取数据
        val inputPath = "D:\ideaDemo\maven_flink\src\main\resources\sensor.txt";
        val inputStream = env.readTextFile(inputPath)
    
        //1.先转换成样例类类型(简单转换操作)
        val dataStream = inputStream.map(data => {
          val arr = data.split(",")
          SensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
        })
        //2.分组聚合,输出 每个传感器当前最小值
        val aggSteam = dataStream
          .keyBy("id") //根据id进行分组
          .minBy("temperature")
    
        aggSteam.print()
    
       env.execute()
      }

     数据

    sensor_1,1547718101,35.8
    sensor_1,1547718102,22.2
    sensor_1,1547718101,55.3
    sensor_1,1547718102,24.1
    sensor_1,1547718103,57
    sensor_1,1547718103,58
    sensor_1,1547718103,59
    sensor_6,1547718101,15.4
    sensor_7,1547718102,6.7
    sensor_10,1547718205,38.1
    

      

    author@nohert
  • 相关阅读:
    linux strace 命令详解
    Redis执行Lua脚本示例
    getconf
    rc.sysinit 解析
    Linux系统启动内幕
    syslinux 和 grub
    isolinux.cfg 文件是干什么的
    C++中构造函数调用构造函数
    static和extern的作用域--题目
    构造函数与析构函数不能被继承
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928172.html
Copyright © 2011-2022 走看看