zoukankan      html  css  js  c++  java
  • flnk算子reduce的简单使用和自定义reduce

    import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichMapFunction}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala._
    
    object TransformTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        //0.从文件中读取数据
        val inputPath = "D:\ideaDemo\maven_flink\src\main\resources\sensor.txt";
        val inputStream = env.readTextFile(inputPath)
    
        //1.先转换成样例类类型(简单转换操作)
        val dataStream = inputStream.map(data => {
          val arr = data.split(",")
          SensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
        })
    
        //3.需要输出当前最小温度值,以及最近时间戳,要用reduce
        val resultStream = dataStream
          .keyBy("id")
         .reduce((curSate, newData) =>
           SensorReding(curSate.id, newData.timestamp, curSate.temperature.min(newData.temperature))
          )
            .reduce((v1,v2)=>
              SensorReding(v1.id,v2.timestamp,v2.temperature.min(v2.temperature))
            )
        resultStream.print()
    
    
        env.execute()
      }

     数据

    sensor_1,1547718101,35.8
    sensor_1,1547718102,22.2
    sensor_1,1547718101,55.3
    sensor_1,1547718102,24.1
    sensor_1,1547718103,57
    sensor_1,1547718103,58
    sensor_1,1547718103,59
    sensor_6,1547718101,15.4
    sensor_7,1547718102,6.7
    sensor_10,1547718205,38.1
    

      

    自定义reduce

      class MyReduceFunction extends ReduceFunction[SensorReding]{
        override def reduce(t: SensorReding, t1: SensorReding): SensorReding =
          SensorReding(t.id,t1.timestamp,t.temperature.min(t1.timestamp))
      }
    author@nohert
  • 相关阅读:
    CSS 对浏览器的兼容性技巧总结
    后台拿webshell的方法总结
    WEBSHELL权限提升技巧
    学习linq处理数据 遥远的青苹果
    在asp.net中怎么导出excel表
    SQL提取数据库记录按字的笔画排序
    主板前置音频线怎么接
    Oracle导入导出
    DevExpress GridControl界面汉化(摘自王铭)
    ASP.NET中如何防范SQL注入式攻击
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928183.html
Copyright © 2011-2022 走看看