zoukankan      html  css  js  c++  java
  • Flink--time-window 的高级用法

    1.现实世界中的时间是不一致的,在 flink 中被划分为事件时间,提取时间,处理时间三种。
    2.如果以 EventTime 为基准来定义时间窗口那将形成 EventTimeWindow,要求消息本身就应该携带 EventTime 
    3.如果以 IngesingtTime 为基准来定义时间窗口那将形成 IngestingTimeWindow,以 source 的 systemTime 为准。 
    4.如果以 ProcessingTime 基准来定义时间窗口那将形成 ProcessingTimeWindow,以 operator 的 systemTime 为准。

    EventTime 

    1.要求消息本身就应该携带 EventTime
    2.时间对应关系如下 

    需求:

    EventTime 3 数据: 

    1527911155000,boos1,pc1,100.0 1527911156000,boos2,pc1,200.0 1527911157000,boos1,pc1,300.0 1527911158000,boos2,pc1,500.0 1527911159000,boos1,pc1,600.0 1527911160000,boos1,pc1,700.0 1527911161000,boos2,pc2,700.0 1527911162000,boos2,pc2,900.0 1527911163000,boos2,pc2,1000.0 1527911164000,boos2,pc2,1100.0 1527911165000,boos1,pc2,1100.0 1527911166000,boos2,pc2,1300.0 1527911167000,boos2,pc2,1400.0 1527911168000,boos2,pc2,1600.0
    1527911169000,boos1,pc2,1300.0
    View Code

    代码实现: 

    object EventTimeExample {
    def main(args: Array[String]) {
    //1.创建执行环境,并设置为使用 EventTime
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //置为使用 EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //2.创建数据流,并进行数据转化
    val source = env.socketTextStream("localhost", 9999)
    case class SalePrice(time: Long, boosName: String, productName: String, price: Double)
    val dst1: DataStream[SalePrice] = source.map(value => {
    val columns = value.split(",")
    SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)
     })
    //3.使用 EventTime 进行求最值操作
    val dst2: DataStream[SalePrice] = dst1
    //提取消息中的时间戳属性
    .assignAscendingTimestamps(_.time)
    .keyBy(_.productName)
    .timeWindow(Time.seconds(3))//设置 window 方法一
    .max("price")
    //4.显示结果
    dst2.print()
    //5.触发流计算
     
     env.execute()
    }
    }
    View Code

    当前代码理论上看没有任何问题,在实际使用的时候就会出现很多问题,甚至接 收不到数据或者接收到的数据是不准确的;这是因为对于 flink 最初设计的时 候,就考虑到了网络延迟,网络乱序等问题,所以提出了一个抽象概念基座水印 

    (WaterMark); 

    水印分成两种形式:
    
    第一种:
    

    第二种: 

    所以,我们需要考虑到网络延迟的状况,那么代码中就需要添加水印操作:
    
    object EventTimeOperator {
      def main(args: Array[String]): Unit = {
        //创建执行环境,并设置为使用EventTime
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)//注意控制并发数
        //置为使用EventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val source = env.socketTextStream("localhost", 9999)
        val dst1: DataStream[SalePrice] = source.map(value => {
          val columns = value.split(",")
          SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)
        })
        //todo 水印时间  assignTimestampsAndWatermarks
        val timestamps_data = dst1.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SalePrice]{
    
          var currentMaxTimestamp:Long = 0
          val maxOutOfOrderness = 2000L //最大允许的乱序时间是2s
          var wm : Watermark = null
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
          override def getCurrentWatermark: Watermark = {
            wm = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
            wm
          }
    
          override def extractTimestamp(element: SalePrice, previousElementTimestamp: Long): Long = {
            val timestamp = element.time
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
           
          }
        })
        val data: KeyedStream[SalePrice, String] = timestamps_data.keyBy(line => line.productName)
        val window_data: WindowedStream[SalePrice, String, TimeWindow] = data.timeWindow(Time.seconds(3))
        val apply: DataStream[SalePrice] = window_data.apply(new MyWindowFunc)
        apply.print()
        env.execute()
    
      }
    }
    case class SalePrice(time: Long, boosName: String, productName: String, price: Double)
    class MyWindowFunc extends WindowFunction[SalePrice , SalePrice , String, TimeWindow]{
      override def apply(key: String, window: TimeWindow, input: Iterable[SalePrice], out: Collector[SalePrice]): Unit = {
        val seq = input.toArray
        val take: Array[SalePrice] = seq.sortBy(line => line.price).reverse.take(1)
        for(info <- take){
          out.collect(info)
        }
      }
    }

    ProcessingTime 

    对于 processTime 而言,是 flink 处理数据的时间,所以就不关心发过来的数据 是不是有延迟操作,只关心数据具体的处理时间,所以不需要水印处理,操作相 对来说简单了很多 

    object ProcessingTimeExample {
      def main(args: Array[String]) {
        //创建执行环境,并设置为使用EventTime
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(2)//注意控制并发数
        //置为使用ProcessingTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    
        val source = env.socketTextStream("localhost", 9999)
        case class SalePrice(time: Long, boosName: String, productName: String, price: Double)
    
        val dst1: DataStream[SalePrice] = source.map(value => {
          val columns = value.split(",")
          SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)
        })
        //processTime不需要提取消息中的时间
    //    val timestamps_data: DataStream[SalePrice] = dst1.assignAscendingTimestamps(line => line.time)
        val keyby_data: KeyedStream[SalePrice, String] = dst1.keyBy(line => line.productName)
        //TODO 窗口事件是:TumblingProcessingTimeWindows
        val window_data: WindowedStream[SalePrice, String, TimeWindow] = keyby_data.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        val max_price: DataStream[SalePrice] = window_data.max("price")
        max_price.print()
        env.execute()
      }
    }
    View Code
  • 相关阅读:
    数据结构和算法(Golang实现)(9)基础知识-算法复杂度及渐进符号
    基于深度学习方法的dota2游戏数据分析与胜率预测(python3.6+keras框架实现)
    基于CBOW网络手动实现面向中文语料的word2vec
    《Machine Learning Yearing》读书笔记
    使用神经网络预测航班起飞准点率
    使用LSTM-RNN建立股票预测模型
    基于selenium+phantomJS的动态网站全站爬取
    TensorFlow保存、加载模型参数 | 原理描述及踩坑经验总结
    学习笔记--python中使用多进程、多线程加速文本预处理
    通过外汇对冲手段稳定获利的可行性验证
  • 原文地址:https://www.cnblogs.com/niutao/p/10548578.html
Copyright © 2011-2022 走看看