zoukankan      html  css  js  c++  java
  • Flink编程练习

    1.wordcount

    利用socket作为数据源,对输入的每行数据进行单词计数。按processing time每10秒计算一次,结果输出到terminal。

    /**
     * Streaming word count over a text socket.
     *
     * Reads newline-delimited text from `localhost:<port>`, splits every line on
     * whitespace and prints the number of occurrences of each word for every
     * 10-second tumbling processing-time window.
     */
    object SocketWindowWordCount {
        def main(args: Array[String]) : Unit = {

            // The port argument is mandatory: print a usage hint and stop when it
            // is missing or cannot be parsed.
            val port: Int = try {
                ParameterTool.fromArgs(args).getInt("port")
            } catch {
                case _: Exception =>
                    System.err.println("No port specified. Please run 'xx.jar --port <port>'")
                    return
            }

            val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

            // Records arriving on the socket are delimited by a newline character.
            val text = env.socketTextStream("localhost", port, '\n')

            val windowCounts = text
                .flatMap(_.split("\\s"))   // regex \s: split on any whitespace character
                .filter(_.nonEmpty)        // consecutive whitespace yields empty tokens; do not count them
                .map(WordWithCount(_, 1))
                .keyBy(_.word)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum("count")

            windowCounts.print()

            env.execute("Socket Window WordCount")
        }

        /** A word together with the number of times it has been seen. */
        case class WordWithCount(word: String, count: Long)
    }
    

    数据格式

    /**
     * A single measurement produced by a sensor.
     *
     * @param id          identifier of the sensor
     * @param timestamp   time of the reading; the jobs in this file use it as the event timestamp
     * @param temperature measured temperature
     */
    case class SensorReading(
      id: String,
      timestamp: Long,
      temperature: Double
    )
    
    /** Smoke level reported by the smoke source: either `High` or `Low`. */
    object SmokeLevel extends Enumeration {
      /** Alias so the enumeration's value type can be referred to as `SmokeLevel`. */
      type SmokeLevel = Value

      val High: Value = Value
      val Low: Value = Value
    }
    
    /**
     * An alert raised by a job.
     *
     * @param message   human-readable alert text
     * @param timestamp timestamp attached to the alert
     */
    case class Alert(
      message: String,
      timestamp: Long
    )
    

    2.双流警报EventTime

    时间特征为event time,每1s更新一次watermark,watermark由SensorReading内部的timestamp推进,允许5s的延迟(过滤掉迟到数据)。数据源SensorReading并发处理,数据源SmokeLevel并发度为1,但能够被每个并发的SensorReading流访问。假设两个流的数据是源源不断的。当SensorReading的temperature大于100且SmokeLevel为High时触发警报。警报包含当时SensorReading的timestamp。

    下面的例子中,迟到数据(即事件时间落后于当前WM的数据)依然会被处理,并未被过滤。

    注意:如果只有其中一个流在connect前调用了assignTimestampsAndWatermarks,connect后的流是不会更新WM的,因为双输入算子的WM取两个输入流WM的最小值。

    /**
     * Dual-stream alert job.
     *
     * Connects the parallel sensor-reading stream with the single-parallelism
     * smoke-level stream and prints the alerts produced by `MyFlatMapFunc`.
     */
    def main(args: Array[String]): Unit = {

      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      // Emit watermarks once per second. This call must come after
      // setStreamTimeCharacteristic, which resets the interval to its default.
      env.getConfig.setAutoWatermarkInterval(1000L)

      // Sensor readings: the event timestamp is taken from the reading itself,
      // tolerating up to 5 seconds of out-of-orderness.
      val sensorInput = env
        .addSource(new SensorSource)
        .assignTimestampsAndWatermarks(
          new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {
            override def extractTimestamp(element: SensorReading): Long = {
              element.timestamp
            }
          })

      // Smoke levels come from a single, non-parallel source.
      val smokeLevelInput = env
        .addSource(new SmokeLevelSource)
        .setParallelism(1)

      val res = sensorInput
        .process(new MyKeyedProcessFunction) // implementation omitted here; it is essentially an `if` followed by `collect`
        // Broadcast the smoke level so that every parallel instance of the
        // co-flatMap receives every update; without it the updates are spread
        // across the instances and most of them never see the current level.
        .connect(smokeLevelInput.broadcast)
        .flatMap(new MyFlatMapFunc)

      res.print()

      env.execute("multiple streamss")

    }
    
    /**
     * Joins sensor readings with the most recent smoke level.
     *
     * Input 1 carries sensor readings, input 2 carries smoke-level updates.
     * An [[Alert]] carrying the reading's timestamp is emitted for every reading
     * whose temperature is above 100 while the last seen smoke level is `High`.
     */
    class MyFlatMapFunc extends CoFlatMapFunction[SensorReading, SmokeLevel, Alert] {
      // Last smoke level received on input 2; starts out as Low until the first update arrives.
      // NOTE(review): this is a plain field, not Flink managed state — it looks like it
      // would not be restored after a failure; confirm whether that matters here.
      private var curSmokeLevel = SmokeLevel.Low

      /** Emits an alert for a hot reading while the current smoke level is high. */
      override def flatMap1(value: SensorReading, out: Collector[Alert]): Unit = {
        if (curSmokeLevel == SmokeLevel.High && value.temperature > 100) {
          out.collect(Alert("Alert! ", value.timestamp))
        }
      }

      /** Remembers the latest smoke level; never emits anything. */
      override def flatMap2(value: SmokeLevel, out: Collector[Alert]): Unit = {
        curSmokeLevel = value
      }
    }
    

    3.持续计数stateful + timer + SideOutputs

    对每个key的数据量进行累加计数,如果1分钟没有新数据,就输出key-count对。对每一个数据进行处理时,sideoutput当前所处理的key的state数据(更新后的)

    /** Tag of the side output named "real-time_info", which carries `String` records. */
    val realTimeInfo: OutputTag[String] = new OutputTag[String]("real-time_info")
    
    /**
     * Per-key counting job: keys the sensor readings by sensor id, runs them
     * through `MyProcessFunc` and prints that function's `realTimeInfo` side output.
     */
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      // Must come after setStreamTimeCharacteristic: that call resets the
      // watermark interval to its default, which would silently undo the 1s setting.
      env.getConfig.setAutoWatermarkInterval(1000L)

      // NOTE(review): no timestamp/watermark assigner is applied here. Event-time
      // timers only fire if SensorSource itself emits timestamps and watermarks —
      // confirm, or add assignTimestampsAndWatermarks as the other jobs do.
      val countStream = env.addSource(new SensorSource)
        .keyBy(_.id)
        .process(new MyProcessFunc)

      countStream.getSideOutput(realTimeInfo)
        .print()

      env.execute()
    }
    
    /**
     * Per-key counting state.
     *
     * @param key   the key the count belongs to
     * @param count number of elements counted so far
     * @param ts    timestamp of the last update
     */
    case class CountWithTimeStamp(
      key: String,
      count: Int,
      ts: Long
    )
    
    /**
     * Counts the readings seen per key and emits `(key, count)` once a key has
     * received no new reading for one minute of event time.
     *
     * Every processed reading additionally sends the freshly updated per-key
     * state, rendered as a string, to the `realTimeInfo` side output.
     */
    class MyProcessFunc extends KeyedProcessFunction[String, SensorReading, (String, Int)] {

      /** Idle period in milliseconds after which a key's count is emitted: one minute. */
      private val IdleTimeoutMs: Long = 60 * 1000L

      // Keyed state holding the running count and the timestamp of the last update.
      lazy val state = getRuntimeContext
        .getState(new ValueStateDescriptor[CountWithTimeStamp]("muState", classOf[CountWithTimeStamp]))

      /**
       * Updates the count for the current key, re-arms the idle timer and
       * reports the updated state on the side output.
       */
      override def processElement(value: SensorReading,
                                  ctx: KeyedProcessFunction[String, SensorReading, (String, Int)]#Context,
                                  out: Collector[(String, Int)]): Unit = {
        // NOTE(review): ctx.timestamp() may be null when the record carries no
        // event timestamp; this assumes timestamps are assigned upstream — confirm.
        val eventTs: Long = ctx.timestamp()

        // state.value() is null for a key that has not been seen before.
        val current = Option(state.value()) match {
          case None =>
            CountWithTimeStamp(value.id, 1, eventTs)
          case Some(previous) =>
            // Remove the timer registered for the previous element of this key.
            ctx.timerService().deleteEventTimeTimer(previous.ts + IdleTimeoutMs)
            previous.copy(count = previous.count + 1, ts = eventTs)
        }

        state.update(current)

        ctx.timerService().registerEventTimeTimer(current.ts + IdleTimeoutMs)

        // The side output is typed OutputTag[String], so emit the textual form of the state.
        ctx.output(realTimeInfo, current.toString)
      }

      /**
       * Emits `(key, count)` when the firing timer is the one registered for the
       * latest update, i.e. no newer element arrived during the idle period.
       */
      override def onTimer(timestamp: Long,
                           ctx: KeyedProcessFunction[String, SensorReading, (String, Int)]#OnTimerContext,
                           out: Collector[(String, Int)]): Unit = {
        Option(state.value()).foreach { latest =>
          // Timers are registered at `ts + IdleTimeoutMs`, so compare against that.
          if (timestamp == latest.ts + IdleTimeoutMs) {
            out.collect((latest.key, latest.count))
          }
        }
      }
    }
    

    4.一定时间范围内的极值windowfunction + checkpoint

    利用tumbling window计算各个sensor在15s内的最大最小值,返回结果包含窗口的结束时间。另外,window只存储极值,不保留原数据。

    checkpoint间隔为10s,watermark刷新间隔1s

    /**
     * Min/max job: computes, per sensor and per 15-second tumbling event-time
     * window, the minimum and maximum temperature and prints the result together
     * with the window end time. Checkpoints are taken every 10 seconds.
     */
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      // Must come after setStreamTimeCharacteristic: that call resets the
      // watermark interval to its default, which would silently undo the 1s setting.
      env.getConfig.setAutoWatermarkInterval(1000)

      env.enableCheckpointing(10 * 1000L)

      env.addSource(new SensorSource)
        .assignTimestampsAndWatermarks(new MyPeriodicAssigner)
        // (id, min, max): both extremes start out as the reading's own temperature.
        .map(r => (r.id, r.temperature, r.temperature))
        .keyBy(_._1)
        .window(TumblingEventTimeWindows.of(Time.seconds(15)))
        .reduce(
          // Incremental aggregation: the window keeps only the running extremes,
          // not the individual readings.
          (item1: (String, Double, Double), item2: (String, Double, Double)) => {
            (item1._1, item1._2.min(item2._2), item1._3.max(item2._3))
          },
          new MyWindowEndProcessFunction()
        )
        .print()

      // Without execute() the pipeline is only declared and never runs.
      env.execute("Min/max temperature per window")
    }
    
    /**
     * Temperature extremes of one key within one window.
     *
     * @param key the key the extremes belong to
     * @param min lowest temperature
     * @param max highest temperature
     * @param ts  timestamp attached to the result
     */
    case class MaxMinTemperature(
      key: String,
      min: Double,
      max: Double,
      ts: Long
    )
    
    /**
     * Wraps the first `(key, min, max)` tuple of a window into a
     * [[MaxMinTemperature]] stamped with the window's end time.
     */
    class MyWindowEndProcessFunction
      extends ProcessWindowFunction[(String, Double, Double), MaxMinTemperature, String, TimeWindow] {

      override def process(key: String,
                           context: Context,
                           elements: Iterable[(String, Double, Double)],
                           out: Collector[MaxMinTemperature]): Unit = {
        // Only the first element is read; when used after an incremental reduce
        // it is the aggregated tuple.
        val aggregated = elements.head
        val windowEnd = context.window.getEnd
        out.collect(MaxMinTemperature(key, aggregated._2, aggregated._3, windowEnd))
      }
    }
    
  • 相关阅读:
    web自动化学习1--环境搭建以及web初识
    python接口自动化之openpyxl学习
    python接口自动化之requests学习
    接口自动化面试
    面试基础-linux
    git的使用
    php_PDO的事务处理和异常处理
    PHP_PDO简单操作
    PHP_MySql扩展主要操作
    Linux_使用yum快速安装、配置amp环境
  • 原文地址:https://www.cnblogs.com/code2one/p/10162715.html
Copyright © 2011-2022 走看看