  • Flink Stream Processing API: Join

    1. Window Join

    stream.join(otherStream)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)

    1.1 Tumbling Window Join

    Tumbling windows do not overlap, so each element belongs to exactly one window.

    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    ...
    
    val orangeStream: DataStream[Integer] = ...
    val greenStream: DataStream[Integer] = ...
    
    orangeStream.join(greenStream)
        .where(elem => /* select key */)
        .equalTo(elem => /* select key */)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
        .apply { (e1, e2) => e1 + "," + e2 }
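
The pairing rule of a tumbling window join can be sketched in plain Scala, without Flink. This is only a model of the semantics under simplifying assumptions (bounded in-memory input, no watermarks); `Event`, `windowStart`, `tumblingJoin` and the sample timestamps are hypothetical names and data chosen for illustration.

```scala
// Plain-Scala model of a tumbling window join (no Flink dependency).
// All names and sample data here are hypothetical.
object TumblingJoinSketch {
  final case class Event(key: String, ts: Long)

  // Start of the tumbling window of length `size` (ms) that contains `ts`.
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Emit one "left,right" pair for every two elements that share a key and a window.
  def tumblingJoin(left: Seq[Event], right: Seq[Event], size: Long): Seq[String] =
    for {
      l <- left
      r <- right
      if l.key == r.key && windowStart(l.ts, size) == windowStart(r.ts, size)
    } yield s"${l.ts},${r.ts}"

  val orange: Seq[Event] = Seq(0L, 1L, 2L, 4L).map(Event("k", _))
  val green: Seq[Event]  = Seq(0L, 3L, 5L).map(Event("k", _))

  // Windows of 2 ms: [0,2), [2,4), [4,6)
  val joined: Seq[String] = tumblingJoin(orange, green, size = 2L)
  // joined == Seq("0,0", "1,0", "2,3", "4,5")
}
```

Because the windows are disjoint, each pair is emitted at most once in this model.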

    1.2 Sliding Window Join

    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    ...
    
    val orangeStream: DataStream[Integer] = ...
    val greenStream: DataStream[Integer] = ...
    
    orangeStream.join(greenStream)
        .where(elem => /* select key */)
        .equalTo(elem => /* select key */)
        .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
        .apply { (e1, e2) => e1 + "," + e2 }
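
With sliding windows an element belongs to several overlapping windows, so the same pair can be emitted more than once. The following plain-Scala sketch models that behaviour without Flink; `Event`, `windowStarts`, `slidingJoin` and the sample timestamps are hypothetical, and the model ignores watermarks and lateness.

```scala
// Plain-Scala model of a sliding window join (no Flink dependency).
// All names and sample data here are hypothetical.
object SlidingJoinSketch {
  final case class Event(key: String, ts: Long)

  // Start times of all windows [start, start + size) that contain `ts`,
  // where window starts are multiples of `slide`.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - Math.floorMod(ts, slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Emit (windowStart, "left,right") once per window that both elements share.
  def slidingJoin(left: Seq[Event], right: Seq[Event], size: Long, slide: Long): Seq[(Long, String)] =
    for {
      l <- left
      r <- right
      if l.key == r.key
      w <- windowStarts(l.ts, size, slide)
      if windowStarts(r.ts, size, slide).contains(w)
    } yield (w, s"${l.ts},${r.ts}")

  val orange: Seq[Event] = Seq(0L, 1L, 3L).map(Event("k", _))
  val green: Seq[Event]  = Seq(0L, 2L).map(Event("k", _))

  // size = 2 ms, slide = 1 ms: every element falls into two windows
  val joined: Seq[(Long, String)] = slidingJoin(orange, green, size = 2L, slide = 1L)
}
```

In this sample the pair `0,0` appears twice, once for the window starting at -1 and once for the window starting at 0, while `1,2` appears only for the window starting at 1.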

    1.3 Session Window Join

    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
     
    ...
    
    val orangeStream: DataStream[Integer] = ...
    val greenStream: DataStream[Integer] = ...
    
    orangeStream.join(greenStream)
        .where(elem => /* select key */)
        .equalTo(elem => /* select key */)
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
        .apply { (e1, e2) => e1 + "," + e2 }
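
A session window join first groups the elements of both streams (per key) into sessions and then pairs the elements inside each session. The plain-Scala sketch below models this without Flink. It assumes that two elements belong to the same session when their timestamps are at most `gap` apart; `Event`, `sessions`, `sessionJoin` and the sample data are hypothetical.

```scala
// Plain-Scala model of a session window join (no Flink dependency).
// All names and sample data here are hypothetical.
object SessionJoinSketch {
  final case class Event(key: String, ts: Long)

  // Split timestamps into sessions. Assumption of this sketch: a new session
  // starts when the distance to the previous timestamp is greater than `gap`.
  def sessions(timestamps: Seq[Long], gap: Long): List[List[Long]] =
    timestamps.sorted.foldLeft(List.empty[List[Long]]) {
      case (current :: done, ts) if ts - current.head <= gap => (ts :: current) :: done
      case (acc, ts)                                         => List(ts) :: acc
    }.map(_.reverse).reverse

  // Sessions are built from the union of both streams, then joined per session.
  def sessionJoin(left: Seq[Event], right: Seq[Event], gap: Long): Seq[String] = {
    val all = left ++ right
    for {
      key     <- all.map(_.key).distinct
      session <- sessions(all.filter(_.key == key).map(_.ts), gap)
      l       <- left if l.key == key && session.contains(l.ts)
      r       <- right if r.key == key && session.contains(r.ts)
    } yield s"${l.ts},${r.ts}"
  }

  val orange: Seq[Event] = Seq(1L, 2L, 5L, 6L).map(Event("k", _))
  val green: Seq[Event]  = Seq(0L, 4L, 8L, 9L).map(Event("k", _))

  // Sessions with gap = 1 ms: [0,1,2], [4,5,6], [8,9]
  val joined: Seq[String] = sessionJoin(orange, green, gap = 1L)
  // joined == Seq("1,0", "2,0", "5,4", "6,4")
}
```

The last session contains only green elements, so it produces no output: an inner join emits nothing for a window that holds elements of just one stream.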

    2. Interval Join

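    The interval join joins elements of two streams (call them A and B) that share a common key and where the timestamp of the element from B lies in a time interval relative to the timestamp of the element from A. For elements a of A and b of B, this can be expressed more formally as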

    b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

    The interval join currently only supports event time.

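    The example below joins the streams "orange" and "green" with a lower bound of -2 milliseconds and an upper bound of +1 millisecond. In the formal notation this translates to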

    orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    ...
    
    val orangeStream: DataStream[Integer] = ...
    val greenStream: DataStream[Integer] = ...
    
    orangeStream
        .keyBy(elem => /* select key */)
        .intervalJoin(greenStream.keyBy(elem => /* select key */))
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process(new ProcessJoinFunction[Integer, Integer, String] {
            // Called once for every pair of elements that satisfies the interval condition
            override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
                out.collect(left + "," + right)
            }
        })
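
The interval condition above can be checked on a small in-memory example. The following plain-Scala sketch models it without Flink; `Event`, `intervalJoin` and the sample timestamps are hypothetical, and both bounds are treated as inclusive, matching the formula.

```scala
// Plain-Scala model of an interval join (no Flink dependency).
// All names and sample data here are hypothetical.
object IntervalJoinSketch {
  final case class Event(key: String, ts: Long)

  // Pair every left element with the right elements of the same key whose
  // timestamp lies in [left.ts + lowerBound, left.ts + upperBound].
  def intervalJoin(left: Seq[Event], right: Seq[Event], lowerBound: Long, upperBound: Long): Seq[String] =
    for {
      l <- left
      r <- right
      if l.key == r.key && l.ts + lowerBound <= r.ts && r.ts <= l.ts + upperBound
    } yield s"${l.ts},${r.ts}"

  val orange: Seq[Event] = Seq(0L, 2L, 3L, 4L, 5L, 7L).map(Event("k", _))
  val green: Seq[Event]  = Seq(0L, 1L, 6L, 7L).map(Event("k", _))

  // Same bounds as between(Time.milliseconds(-2), Time.milliseconds(1))
  val joined: Seq[String] = intervalJoin(orange, green, lowerBound = -2L, upperBound = 1L)
  // joined == Seq("0,0", "0,1", "2,0", "2,1", "3,1", "5,6", "7,6", "7,7")
}
```

The orange element with timestamp 4 has no partner because no green timestamp lies in [2, 5].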
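
    Example: join the order log with the transaction log on txId, matching records whose timestamps are at most 5 minutes apart.

    // TODO 1. Read the order log data
    val fileDS1: DataStream[String] = orderTransactionAnalysisDao.readTextFile(source1)
    // TODO 2. Read the transaction log data
    val fileDS2: DataStream[String] = orderTransactionAnalysisDao.readTextFile(source2)
    
    // TODO 3. Convert the log lines into the corresponding case classes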
    val orderDS = fileDS1.map(
        line => {
            val datas = line.split(",")
            OrderLogData( datas(0).toLong, datas(1), datas(2), datas(3).toLong)
        }
    )
    
    val payKS = orderDS
        .filter(_.txId != "")
        .assignAscendingTimestamps(_.timestamp * 1000L).keyBy(_.txId)
    
    val txDS = fileDS2.map(
        line => {
            val datas = line.split(",")
            TXLogData( datas(0), datas(1), datas(2).toLong)
        }
    )
    
    val txKS = txDS.assignAscendingTimestamps(_.timestamp * 1000L).keyBy(_.txId)
    
    // TODO 4. Join the two streams
    payKS.intervalJoin(txKS)
        .between(Time.minutes(-5), Time.minutes(5))
        .process(
            new ProcessJoinFunction[OrderLogData, TXLogData, (OrderLogData, TXLogData)] {
                override def processElement(left: OrderLogData, right: TXLogData, ctx: ProcessJoinFunction[OrderLogData, TXLogData, (OrderLogData, TXLogData)]#Context, out: Collector[(OrderLogData, TXLogData)]): Unit = {
                    out.collect(( left, right ))
                }
            }
        )
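
The example uses the case classes `OrderLogData` and `TXLogData` without showing their definitions. One possible shape is sketched below. Only the fields `txId` and `timestamp` are confirmed by the code above; the remaining field names (`orderId`, `eventType`, `payChannel`), the field order, the helper functions and the sample lines are assumptions made for illustration.

```scala
// Hypothetical definitions for the case classes used in the example.
// Only `txId` and `timestamp` appear in the original code; the other
// field names and the sample lines are assumptions.
object OrderTxModelSketch {
  final case class OrderLogData(orderId: Long, eventType: String, txId: String, timestamp: Long)
  final case class TXLogData(txId: String, payChannel: String, timestamp: Long)

  // Mirrors the map function applied to fileDS1.
  def parseOrder(line: String): OrderLogData = {
    val datas = line.split(",")
    OrderLogData(datas(0).toLong, datas(1), datas(2), datas(3).toLong)
  }

  // Mirrors the map function applied to fileDS2.
  def parseTx(line: String): TXLogData = {
    val datas = line.split(",")
    TXLogData(datas(0), datas(1), datas(2).toLong)
  }

  // Plain-Scala equivalent of keyBy(_.txId) plus between(-5 min, +5 min),
  // with timestamps in seconds as in the log files.
  def matches(order: OrderLogData, tx: TXLogData): Boolean =
    order.txId == tx.txId && math.abs(order.timestamp - tx.timestamp) <= 5 * 60L

  val order: OrderLogData = parseOrder("1,pay,tx-001,1000")
  val onTime: TXLogData   = parseTx("tx-001,wechat,1120")
  val tooLate: TXLogData  = parseTx("tx-001,wechat,1301")
}
```

With these sample lines, `matches(order, onTime)` is true (120 seconds apart) and `matches(order, tooLate)` is false (301 seconds apart).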
  • Original article: https://www.cnblogs.com/hyunbar/p/12633269.html