zoukankan      html  css  js  c++  java
  • 尚硅谷Flink2020教程.md

    1. 001-005

    传统数据处理架构

    chrome_aZxHk3tTN3.png
    chrome_aZxHk3tTN3.png

    分析处理

    chrome_YZw6TQFcwP.png
    chrome_YZw6TQFcwP.png

    有状态数据处理

    chrome_PkYdmUH3W4.png
    chrome_PkYdmUH3W4.png

    Flink的主要特点-事件驱动
    chrome_bJUfEakw7R.png

    流的世界观
    chrome_42A9WYOMut.png

    分层抽象级别的API
    chrome_VK8HvSAeWL.png

    Flink其他特点

    • 支持事件时间(event-time)和处理时间(processing-time)语义
    • 支持精确一次(exactly-once)的状态一致性语义
    • 低延迟,每秒处理数百万个事件,毫秒级延迟
    • 与众多常用存储系统对接
    • 高可用,动态扩展,实现7*24H运行
      Flink和SparkStreaming架构对比
      chrome_gl8ff4pgwN.png

    2. 006 批wordcount

    1. No implicits found for parameter evidence$16: TypeInformation[String] 

    2. //解决:导入以下包,实现隐士转换 

    3. import org.apache.flink.api.scala._ 

    3. 007 流wordcount

    比批处理多了启动任务执行的语句防止任务直接结束,进程关闭

    4. 008 流处理wc扩展测试和说明

    设置并行度:计算在哪个线程上依赖于单词的hash
    如果TranseForm并行度跟action不一致,最终action会乱序问题,所以要有时间语义。

    5. 009 flink 单机部署

    1. 安装jdk
    2. 安装flink
    3. 配置环境变量source /etc/profile
    4. 启动集群 bin/start-cluster.sh

    注意关闭防火墙

    6. 010 flink界面提交job

    task manage(executor)的内存分配
    chrome_K0v9oDi8OI.png
    savePoint:手动存盘,checkPoint自动存盘。

    并行度与资源slot,与task manage关系
    chrome_hRj6H3tR3l

    两阶段parallelism->1,slot->2,task manage->1的情况报错资源不足。
    两阶段parallelism->1,slot->1,task manage->1的情况报错资源不足。

    6-1. 解决

    flink版本从1.12变更为1.10.1跟源码版本保持一致
    效果
    SecureCRT_NoU8SO6yBd.png

    7. 011 命令提交job和取消job

    1. #提交job 

    2. ./bin/flink run -c top.majia.wc.StreamWordCount -p 1 /opt/software/ShangGuiGuFlink-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 7777 

    3. #取消job 

    4. ##查看jobid 

    5. ./bin/flink list 

    6. ## 取消job 

    7. ./bin/flink cancel $JOBID 

    8. ## 查看所有job状态 

    9. ./bin/flink list -a 

    8. 012 其他集群部署方式

    yarn模式要求Flink(1.10.1)是有hadoop支持的版本的jar包

    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

    flink on yarn 两种模式Session-cluster和Per-Job-Cluter模式

    8-1. Seesion-cluster模式

    • yarn总初始化一个flink集群固定的资源,占满后不能提交任务,所有任务共享该固定资源。

    chrome_MJmtNh1PL0.png
    chrome_MJmtNh1PL0.png

    8-2. Per-Job-Cluster

    • 每个job都会在yarn中启动一个flink集群

    chrome_mCkLsApurm.png
    chrome_mCkLsApurm.png

    8-3. 启动集群

    1. 启动hadoop
    2. 启动yarn模式
    • yarn-session模式
    1. ./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d 

    2. # -n(--container): TaskManager 的数量 

    3. # -s(--slots):每个TaskManager的slot数量,默认一个slot一个core,默认每个taksmanger的slot数量为1,有时可以多一些taskmanager,做冗余 

    4. # -jm:JobManager内存(单位GB) 

    5. # -tm:每个taskManager的内存(单位GB) 

    6. # - nm:yarn的appName(现在yarn的ui上的名字)。 

    7. # -d:后台执行 

    8. # 取消yarn-session的job 

    9. yarn application --kill $applicationID 

    • PerJobCluter模式
    1. # 不启动yarn-session,直接执行job 

    2. ./bin/flink run -m yarn-cluster -c top.majia.wc.StreamWordCount /opt/software/ShangGuiGuFlink-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 7777 

    8-1. Kubernetes部署

    1. 搭建k8s集群
    2. 配置各组件的yaml文件

    在k8s上构建Flink Session Cluter,需要将Flink集群的组件对应的docker镜像分别在k8s上启动,包括JobManager,TaskManger,JobManagerService 三个镜像服务。每个镜像服务都可以从中央仓库获取。

    1. 启动Flink Sessiong Cluster
    1. # 启动Jobmanager-service服务 

    2. kubectl create -f jobmanager-service.yaml 

    3. # 启动jobmanager-deployment服务 

    4. kubectl create -f jobmanager-deployment.yaml 

    5. # 启动taskmanager-deployment服务 

    6. kubectl create -f taskmanager-deployment.yaml 

    1. 访问flink web ui界面

    集群启动后,就可以通过JobManagerServices中配置的webUI端口,用浏览器输入以下URL来访问UI页面

    http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy

    9. 013 运行时架构-组件关系

    四大组件
    chrome_7TDJcJX765.png

    • JobManager 作业管理器
    1. 控制一个应用程序执行的主进程,yejiushishuo8,每个应用程序都会被一个不同的JobManager所控制执行。
    2. JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph),逻辑数据流图(Logical dataflow graph)和打包了的所有的类,库及其他资源的JAR包
    3. JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做"执行图"(ExecutionGraph),包含了所有可以并发执行的任务。
    4. JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot).一旦它获取了足够的资源,就会将执行图分发到真正运行他们的TaskManager上。而在运行的过程中,JobManager会负责所有需要中央细条的操作,比如检查点的协调。
    • TaskManager 任务管理器
    1. Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots).插槽的数量限制了TaskManager能够执行的任务数量。
    2. 启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(task)来执行了。
    3. 在执行过程中,一个TaskManager可以跟其他运行同一应用程序的TaskManager交换数据。
    • ResourceManager 资源管理器
    1. 主要负责管理任务管理器(TaskManager)和插槽(slot),TaskManager插槽是flink中定义的处理资源单元
    2. Flink为不同的环境和资源管理工具提供了不同的资源管理器,比如YARN,MESOS,K8S以及StandAlone部署。
    3. 当JobManager申请插槽资源时,ResourceManager会将空闲插槽的TaskManager分配给JobManager.如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
    • Dispatcher 分发器
    1. 可以跨作业运行,它为应用提交提供了REST接口
    2. 当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
    3. Dispatcher也会启动一个WebUI,用来方便的展示和监控作业执行的信息
    4. Dispatcher在架构中可能并不是必须的。这取决于应用提交运行的方式。

    10. 014 作业提交流程

    • FLINK standalone模式
      chrome_NXHNk0i81b.png
    • yarn session模式跟flink standalone模式差不多;只不过先起YRM和FJM,来job后才起FTM(flink taskManager)
    • Yarn PerJob模式
      chrome_uPvW3qx2vb.png

    11. 015 flink任务调度原理

    chrome_PAEFr5eEiu.png
    chrome_PAEFr5eEiu.png

    • 怎么实现并行计算?
    • 并行的任务,需要占用多少slot?
    • 一个流处理程序,到底包含多少个任务?

    并行度概念:跟partion有关系吗?
    chrome_A0KdH5v77v.png

    TaskManager和slot关系
    chrome_X5uQMPNXXX.png

    12. 016 Task和Slot

    因为cpu不隔离:因此两个task可以放到一个slot里;
    chrome_DPH6l0BaWm.png
    chrome_rvVmcRG0Wc.png

    13. 017 任务调度wordcount task slot分析

    chrome_PHL8qpsCVb.png
    chrome_PHL8qpsCVb.png

    chrome_xxOxhY2KvT.png

    14. 018 任务调度task任务合并分析

    在大部分情况下,程序中的转换操作(transfomations)跟dataflow中的算子(operator)是一一对应关系。

    Flink中的执行图分四层StreamGrapth->JobGraph->ExecutionGraph->物理执行图

    • StreamGraph:是根据用户通过StreamAPI编写的代码生成的最初的图。用来表示程序的拓扑结构
    • JobGraph: StreamGraph经过优化后生成了JobGraph,提交给JobManager的数据结构。主要的优化为,将多个符合条件的节点chain在一起作为一个节点。
    • ExecutionGraph: JobManager根据JobGraph生成ExecutionGraph.ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
    • 物理执行图: JobManager根据ExecutionGraph对Job进行调度后,在各个TaskManager上部署Task后形成的“图”,并不是一个具体的数据结构。
      chrome_dJ7rg7qLro.png

    14-1. 怎么合并算子?数据传输形式

    • 一个程序中,不同的算子可能具有不同的并行度
    • 算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪种形式取决于算子的种类。
    • One-2-one:stream维护着分区及元素的顺序(比如source和map之间)。这意味着map算子的子任务看到的元素的个数以及顺序跟source算子的子任务生产的元素的个数,顺序相同。map,filter,flatMap都是One-2One的对应关系。
    • Resitributing: stream的分区会发生改变。每一个算子的子任务依据所选择的transformationg发送数据到不同的目标任务。例如,keyby基于hashCode重分区,而braodcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark的shuffle过程。

    shuffle和并行度调整都会redistributing算子无法合并。

    14-2. 任务链

    • Flink采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接
    • 相同并行度one-2-one操作,flink这样相连的算子链接在一起形成一个task,原来的算子成伟里面的subtask

    ::: alert-info
    并行度相同,并且是one2one操作两个条件缺一不可

    15. 019 任务调度原理:业务逻辑耗时的符合合并任务链的算子优化。

    • disableChaining()算子
    • startNewChain()算子
    • slotSharingGroup("a") :不同组下的slot最大并行度叠加就是需要的资源。
    • env.disableOperatorChaining() 全局禁用任务链

    16. 020 流处理-API_SOURCE01

    chrome_fwOABEHw30.png
    chrome_fwOABEHw30.png

    根据提交方式自动判断使用哪类执行环境

    • createLocalEnvironment(1) 并行度 (本地执行环境)
    • createRemoteEnvironment(jobmanage-hostname,port,jarPath) (集群环境)

    17. 021 API_source02 源码sensor.scala

    18. 022 API_SOURCE03 自定义source 源码sensor.scala

    即addSource里传入自定义的SourceFunction

    19. 023-027 流处理API_Transform 源码TransformTest

    简单算子

    • map
    • flatMap
    • Filter

    键控流算子

    • KeyBy

    DataStream->KeyedStream:逻辑上讲一个流拆分成不相交的分区,每个分区包含相同key的元素,在内部以hash实现。

    • 滚动聚合算子Rolling Agg(sum,min,max,minBy,maxBy)

    这些算子可以针对KeyedStream的每一个支流做聚合。

    1. - min,minBy的区别:min=>同一个对象min只取当前字段的最小值,对象的其他字段还是第一个出现的对象的字段值,而不是最小值字段对应对象的其他字段值。 

    2. - 设置全局并行度为1可以看出最小值怎么的出来的,其实是与下一个同元素比较然后取最小 

    • Reduce

      • 可以对前后两个对象的字段数据进行比较等处理。

    多流操作

    1.分流操作

    • Split和Select(就像keyby和RollingAgg前后一般一起使用)

      • split 根据某些特征把流分类,实际还是一个流splitStream,真正把流分成多个流要结合select

    chrome_jUX5KHymqZ.png
    chrome_jUX5KHymqZ.png

    chrome_1fnCeEvy4o.png

    ::: alert-warn
    需要学习Traversable顶层数据结构与底层结构set,list,map的关系。
    :::

    2.合流操作

    • Connect和CoMap

    chrome_ZGmU8774N1.png
    chrome_ZGmU8774N1.png

    DataStream,DataStream->ConnectedStreams:连接两个保持他们类型的数据流,两个流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生变化,两个流相互独立(一国两制)

    chrome_pmISmaoOiU.png
    chrome_pmISmaoOiU.png

    ConnectedStreams->DataStream:作用于ConnectedStreams上,功能与map和flatmap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

    • Union:要求两个流类型要匹配

    chrome_urZs0GKxNo.png
    chrome_urZs0GKxNo.png

    DataStream->DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。

    20. 028 flink支持的数据类型

    • 对类型进行序列化反序列化
    • 对类型进行提取分析输入和输出类型(推导类型等)
    1. java和scala基础类型 Int,Double,Long,String,...
    2. java和scala 元组(flink元祖是Tuple0->Tuple25;scala默认是到22)
    3. scala样例类 case class
    4. Java简单对象(POJOs)
    5. 其他(Arrays,Lists,Maps,Enums,等等)

    21. 029 函数类和富函数类

    21-1. 函数类

    FLink暴露了udf函数的接口(实现方式为接口或者抽象类)。例如:MapFunction,FilterFunction,ProcessFunction(最底层API)等
    下面梨子实现FilterFunction接口

    1. //定义函数类 

    2. class MyFilter(keyWord:String) extends FilterFunction[String]{ 

    3. override def filter(value:String):Boolean={value.contains(keyWord)} 

    4. } 

    5. //调用 

    6. tweets.filter(new MyFilter("flink")) 

    21-2. 匿名函数

    1. tweets.filter( 

    2. new RichFilterFunction[String] { //匿名类调用函数不是直接new对象而是为了实现这个函数。 

    3. override def filter(value:String):Boolean=??? 

    4. } 

    5. ) 

    6. //匿名函数lambda写法tweets.filter(_.contains("flink")) 

    21-3. 富函数(Rich Functions)

    几乎所有Flink函数类都有其Rich版本,他的不同之处在于可以获取运行环境的上下文,并拥有一些生命周期方法,可以实现更复杂的功能,

    chrome_HvmaDZtnEp.png
    chrome_HvmaDZtnEp.png

    类的各种方法执行过程:类加载--》构造函数--》生成运行时环境--》open等初始化(如数据库连接)方法--》普通方法--》关闭初始化信息close释放资源。

    Flink没有foreach让用户迭代,输出操作都要利用Sink完成通过Stream.addSink(new MySink(xxx))。可以自定义Sink

    官方支持的组件都在Bundle Connectors(kafka,nifi)
    其他常用组件支持都在Apache Bahir(flume,activeMq,redis,akka,netty)

    23. 031 sinkapi-kafka 源码KafkaSinkTest

    24. 032 sinkapi-redis

    1. Error:(37, 24) type mismatch; 

    2. found : org.apache.flink.streaming.connectors.redis.RedisSink[top.majia.apitest.sourcetransform.SensorReading] 

    3. required: org.apache.flink.streaming.api.functions.sink.SinkFunction[String] 

    4. dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper)) 

    5. #对于大多数Sink报此编译错误,明明自定义Sink顶层实现了SinkFunction,运行也报错 

    6. #因为把对象转成了String如:SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString 

    7. #解决去掉toString=》编译报错,运行不报错。 

    8.  

    25. 033 sinkapi-es ESSinkTest

    注:Builder里的ElasticsearchSinkFunction elasticsearchSinkFunction extend Function只是普通的function,不是SinkFunction

    1. curl "127.0.0.1:9200/_cat/indices?v" //注意不能用localhost报不能识别uri 

    26. 034 sinkapi-jdbc mysql

    可以结合自定义source实时更新同id的字段数据,大屏实时取mysql该id的数据就能展示实时监控数据。

    27. 035- 039 window 01 源码WindowTest

    27-1. 概念

    chrome_dyTGeyIN4f.png
    chrome_dyTGeyIN4f.png

    • 一般真实的流都是无界的,怎样出来无界的数据
    • 可以把无限的流进行切分,得到有限的数据集进行处理--也就是有界流
    • 窗口(window)就是将无限流切割成有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析。

    chrome_bbuvaNN427.png
    chrome_bbuvaNN427.png

    27-2. 类型

    27-2-1. 时间窗口(time window)

    • 滚动时间窗口(Tumbling Windows):前闭后开:滚动窗口滑动为一个窗口长度的时候就是滚动窗口

    chrome_F4wTeFLl4a.png
    chrome_F4wTeFLl4a.png

    .timeWindow(Time.seconds(15))

    • 滑动时间窗口(Sliding Windows):有两个长度一个窗口长度参数一个滑动size参数;窗口有重叠。

    chrome_Mx8ucIVyHf.png
    chrome_Mx8ucIVyHf.png

    .timeWindow(Time.seconds(15),Time.seconds(5))

    • 会话窗口:一系列timeout间隙组成。时间无对齐。

    chrome_hUA9CLS64v.png
    chrome_hUA9CLS64v.png

    .window(EventTimeSessionWindows.withGap(Time.minutes(10))

    27-2-2. 计数窗口(count window)

    • 滚动计数窗口(tumbling count window)
      .countWindow(5)
    • 滑动计数窗口(sliding count window)
      .countWindow(10,2)

    27-3. API

    • 窗口分配器-window()方法
      • 我们可以用.window()定义一个窗口,然后基于这个window去做一些聚合或者其他处理操作,注意window方法必须在keyBy之后才能用
      • Flink提供了更加简单的.timeWindow和.countWindow方法,用于定义时间窗口和计数窗口
    1. val minTemPerWindow=dataStream.map(r=>(r.id,r.temperature)).keyBy(_._1).timeWindow(Time.seconds(15)).reduce((r1,r2)=>(r1._1,r1._2.min(r2._2))) 

    如果是window就是windowStream window操作比较多;如果是dataStream也有window操作不过比较少

    27-3-1. 窗口分配器:只是数据分桶(窗口)

    • window()方法接收输入参数是一个WIndowAssigner
    • WindowAssigner负责将每条输入的数据分发到正确的window中
    • Flink提供了通用的WindowAssigner
      • 滚动窗口(tumbling window)
      • 滑动窗口(sliding window)
      • 会话窗口(session window)
      • 全局窗口(global window)

    27-3-2. 窗口函数=>最后输出都是DataStream

    • window function定义了要对窗口中收集的数据做的计算操作
    • 分两类
      • 增量聚合函数(incremental aggregation functions):累加操作
        • 每条数据到来就进行计算,保持一个简单的状态
        • ReduceFunction,AggregateFunxtion=》windowsStream.reduce或者agg
      • 全窗口函数(Full window Functions):中位数,排序用的到=》有界批处理
        • 先把窗口所有数据收集起来,等到计算的时候会便利所有数据
        • ProcessWindowFunction=> windowStream.process或者apply底层调用WindowFunction

    27-3-3. 可选api

    • .trigger()触发器
      • 定义window什么时候关闭,触发计算并输出结果
    • .evictor()移除器
      • 定义移除某些数据的逻辑
    • .allowedLateness() 允许处理迟到的数据
    • .sideOutputLateData() 将迟到的数据放入侧输出流
    • .getSideOutput() 获取侧输出流

    27-3-4. 总览window api

    chrome_n50pl5dFFq.png
    chrome_n50pl5dFFq.png

    不推荐使用windowAll
    有数据才会创建桶
    chrome_35gbhWhuZv.png
    reduce比较大小只在当前批次窗口输入的数据比较。无法与之前窗口的结果数据比较。

    28. 040 Flink的时间语义

    28-1. Flink中的时间语义

    chrome_gpeFKB7Fj3.png
    chrome_gpeFKB7Fj3.png

    • 事件c创建时间 event Time
    • 进入框架flink时间 ingestion Time
    • 算子执行操作本地系统时间process time,与机器有关。
      chrome_cIC2b1DoJw.png

    28-2. 设置 Event Time

    chrome_BFh1yPH70b.png
    chrome_BFh1yPH70b.png

    • 某些场合不应该使用Processing Time:比如上图需要按照客户端时间算用户通关(原点)是否完成
      event time可以从日志的时间戳提取
    • 如果用EVENT time的窗口函数操作受用户日志是否到来影响有延迟,如果权衡实时要求高就要用process time.
    • 定义时间语义event time,然后从数据提取时间事件
      env.setStreamTimeCharateristic(TimeCharacteristic.EVENTTIME)//默认时间语义是process time=>可以用等待的方式处理数据,数据一般时间本地有序

    29. 042水位线(waterMark)

    chrome_dDhaiUdjOw.png
    chrome_dDhaiUdjOw.png

    watermark是一个特殊的记录。
    chrome_wFVfWKD8FB.png

    • 怎么避免乱序数据带来的计算不正确?
    • 遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
      • Watermark是一种衡量eventtime进展的机制,可以设定延迟触发
      • Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现
      • 数据流中的watermark用于表示timestamp小于waterMark的数据都已经到达了,因此,window的执行也是由watermark触发的
      • watermark用来让程序自己平衡延迟和结果正确性。

    30. 043atermark的原理和特点

    chrome_13THH5jACX.png
    chrome_13THH5jACX.png

    chrome_xhQd3vIMds.png

    31. 044 waterMark的传递,引入和设定

    chrome_DA9etUq4Pp.png
    chrome_DA9etUq4Pp.png

    32. 045 watermark 代码中引入

    • Event Time的使用一定要指定数据源中的时间戳
    • 调用assignTimestampAndWatermarks方法,传入一个BoundedOutOfOrdernessTimestampExtractor,就可以指定watermark
    1. dataStream.assignTimestampAndWatermarks( 

    2. new BoundedOutOfOrdernessTimestampExtractor[SensorReading](//有界乱序时间戳提取器 

    3. Time.milliseconds(1000) 

    4. ){ 

    5. override def extractTimestamp(element:SendorReading):Long={ 

    6. element.timestamp*1000 

    7. } 

    8. }  

    9. ) 

    10. // .assingAscendingTimestamps(_.timestamp*1000L)//升序数据提起时间戳10位是秒,*1000是毫秒;生序不需要定义watermark 

    32-1. extractor 提取器

    assignTimestampAndWatermarks实现类有两种

    • AssignerWithPeriodicWatermarks[T]:周期性生成watermark
      • 周期性生成watermark默认周期是200ms,可以使用ExecutionConfig.setAutoWatermarkInterval()方法设置:默认process语义是0ms,event语义是200ms
      • 升序和前面乱序的处理BoundedOutOfOrderness,都是基于周期性watermark的。
    • AssignerWithPunctuatedWatermarks[T]:来个数据就生成一个watermark:适合数据零散少的时候
      • 没有周期规律,可打断的生成watermark
        底层都是TimestampAssigner接口

    getCurrentWatermark方法currentMaxTimestamp-maxOutOfOrderness当前最大时间戳-最大延迟(表示最大乱序程度)

    32-2. watermark的设定

    • 在flink中watermark由应用开发人员生成,通常需要对响应领域有了解
    • 如果watermark设置延迟太久,收到结果就慢,解决办法是在到达水位线之前输出一个近似结果
    • 而watermark到达的太早,则可能收到错误结果,不过flink处理迟到的数据机制可以解决这个问题。
    1. .timeWindow(Time.seconds(15))//设置窗口 

    2. .allowedLateness(Time.minutes(1))//设置延迟时间 

    3. .sideOutputLateDate(new OutputTag[(String,Double,Long)]("late"))//如果水位线机制没有解决把数据放到侧流(迟到流) 

    33. 046 自定义watermark生成

    重写getCurrentWatermark(周期性)或者间断期checkAndGetNextWatermark方法和extractTimestamp方法
    简化版
    chrome_uQ1BPgjQMO.png
    chrome_jCyf3J3Zxg.png
    chrome_Kp7KwbbhhY.png
    chrome_9YsLLxpveg.png

    34. 047 时间语义下的窗口测试 见源码TimeYuYiWatermarkTest.scala:3s,15s,1m三个参数的作用?

    34-1. 窗口的判断15s

    OEu46HQoJ2.png
    OEu46HQoJ2.png

    输入时间是1547718210的时候最大时间戳(后缀)即210,210-3(最大延迟)=207;如果有207之前的窗口,该关闭就该关闭并且输出数据了,没有输出说明没有207之前的窗口
    输入时间是1547718213的时候最大时间戳是213,213-3=210,第一个窗口关闭输出数据;并且没有统计210时间的记录即第一个时间窗口为[195-210)如下
    chrome_LN5Ynk401s.png
    下一个窗口应该是[210-225)

    34-2. watermark(3s)的判断及乱序也能收到数据:这个时基于水位线触发的准时输出=》窗口批量处理

    chrome_iqrdfyqjAk.png
    chrome_iqrdfyqjAk.png

    时间戳输出的时212而不是213:因为在第二个窗口[210-225)范围内213,212数据都会接收但是因为代码逻辑时返回最新接收记录的时间戳212在213后面所以比大小温度时212的28.1,时间戳也是212最新记录的时间戳。

    34-3. allowedLateness窗口等待1分钟:这个才是真正意义上的数据迟到处理输出=>来一条处理一条

    (已经关闭还要再等待)没有关闭时到watermark关闭进行输出等待1分钟时来一条就基于之前的结果比较输出一次。
    213,19.5;125,27.6就是迟到的数据:如下两条分两次输出一次一条
    chrome_H0CKJyUr3S.png

    34-4. 1分钟后:这个时间是跟watermark比较的;迟到时间公式(maxtimestamp-maxOutOfOrderness)=watermark;watermark-windowStartTimestampOfCurrenttimestamp

    1.(第三个窗口[225,240)
    chrome_8bb6p9Joxo.png
    2. 输入285,watermark变成282;282-225=57s<1m因此如下:第二个窗口还未关闭还能继续输出218的记录数据
    chrome_eVrxZuJmZN.png
    3. 超过迟到时间1m的数据:late跑到侧流
    chrome_fZId65OxlC.png

    35. 048窗口起始点的确定

    powershell_fRZSO5vSQH.png
    powershell_fRZSO5vSQH.png

    1. public static long getWindowStartWithOffset(long timestamp,long offset,long windowSize){ 

    2. return timestamp-(timestamp-offset+windowSize)%windowSize; 

    3. } 

    4. //第一个记录时间戳[1547718199000ms-0+15000(15s)]%15000=4000;1547718199000ms-4000ms=1547718195s 

    36. 049 状态管理

    36-1. 状态的概念

    chrome_OyN6ZOHkdd.png
    chrome_OyN6ZOHkdd.png

    • 由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态
    • 可以任务状态就是一个本地变量,可以被任务的业务逻辑访问
    • flink会进行状态管理,包括状态一致性,故障处理以及高效存储和访问,方便开发人员专注于应用程序的逻辑。

    37. 050 两种算子相关状态

    在flink中,状态始终与特定算子相关联;为了时运行时的flink了解算子的状态,算子需要预先注册其状态;总的来说有两种类型的状态

    37-1. 算子状态(operatior state)

    算子状态的作用范围限定为算子任务
    chrome_rcnRFGlCZV.png

    37-1-1. 算子状态的数据结构

    • 列表状态(List state)
      将状态表示为一组数据的列表
    • 联合列表状态(union list state)
      也将状态标识为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复
    • 广播状态(broadcast state)
      如果一个算子有多项任务,而它的没想任务状态又都相同,那么这种特殊情况最适合应用广播状态。

    37-2. 键控状态(keyed state):为每个key做一个状态

    根据输入数据流中定义的键key来维护和访问
    chrome_MW1YqOZE44.png

    37-2-1. 键控状态的数据结构

    • 值状态(value state)
      将状态表示为单个的值
    • 列表状态(list state)
      将数据表示为一组数据的列表
    • 映射状态(Map state)
      将状态表示为一组Key-Value对
    • 聚合状态(Reducing state & Aggregating State)
      将状态表示为一个用于聚合操作的列表。

    38. 051 状态在代码中定义和使用

    38-1. 键控状态

    1. //声明一个键控状态 

    2. lazy val lastTemp:ValueState[Double]=getRuntimeContext.getState[Double](//富函数才能获取运行时环境上下文 

    3. new ValueStateDescriptor[Double]("lastTemp",calssOf[Double]) 

    4. ) 

    5. //读取状态 

    6. val prevTemp=lastTemp.value() 

    7. //对状态赋值 

    8. lastTemp.update(value.temperature) 

    39. 052 状态示例1

    idea64_FKi9EfE2n6.png
    idea64_FKi9EfE2n6.png

    40. 053 状态示例写法2

    1. Error:(40, 27) missing parameter type for expanded function 

    2. The argument types of an anonymous function must be fully known. (SLS 8.5) 

    3. Expected type was: (top.majia.apitest.sourcetransform.SensorReading, Option[?]) => (TraversableOnce[?], Option[?]) 

    4. .flatMapWithState({ 

    5. #解决 

    6. .flatMapWithState({})算子改成 

    7. .flatMapWithState[(String,Double,Double),Double]{}//前面三元组表示List[SensorReading]展开后的数据,后面double表示state的值类型 

    效果初始值0就可以通过case 去掉了
    QQPCRealTimeSpeedup_JqJ3HZ7neq.png

    40-1. 状态后端(state backends)

    41. 054 processfunciton 概念(底层api)

    我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如MapFunction这样的map转换算子就无法访问时间戳和当前事件时间
    基于此,DataStream api提供了一系列Low-Level转换算子。可以访问时间戳,watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。
    process function 用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,FlinkSQL就是使用process function实现的。
    Flink提供了8个process Function:

    • ProcessFunction
    • KeyedProcessFunction 对应keyStream的processfunction继承实现;keyby后调用
    • CoProcessFunction 对应coMap,Union
    • ProcessJoinFunction
    • BroadcastProcessFunction
    • KEyedBroadcastProcessFunction
    • ProcessWindowFunction :window后调用对应windowStream的实现
    • ProcessAllWindowFunction

    41-1. KeyedProcessFunction 源码见ProcessFunction.scala

    大部分process function集成Richfunction;CoProcessFunction没有集成直接集成的Function

    KeyedProcessFunction 用来操作KeyedStream,该函数会处理流的每一个元素,输出为0个,1个或者多个元素。大部分有的ProcessFunction都继承自RichFunction接口都有open,close和getRuntieontext等方法。而KeyedProcessFunction[Key,In,OUT]还额外提供了两个方法:

    • processElement(v:IN,ctx:Context,out:Collector[OUT]),流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的key,以及TimerServer时间服务。Context还可以将结果输出到别的流(side outputs)
    • onTimer(timestamp:Long,ctx:OnTimerContext,out:Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的context参数一样,提供上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。

    41-1-1. TimerService和定时器(Timers)

    Context和OnTimerContext所持有的TimerService对象拥有以下方法:

    • currentProcessTime():Long 返回当前处理时间
    • currentWatermark():Long 返回的昂前watermark的时间戳
    • registerProcessingTimeTimer(timestamp:Long):Unit 会注册当前key的processing time的定时器;当processing time到达定时时间时,触发timer.
    • registerEventTimeTimer(timestamp:Long):Unit 会注册当前key的event time定时器;当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
    • deleteProcessingTimeTimer(timestamp:Long):Unit 删除之前注册处理时间定时器;如果没有这个时间戳的定时器,则不执行。
    • deleteEventTimeTimer(timestamp:Long):Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

    当定时器timer触发时,会执行回调函数onTimer(),注意定时器timer智能在keyedStreams上面使用。

    42. 055 定时器应用示例

    窗口问题:10次条件阀值可以改成10s内连续上升不管次数。
    chrome_ed13g9Am90.png
    会话窗口也无法解决温度10s内连续上升就报警这个业务需求;因为会话间隔时间,一般情况数据非常密集没办法通过会话间隔时间区分开窗口
    可以理解为通过定时器开启一个窗口来解决这个需求:10s后开启一个等待窗口等待10s内的数据都到齐然后比较计算。
    chrome_J8m1N7BSVg.png

    1. Error:(38, 8) overloaded method value process with alternatives: 

    2. [R](keyedProcessFunction: org.apache.flink.streaming.api.functions.KeyedProcessFunction[org.apache.flink.api.java.tuple.Tuple,top.majia.apitest.sourcetransform.SensorReading,R])(implicit evidence$2: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and> 

    3. [R](processFunction: org.apache.flink.streaming.api.functions.ProcessFunction[top.majia.apitest.sourcetransform.SensorReading,R])(implicit evidence$1: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] 

    4. cannot be applied to (top.majia.apitest.processfunction.TempIncreWarning) 

    5. .process(new TempIncreWarning(10000L)) 

    6. //解决把keyBy("id")改成keyBy(_.id) 

    43. 056 process function侧输出流 SideOutputTest.scala

    44. 057 状态后端

    • 每传入一条数据,有状态的算子任务都会读取和更新状态
    • 由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问
    • 状态的存储,访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
    • 状态后端主要负责两件时: 本地的状态管理,以及将检查点(checkpoint)状态写入远程存储。

    44-1. 选择一个状态后端

    • MemoryStateBackend
      • 内存级的状态后端,会将键控状态作为内存中的对象进行管理,将他们存储在TaskManager的JVM堆上,而将checkpoint存储在JobManager的内存中
      • 特点:快速,低延迟,但不稳定
    • FsStateBackend
      • 将checkpoint 存到远程的持久化文件系统(filesystem)上,而对于本地状态跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
      • 同时拥有内存记得访问速度和更好的容错保证
    • RocksDBStateBackend
      • 将所有状态序列化后,存入本地的RocksDB中存储。

    45. 058 flink的容错机制

    • 一致性检查点(checkpoint)
    • 从检查点恢复状态
    • flink 检查点算法
    • 保存点(save points)

    45-1. 检查点概念和原理

    6=2+4;9=1+3+5=>checkpoint实际记录的是5处理完的状态;偏移量source记录5,两个下游算子分别记录5处理完后的计算结果1,3,5三个key放到第二个分区相加的和是9;第一个分区是2和4相加的和6

    chrome_xjBKbr47jt.png
    chrome_xjBKbr47jt.png

    5保存了检查点,在6处理完,正在处理7的时候flink挂了

    chrome_TFQWwKgIVk.png
    chrome_TFQWwKgIVk.png

    重启flink任务:从5恢复,开始source 6消费,sum_even=12;然后source 7,sum_odd=1+3+5+ 7=16

    chrome_e3u08Q80Ip.png
    chrome_e3u08Q80Ip.png

    chrome_AFWMPOS7J1.png

    最终保证精确一次的特性。

    46. 059 检查点的实现算法。

    • 一种简单的想法
      • 暂停应用,保存状态到检查点,再重新恢复应用
    • Flink改进实现
      • 基于Chandy-Lamport算法的分布式快照
      • 将检查点的保存和数据处理分离开,不暂停整个应用。

    46-1. 检查点算法:jobmanager发给source算子分界线数据点

    • 检查点分界线(checkpoint barrier)
      • flink的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开
      • 分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中国,而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中。

    46-1-1. 算法的barrir传递策略(各个子任务,分区的barrir对齐)

    chrome_T7ECM2w80G.png
    chrome_T7ECM2w80G.png

    chrome_Tidph8C4Wl.png
    chrome_va2Oar0j2q.png
    chrome_XkHWCiHwdy.png

    barrier有一个到达该task就要做保存checkpoint操作;后面的数据进行缓存(这个过程叫暂停算子的该task)其他算子正常运行。
    sum even,sum odd完成保存后通知jobmanager保存完成

    chrome_1c6PaPJcEK.png
    chrome_1c6PaPJcEK.png

    chrome_pG6e1J9Ukt.png

    同上流程sink完成checkpoint后通知jobmanager

    chrome_6NvsPOfX5n.png
    chrome_6NvsPOfX5n.png

    47. 060 容错机制-checkpoint 配置造成的ck结果情况案例说明

    chrome_qsJHZh1yW8.png
    chrome_qsJHZh1yW8.png

    setPreferCheckpointForRecovery是否用checkpoint机制恢复;如果false(默认值),如果savePoint比checkpoint数据更近就用savePoint恢复

    1. // env.setStateBackend(new MemoryStateBackend()) 

    2. // env.setStateBackend(new FsStateBackend(""))//参数是checkpoint路径 

    3. // env.setStateBackend(new RocksDBStateBackend("")) 

    4. env.enableCheckpointing(1000L)//默认500ms;1000L标识1s触发一次checkpoint保存checkpoint花的时间最好小于这个时间,这个时间实际是jobmanager让source触发checkpoint的时间间隔。 

    5. env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) 

    6. env.getCheckpointConfig.setCheckpointTimeout(60000L)//checkpoint所有算子中完成checkpoint最后一个算子通知jobmanager的时间超时时间;还可以配置异步snapshot 

    7. env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)//每一个任务根据barrier都把状态保存完的合照才是checkpoint保存完毕。所以可以出现两个checkpoint同时被触发执行。 

    8. env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L)//两个checkpoint之间最小间隔时间。会把上面的参数2覆盖掉 

    9. env.getCheckpointConfig.setPreferCheckpointForRecovery(true) 

    10. // env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)//能允许checkpoint多少次失败,覆盖下面的配置 

    11. // env.getCheckpointConfig.setFailOnCheckpointingErrors(true)//如果checkpoint失败认为整个job就挂了就重启;true即上面的配置参数值为0的情况;false即上面参数值为数字最大值 

    48. 061 重启策略

    1. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000L))//固定时间间隔重启策略;在固定的时间间隔后重启次数 

    2. env.setRestartStrategy(RestartStrategies.failureRateRestart(5,Time.of(5,TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS))) //在5分钟内每间隔10s重启一次最多失败5次。 

    49. 062 保存点

    • Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)
    • 原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有额外元数据的检查点
    • Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作
    • 保存点是一个强大的功能,除了故障恢复外,保存点还可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用等等。
    1. 状态名称不能变
    2. 算子不能变:算子的id不能变(可以通过.uid("1"))指定算子id.

    50. 063 状态一致性-基本概念

    • 状态一致性概念及分类
      chrome_5K9euaFNfg.png

      • AT-MOST-ONCE(最多一次)
        • 当任务故障时,最简单的做法是什么都不干,既不恢复丢失的状态,也不重播丢失的数据。At-most-once语义的含义是最多处理一次事件;应用场景:UDP协议及视频流等。
      • AT-LEAST-ONCE(至少一次)
        • 在大多数的真实应用场景,我们希望不丢失事件,这种类型的保章称为at-least-once,意思是所有的事件都得到了处理,而一些时间还可能被处理多次。=》比如事件是否出现这种bool判断。
      • EXACTLY-ONCE (精确一次)
        • 恰好处理一次是最严格的的保证,也是最难实现的。恰好一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。
    • 一致性检查点(checkpoint):算子的操作只有一次实际就是状态只计算了一次。

      • Flink使用了一种轻量级快照机制-检查点(checkpoint)来保证exactly-once语义
      • 有状态流应用的一致检查点,其实就是:所有任务的状态,在某个时间点的拷贝(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。
      • 应用状态的一致检查点,是Flink故障恢复机制的核心。
        chrome_p08UX0MRF4.png

      保存了检查点是5这个数的所有算子最终状态。重放机制sum_even如果保存完5这个数的处理结果状态后(结果是6=2+4)继续处理内存中的黄6最后结果12=2+4+6 下发sink了。
      重放机制理论上讲source会重新消费6,7;sum_even应该会继续多加一次6(即sink端=6+12);所以进行优化只局部重放,sum_event这条线只重放7继续消费;sum_odd重放6,7两个数继续消费。

    • 端到端(end-to-end)状态一致性(source,sink两端一致性)
      支持重放机制:source端必须支持偏移量保存和提交。

      • 目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还宝行了数据源(如Kafka)和输出到持久系统
      • 端到端一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都宝恒了它自己的一致性。
      • 整个端到端的一致性级别却绝育所有组件中一致性最弱的组件。
    • 端到端的精确一次(exactly-once)保证

      • 内部保证-checkpoint
      • source端-可重设数据的读取位置
      • sink端--从故障恢复时,数据不会重复写入外部系统
        • 幂等写入(Idempotent Writes)
        • 事务写入(Transactional Writes)
    • Flink+Kafka端到端状态一致性的保证

    51. 064 状态一致性-端到端一致性保证 065 状态一致性-幂等写入和事务写入

    • 幂等写入概念

    chrome_Py2WOFsmSQ.png
    chrome_Py2WOFsmSQ.png

    可能出现问题:使用缓存的情况下的大屏结果不正确。

    chrome_gmyoQKL6id.png
    chrome_gmyoQKL6id.png

    • 事务写入概念
      • 事务
        • 应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤销
        • 具有原子性:一个事务中的一系列的操作要么全部成功,要么一个都不做。
      • 实现思想:构建的事务对应着checkpoint,等到checkpoint真正完成的时候,才把所有对应的结果写入sink系统中。
      • 实现方式
        • 预写日志(Write-Ahead-Log,WAL)
          • 把结果数据先当成状态保存到缓存,然后在收到checkpoint完成的通知时,一次性写入Sink系统。
          • 简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定(如果sink端本质如mysql一次也是一条条写入即使批处理了,也有可能插入了一批中的一部分后面的数据在flink重放机制时重复插入前一部分的数据。缓存丢失等问题)
          • DataStream API提供了一个模板类:GenericWriteAheadSink,来实现这种事务性Sink
        • 两阶段提交(Two-Phase-Commit,2PC) 幂等写入和预写日志事务都不完美,这个两阶段完美。
          • 对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收的数据添加到事务里
          • 然后将这些数据写入外部sink系统,但不提交他们--这时只是“预提交”
          • 当它收到checkpoint完成的通知时,它才正式提交事务,实现结果的真正写入
          • 这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统。Flink提供了TwoPhaseCommitSinkFunction接口(如kafka sink连接器就是实现这个接口)

    51-1. 2PC对外部sink系统的要求

    • 外部sink系统必须提供事务支持,或者sink任务必须能够模拟外部系统上的事务
    • 在checkpoint嗯嗯间隔期间里,必须能够开启一个事务并接受数据写入
    • 在收到checkpoint完成的通知之前,事务必须是“等待提交”状态;在故障恢复的情况下,这可能需要一些时间,如果这个时候sink系统关闭(例如超时了),那么未提交的数据就会丢失。
    • sink任务必须能够在进程失败后恢复事务
    • 提交事务必须是幂等操作。

    实现代价:主要是程序员实现2PC接口(更checkpoint捆绑,创建事务,提交事务,超时管理等逻辑);对于flink运行而言整个过程捆绑在checkpoint上,操作的时候还是来一个数据sink一个数据,额外的开销就是等待checkpoint完成通知开启事务关闭事务这个操作对整个Flink性能来说影响是很小的!!!

    52. 066 状态一致性-Flink和kafka连接状态一致性

    52-1. 不同source和sink的一致性保证

    chrome_EV3vGElJJW.png
    chrome_EV3vGElJJW.png

    52-2. 状态一致性-Flink和kafka连接状态一致性

    • flink内部--利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
    • source--kafka consumer作为source,可以讲偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
    • sink--kafka producer作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction

    52-2-1. FLINK 2PC过程:

    有可能sink的状态保存比window的状态保存快花费时间少,所以sink保存完成并不是checkpoint完成。

    1. sink事务预提交状态,数据都保存在事务中=》当前事务还不能关闭,会开启下一个事务。
      chrome_m7oJolRW4h.png
    2. 所有状态保存完成时才正式提交事务
      chrome_3twSB2AMSy.png
    3. 对kafka另外的要求:3.1下游消费kafka的数据,必须是已确认的数据,如果是sink未确认的数据就可能导致数据闪电回放等数据不一致。3.2 kafka事务关闭外部15分钟flink内部超时时间是1H,导致kafka事务失败,数据没有完成提交,但是flink checkpoint 认为完成了。
      chrome_lWganvnQth.png

    53. 067 Flink Table和Flink Sql 概念和示例

    • Flink对批处理和流处理,提供了统一的上层API
    • Table API是一套内嵌在Java和Scala语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询
    • Flink的SQL支持基于SQL标准的Apache Calcite
      chrome_FCgmEKh8J9.png

    flink table api的table-->外部存储的table有一个转换过程就是sql语法的表注册

    54. 068 table api和sql的程序基本流程结构

    • table api和sql的程序结构和流失处理的程序结构十分类似
    1. val tableEnv=... //创建表的执行环境 

    2. //创建一张表,用于读取数据 

    3. tableEnv.connect(...).createTemporaryTable("inputTable") 

    4. //注册一张表,用于把计算结果输出 

    5. tableEnv.connect(...).createTemporaryTable("outputTable") 

    6. //通过TableAPI查询算子,得到一张结果表 

    7. val result=tableEnv.from("inputTable").select(...) 

    8. //通过sql查询语句得到一张结果表 

    9. val sqlResult=tableEnv.sqlQuery("select ... from inputTable") 

    10. //讲结果表写入到输出表 

    11. result.insertInto("outputTable") 

    55. 069 api和sql表执行环境:源码TableApiTest.scala 4中类型的表环境

    • 创建表的执行环境,需要将flink流处理的执行环境传入。
    • TableEnvironment 是flink中集成table api和sql的核心概念,所有对表的操作都基于TableEnvironment
      • 注册Catalog:目录
      • 在Catalog中注册表
      • 执行SQL查询
      • 注册用户自定义函数(UDF)

    56. 070 表的概念和从文件读取数据

    56-1. 表:有默认catalog和database

    • TableEnvironment可以注册目录Catalog,并可以基于Catalog注册表
    • 表(table)是由一个“标识符”(identifier)来指定的,由3部分组成:Catalog名,数据库(database)和对象名
    • 表可以是常规的,也可以是虚拟的(视图,view)
    • 常规表(table)一般可以用来描述外部数据,比如文件,数据库表或消息队列的数据,也可以直接从DataStream转换而来
    • 视图(view)可以从现有的表中创建,通常是Table api或者SQL查询的一个结果集

    56-1-1. 创建表

    1. TableEnvironment可以调用.connect()方法,连接外部系统,并调用createTemporary()方法,在Catalog中注册表
    1. tableEnv 

    2. .connect(...)//定义表的数据来源,和外部系统建立连接 

    3. .withFormat(...)//定义数据格式化方法 

    4. .withSchema(...)//定义表结构 

    5. .createTemporaryTable("MyTable")//创建临时表(理解为内存,视图) 

    1. 可以创建table来描述文件数据,可以从文件中读取或者将数据写入文件

    57. 071 table api或者sql 从kafka读取数据

    58. 072 api和sql的 查询转换

    1. api
    • table api是集成在scala和java语言内的查询api
    • table api 基于代表“表”的table 类,并提供一整套操作处理的方法API;这些方法会返回一个新的Table对象,表示对输入表应用转换的结果
    • 有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构
    1. sql
    • flink的sql集成,基于实现了sql标准的apache calcite
    • flink中用常规字符串定义sql查询语句
    • sql查询的结果也是一个新的table

    59. 073 DataStream和表的转换

    chrome_SYRb2Y7H4W.png
    chrome_SYRb2Y7H4W.png

    59-1. 数据类型与schema对应 DataStream->Table

    • 基于名称:可以调换顺序,也可以重命名
    • 基于位置

    chrome_OWmWavCmnk.png
    chrome_OWmWavCmnk.png

    59-2. DataStream->View

    chrome_cNDbutfC0f.png
    chrome_cNDbutfC0f.png

    60. 074 table api和sql输出到文件

    • 表的输出,是通过数据写入TableSink来实现的
    • TableSink是一个通用接口,可以支持不同的文件格式,存储数据库和消息队列
    • 输出表最直接的方法,就是通过Table.inserInto()方法讲一个Table写入注册过的TableSink中

    chrome_LNMM8hTrK6.png
    chrome_LNMM8hTrK6.png

    对于聚合运算需要toRetractStream重新生成一个流(true是更新结果值),而不是toAppend追加数据到流中(新增记录)

    chrome_C0vb9awp3V.png
    chrome_C0vb9awp3V.png

    61. 075 table api 和sql 更新模式

    • 对于流式查询,需要声明如何在表和外部链接器之间执行转换
    • 与外部系统交换的消息类型,由更新模式(UpdateMode)指定
      • 追加模式(Append)模式
        • 表只做插入操作,和外部连接器只交换插入消息
      • 撤回(Retract)模式:更新操作需要发两条消息
        • 表和外部连接器交换添加(Add)和撤回(Retract)消息
        • 插入操作(insert)编码为Add消息;删除(Delete)编码为Retract消息;更新(Update)编码为上一条的Retract和下一条的Add两条消息
      • 更新插入(Upsert)模式
        • 更新和插入都被编码为Upsert消息;删除编码为Delete消息:key存在则更新,不存在则插入=》更新操作只需要发一次消息性能好一些。

    62. 076 table api和sql kafka管道模式

    只能append

    63. 077 table api和sql 输出到ES

    可以创建Table来描述ES中的数据,作为输出的TableSink

    chrome_x9SSTnIU7Q.png
    chrome_x9SSTnIU7Q.png

    对于kafkasource-->EsSink测不同;用filesource->essink通了。

    1. curl "http://localhost:9200/_cat/indices?v" 

    2. http://localhost:9200/sensor/_search?pretty 

    64. 078 table api和sql 输出到MYSQL之DDL风格非SQL风格还有一种风格connector风格

    • 可以创建Table来描述Mysql中的数据,作为输入和输出
    1. val sinkDDL:String= 

    2. """ 

    3. |create table jdbcOutputTable ( 

    4. | id varchar(20) not null, 

    5. | cnt bigint not null 

    6. | ) with ( 

    7. | 'connector.type'='jdbc', 

    8. | 'connector.url'='jdbc:mysql://localhost:3306/test', 

    9. | 'connector.table'='sensor_count', 

    10. | 'connector.driver'='com.mysql.jdbc.Driver', 

    11. | 'connector.username'='root', 

    12. | 'connector.password'='root' 

    13. | ) 

    14. """.stripMargin 

    15.  

    16. tableEnv.sqlUpdate(sinkDDL)//执行DDL创建表 

    17. aggResultSqlTable.insertInto("jdbcOutputTable") 

    65. 079 表转换成流

    • 表可以转换为DataStream或DataSet,这样自定义流处理或批处理程序就可以继续在TableApi或SQL查询的结果上运行了
    • 将表转换为DataStream或DataSet时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型
    • 表作为流式查询的结果,是动态更新的
    • 转换有两种模式:追加(Appende)模式和撤回(Restract)模式
    1. aggTable.toRestractStream[Row].print("agg")//对应流来说写入表要标记是true(新增数据)或者false(撤回数据) 

    2. //没有Upsert因为这个依赖外部系统key决定更新操作;覆水难收,流已经sink到下游(外部系统),就没法再修改了。 

    chrome_IgsgFDcYMw.png
    chrome_IgsgFDcYMw.png

    66. 080 流处理和sql查询的不同

    66-1. 查看执行计划

    • Table API提供了一种机制解释计算表的逻辑和优化查询计划
    • 查看执行计划可以通过TableEnvironment.explain(table)方法或者TableEnvironment.explain()方法完成,返回一个字符串,描述三个计划
      • 优化的逻辑查询计划
      • 优化后的逻辑查询计划
      • 实际执行计划
    1. val explaination:String=tableEnv.explain(resutlTable) 

    2. println(explaination) 

    66-2. 流处理和关系代数(批处理有界数据)的区别

    chrome_mHjCUCkcw8.png
    chrome_mHjCUCkcw8.png

    67. 081 动态表(Dynamic Tables)和持续查询

    1. 动态表
    • 动态表是Flink对流数据的TableAPI和SQL支持的核心概念
    • 与标识批处理数据的静态表不同,动态表是随时间变化的。
    1. 持续查询
    • 动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)
    • 连续查询永远不会终止,并会生成另一个动态表
    • 查询会不断的更新其动态结果表,以反映其动态输入表的更改。

    chrome_VEZFAngMZU.png
    chrome_VEZFAngMZU.png

    68. 082持续查询示例具体过程

    1. 将流转换成动态表
    • 为了处理带有关系查询的流,必须先将其转换成表
    • 从概念上讲,流的每个数据记录,都被解释为对结果表的插入(insert)修改操作

    chrome_Ce68ZUvPey.png
    chrome_Ce68ZUvPey.png

    chrome_9X8KwEXVEd.png

    1. 将动态表转换成DataStream
    • 与常规的数据表一样,动态表可以通过插入,更新和删除更改,进行持续的修改
    • 将动态表转换成流或讲其写入外部系统时,需要对这些更改进行编码
      • 仅追加(Append-only)流
        • 仅通过插入(insert)更改来修改的动态表,可以直接转换为仅追加流
      • 撤回(restract)流
        • 撤回流是包含两类消息的流,添加(ADD)消息和撤回(Restract)消息(更新操作:先删除再新增)
      • Upsert(更新) 流
        • Upsert流也包含两种类型的消息:Upsert=Update消息和删除(Delete)消息

    chrome_tCEhqmDjro.png
    chrome_tCEhqmDjro.png

    69. 083TABLE API和sql时间特性1—处理时间

    1. 时间特性(Tine Attributes)
    • 基于时间的操作(比如TABLE API和SQL中的窗口操作),需要定义相关的时间语义和时间数据来源的信息
    • Table可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问响应的时间戳
    • 时间属性,可以是每个表Schema的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用
    • 时间属性的行为类似于常规时间戳,可以访问,并且进行计算。
    1. 定义处理时间(Processing Time)
    • 处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概念。它既不需要提起时间戳,也不需要生成watermark
      • 由DataStream转换成表时指定
    • 在定义Schema期间,可以使用.proctime, 指定字段名定义处理时间字段
    • 这个proctime属性只能通过附加逻辑字段,来扩展物理schema。因此,只能在schema定义的末尾定义它
    1. //方式1.使用from流的方式定义processTime 

    2. val sensorTable=tableEnv.fromDataStream(dataStream,'id,'temperature,'timestamp,'pt.proctime) 

    3. //方式2.在连接器中定义schama时指定 

    4. .withSchema(new Schema() 

    5. .field("id",DataTypes.STRING()) 

    6. .field("timestamp",DataTypes.BIGINT()) 

    7. .field("temperature",DataTypes.DOUBLE()) 

    8. .field("pt",DataTypes.TIMESTAMP(3)).proctime()//这种定义方式可以在Kafka的连接器中定义使用因为kafka实现了DefinedProtimeAttribute,DefinedRowtimeAttribute接口 

    9. ) 

    10. //方式3 DDL中定义 

    11. val sinkDDL:String= 

    12. """ 

    13. |create table dataTable ( 

    14. |id varchar(20) not null, 

    15. |ts bigint, 

    16. |temperature double, 

    17. |pt as PROCTIME() 

    18. |) with ( 

    19. |'connector.type'='filesystem', 

    20. |'connector.path'='/sensor.txt', 

    21. |'format.type'='csv' 

    22. |) 

    23. """.stripMargin 

    24. tableEnv.sqlUpdate(sinkDDL) 

    70. 084TABLE API和sql时间特性2—事件时间

    1. 定义事件时间(Event Time)
    • 事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱序事件或者延迟事件时,也可以获得正确的结果。
    • 为了处理无序事件,并区分流中的准时和迟到事件;flink需要从事件数据中,提取时间戳,并用来推进事件时间的进展
    • 定义事件时间,同样有三种方法:
      • 由DataStream转换成表时指定
        chrome_94gAEHpDds.png
      • 定义TableSchema时指定
        new Rowtime()实际就是注册watermark时(.assignTimestampsAndWatermarks)同时要给出的数据事件时间和wartermark乱序边界
        chrome_s7dM1rL6S4.png
      • 在创建表DDL中指定(同process time三种方式):先转成格式时间再转成timestamp时间戳
        chrome_LVudt0yiiv.png

    71. 085TABLE API和sql窗口1—分组窗口

    71-1. 窗口

    • 时间语义,要配合窗口操作才能发挥作用
    • 在table api和sql中主要有两种窗口
      • Group Windows(分组窗口):每组一个结果
        • 根据时间活行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数
      • Over Windows:每行一个结果
        • 针对每个输入行,计算相邻行范围内的聚合。

    71-2. Group Windows

    • Gourp Windows是使用window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。
    • 为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用
    1. val table=input.window([w:GroupWindw] as 'w)//定义窗口,别名为w 

    2. .groupBy('w,'a)//按照字段a和窗口w分组;如果不给a相当于windowAll 

    3. .select('a,'b.sum)//聚合 

    • TableAPI提供了一组具有特定语义的window类,这些类会被转换为底层DataStream或DataSet的窗口操作
      预定义的窗口类
      • 滚动窗口:基于事件时间,处理时间划分10分钟的滚动窗口;基于处理时间创建滚动的计数窗口。
        chrome_VOXdJSCPy5.png
      • 滑动窗口:10分钟的滑动窗口每隔5分钟滑动一个
        chrome_lh9152gYUB.png
      • 会话窗口:超过10分钟时间间隔开启下一个会话窗口
        chrome_qhXSa8JcWT.png
    • SQL中使用的GroupWindows:提供特定语义的window
      • Group Windows定义在Sql 查询的Group By子句中
        • TUMBLE(time_attr,interval)
          - 定义一个滚动窗口,第一个参数时间字段,第二个参数是窗口长度
        • HOP(time_attr,interval,interval)
          - 定义一个滑动窗口,第一个参数是时间字段,第二个参数是滑动窗口滑动步长,第三个参数是窗口长度
        • SESSION(time_attr,interval)
          - 定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔。

    72. 086TABLE API和sql窗口2—分组窗口测试 见源码TableTimeWindowTest.scala

    72-1. 源码阅读:

    72-1-1. 一.window

    1. window()两种参数一种GroupWindow一种OverWindow
    2. GroupWindow返回GroupedWindowTable(先做window=》窗口(分桶)的table)对象只能掉GroupBy方法(再做分组)返回WindowGroupedTable(按group By分组后的table)=>group by后的分组才可以调用select 和agg后再调select)等方法。

    72-1-2. Tumble

    1. Tumble 调over方法返回一个TumbleWithSize
    2. TumbleWithSize再调on方法返回一个TumbleWithSizeOnTime
    3. TumbleWithSizeOnTime调用as方法返回一个TumbleWithSizeOnTimeWithAlias对象返回窗口别名:本质集成了GroupWindow

    73. 087TABLE API和sql窗口3—over窗口

    73-1. Over Windows

    • OverWindow 聚合是标准SQL中已有的(over子句),可以在查询select子句中定义
    • OverWindow聚合,会针对每个输入行,计算相邻行范围内的聚合
    • Over Window使用window(w:overwindows*)子句定义,并在select()
      方法中通过别名来引用
    1. val table=inpu.window([w:OverWindow] as 'w).select('a,'b.sum over 'w,'c.min over 'w) 

    • Table API提供了Over类,来配置Over窗口的属性:定义窗口后可以继续做聚合计算和select查询。
      • 无界Over Windows
        chrome_3kLwvA8RjH.png
      • 有界OverWindows
        chrome_cX8VaJdTfV.png
    • SQL中的Over Windows写法
      • 用Over 做窗口绝活时,所有聚合必须在同一窗口上定义,也就是说必须是相同的分区,排序和范围
      • 目前仅支持在当前行范围之前的窗口
      • ORDER BY 必须在单一的时间属性上指定
    1. select count(amount) over ( 

    2. PARTITION BY user --相同的分区 

    3. ORDER BY proctime --相同的排序 

    4. ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)--相同的范围 

    5. FROM Orders 

    74. 088TABLE API和sql窗口4—over窗口测试 见源码TableTieWindowTest.scala

    QQPCRealTimeSpeedup_ZXPRMH5Tdg.png
    QQPCRealTimeSpeedup_ZXPRMH5Tdg.png

    75. 089TABLE API和sql函数1—系统内置函数

    chrome_RToO4S6ak3.png
    chrome_RToO4S6ak3.png

    chrome_SScfXSgx5G.png

    76. 090TABLE API和sql函数2—UDF函数-标量函数:输入与输出是1对1关系

    • 用户自定义函数(USER-DEFAINED FUNCTIONS,UDF)是一个重要的特性,他们显著的扩展了查询的表达能力
    • 在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用
    • 函数通过调用registerFunction()方法在TableEnvironment中注册。当用户自定义的函数被注册时,它被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就可以识别并正确地解释它。

    76-1. 标量(就是值)函数(Scalar Functions)源码见ScalarFunctionTest.scala

    • 用户定义的标量函数,可以将0,1或者多个标量值,映射到新的标量值
    • 为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(eval)的方法
    • 标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval
    1. class HashCode(factor:Int) extends ScalarFunction{ 

    2. def eval(s:String):Int={//必须实现这个基类手写这个方法名,又不是重写的 

    3. s.hashCode * factor 

    4. } 

    5. } 

    77. 091 UDF函数-表函数(TableFuctions) 源码见TableFunction.scala 输入与输出是1对多关系

    • 用户定义的表函数,也可以将0,1或多个标量值作为输入参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值
    • 为了定义一个表函数,必须扩展org.apache.flink.table.functions中的基类TableFunction并实现(一个或多个)求值方法
    • 表函数的行为由其求值方法决定,求值方法必须是public的并命名为eval
    1. class Split(separator:String) extends TableFunction[(String,Int)]{ 

    2. def eval(str:String):Unit={ 

    3. str.split(separator).foreach( 

    4. word=>collect((word,word.length)) 

    5. ) 

    6. } 

    7. } 

    78. 092 UDF函数-聚合函数

    • 用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值
    • 用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的
      chrome_Ln329MoAmL.png
      ACC 累加器

    78-1. 聚合函数:见源码AggregateFunctionTest.scala

    • AggregateFunction要求必须实现的方法

      • createAccumulator()
      • accumulate()
      • getValue()
    • AggregateFunction工作原理如下:

      • 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构
        可以通过调用createAccumulator()方法创建空累加器
      • 随后,对每个输入行调用函数的accumulate()方法来更新累加器
      • 处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果

      toRestract输出了false说明就是更新前的结果,true就是就是更新后的结果

    79. 093 UDF函数-表聚合函数(Table Agg Functions):输入与输出多对多关系

    • 用户自定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表
    • 用户定义表聚合函数,是通过继承TableAggregateFunction抽象类来实现的
      chrome_DWXYLT5ScZ.png

    79-1. 表聚合函数

    • TableAggregateFunction要求必须实现的方法
      • createAccumulator()
      • accumulate()
      • emitValue()
    • TableAggregateFunction工作原理如下:
      • 首先需要一个累加器,保存聚合中间结果的数据结构,通过调用createAccumulator方法创建空累加器
      • 随后对每个输入行调用accumulator方法更新累加器
      • 处理完所有行后,将调用函数的emitValue方法来计算并返回接最终结果。

    80. 094电商用户行为分析_批处理和流处理以及项目选型

    1. 批处理和流处理
      chrome_Br8EOkS3yc.png
      chrome_Z85IkrGexI.png

    81. 095用户行为分析_分析应用场景

    1. 电商用户行为分析
      chrome_EdPrYtPJkp.png
      chrome_7L0DS42TD3.png
      其中实时统计分析和风控分析一般做实时;画像和特征工程需要历史数据一般离线处理。

    82. 096模块设计与设计分析

    1. 项目模块划分
      chrome_M2owWW1oQQ.png
      chrome_J0uVPqQ5ZW.png
    2. 数据源解析
      40dc3257c1e68bccc024242154959928.png

    83. 097需求分析-实时热门商品统计1

    • 基本需求
      • 统计近一小时内的热门商品,每5分钟更新一次
      • 热门度用浏览次数("pv")来衡量
    • 解决思路
      • 在所有用户行为数据中,过滤出浏览(pv)行为进行统计
      • 构建滑动窗口,窗口长度为1小时,滑动距离为5分钟
        chrome_dp1rDeevVa.png
        chrome_FbljhOFaMD.png
        chrome_UKsrfdqaDL.png
        chrome_Yt216qQ6Xx.png
        chrome_oAQwiIgl8g.png

    83-1. AggCount函数参数

    chrome_gde9l4c7Kf.png
    chrome_gde9l4c7Kf.png

    83-2. window函数参数

    chrome_sNxgt9lsmR.png
    chrome_sNxgt9lsmR.png

    83-1. 098需求分析-实时热门商品统计2

    chrome_iZBdJ6OkyT.png
    chrome_iZBdJ6OkyT.png

    • 按count排序可以在窗口结束时间等待100ms然后再排序
    • 排序可以通过定时器实现等待触发,同时可以记录排序状态
      chrome_YHDSkWMiX4.png

    83-1-1. 最终排序输出--keyedProcessFunction

    • 针对有状态流的底层API
    • KeyedProcessFunction 会对分区后的每一条子流进行处理
    • 以windowEnd作为key,保证分流以后每一条流的数据都在一个时间窗口内
    • 从ListState中读取当前流的状态,存储数据进行排序输出。
      chrome_ZrFi6pTdxo.png

    84. 099需求分析-其他需求

    84-1. 实时流量统计-热门页面

    • 基本需求
      • 从web服务器的日志中,统计实时的热门访问页面
      • 统计每分钟的ip访问量,取出访问量最大的5个地址,每5秒更新一次
    • 解决思路
      • 将apache服务器日志中的时间,转换为时间戳,作为event time
      • 构建滑动窗口,窗口长度为1分钟,滑动距离为5秒

    84-2. 实时流量统计-PV和UV

    • 基本需求
      • 从埋点日志中,统计实时的PV和UV
      • 统计每小时的访问量PV,并且对用户进行去重UV
    • 解决思路
      • 统计埋点日志中的pv行为,利用set数据结构进行去重=》内存OOM大数据量问题布隆可以解决
      • 对于超大规模的数据,可以考虑用布隆过滤器进行去重

    84-3. 市场营销分析--APP市场推广统计

    • 基本需求
      • 从埋点日志中,统计APP市场推广的数据指标
      • 按照不同的推广渠道,分别统计数据
    • 解决思路
      • 通过过滤日志中的用户行为,按照不同的渠道进行统计
      • 可以用process function 处理,得到自定义的输出数据信息

    84-4. 页面广告统计:广告黑名单过滤的需求

    • 基本需求
      • 从埋点日志中,统计每小时页面广告的点击量,5秒刷新一次,并按照不同省份进行划分
      • 对于"刷单"式的频繁点击行为进行过滤,并将该用户加入黑名单。
    • 解决思路
      • 根据省份进行分组,创建长度为1小时,滑动距离为5s的时间窗口进行统计
      • 可以用process function进行黑名单过滤,检测同一周期用户同一广告的的点击量,如果超过上限则将用户信息以侧输出流输出到黑名单中。

    84-5. 恶意登录监控

    • 基本需求
      • 用户短时间内频繁登录失败,有程序恶意攻击的可能
      • 同一用户(可以是不同IP)在2秒内连续两次登录失败,需要报警。
    • 解决思路
      • 将用户的登录失败行为存入ListState,设定定时器2秒后触发,查看ListState中有几次失败登录
      • 更加精确的检测,可以使用CEP库实现事件流的模式匹配。

    84-6. 订单支付实时监控

    • 基本需求
      • 用户下单之后,应设置订单失效时间,以提高用户支付的意愿,并降低系统风险
      • 用户下单后15分钟未支付,则输出监控信息
    • 解决思路
      • 利用CEP库进行事件流的模式匹配,并设定匹配的时间间隔
      • 也可以利用状态编程,用process function实现处理逻辑

    84-7. 订单支付实时对账

    • 基本需求
      • 用户下单并支付后,应查询到账信息,进行实时对账
      • 如果有不匹配的支付信息或者到账信息,输出提示信息
    • 解决思路
      • 从两条流中分别读取订单支付信息和到账信息,合并处理
      • 用connect连接合并两条流,用coProcessFunction做匹配处理

    84-8. 100 项目框架搭建 见DianShangAna模块

    84-9. 101实时热门商品统计1_窗口聚合

    84-10. 102实时热门商品统计2_排序统计输出

    84-11. 103实时热门商品统计3_从kafka消费数据测试:见源码HotItems.scala

    第一个窗口触发输出301:输入和输出效果1
    chrome_eSIiLHCE2k.png
    第二个窗口:触发输出601:
    chrome_W5kuSjxTm8.png
    对比300,301的触发区别
    QQPCRealTimeSpeedup_fORpysMVK8.png

    84-12. 104实时热门商品统计4_批量从kafka消费数据测试KafkaProducerUtil.scala

    84-13. 105实时热门商品统计5_TableAPI和SQL实现HotItemsWithSql.scala

    85. 106 实时热门页面流量分析1_开窗聚合统计HotPagesNetworkFlow.scala

    86. 107实时热门页面流量分析2_统计结果排序输出

    87. 108实时热门页面流量分析3_乱序数据的处理:减少窗口等待时长1m太长。增加Lateness延迟触发机制处理(到时间就输出结果,后面迟到的数据重新计算可能推翻之前的计算结果。推进watermark的过程)

    真正输出到侧输出流的数据:数据所有所属的窗口都关闭,即所属的最后一个窗口都关闭了。
    chrome_drFT1kZsDc.png

    88. 109 实时热门页面流量分析4_保证状态更新结果正确

    问题:onTimer定时器每条记录都可能触发一次定时任务触发的操作
    chrome_375IN8KERH.png

    优化:保存状态用keyValue保存而非listState
    chrome_qyRgKIthex.png

    89. 110 pv统计1_基本实现PageView.scala

    90. 111 pv统计2_数据并行的优化

    1. .map(data=>("pv",1L))//定义一个pv字符串作为分组的dummy key(哑key):去掉env.set并行度之前并行度是cpu数 

    2. .keyBy(_._1)//所有数据会被分到同一个组:导致并行度都是1 

    91. 112 uv统计1-基本实现(Unique vistitor)UniqueVisitor.scala

    92. 113 uv统计2-布隆过滤器去重思路和程序架构UvWithBloom.scala

    108*23=800M存储;如果是一个userId10B * 108就是1Gb存储
    即一亿用户的userId,每个userId占8个字节(Bytes)=一个窗口上G存储明显性能不行。
    106=1M
    一条数据大概1KB-2KB

    我们不需要完成的存储用户Id的信息,只需要知道它在不在就行了,所以其实可以压缩处理,用一位(bit)就可以标识一个用户的状态。这个思想具体实现就是布隆过滤器(BloomFilter)
    1bit*108=10M

    hash碰撞:本来1亿用户需要一亿个bit存储状态,再用冗余一亿个big加上优化的hash函数进行稀疏存储碰撞就小。

    93. 114 uv统计3-布隆过滤器简单实现

    94. 115 uv统计3-uv去重的布隆过滤器实现

    95. 116 APP市场推广统计1_自定义source源 AppMarketByChannel.scala

    96. 117 APP市场推广统计1_开窗聚合统计输出

    97. 118 广告点击量统计分析1_基本需求实现:不同地区用户广告的点击量 AdClickAnalysis.scala

    98. 119 广告点击量统计分析2_刷单行为过滤思路和整体框架

    99. 120 广告点击量统计分析2_刷单行为过滤代码实现

    100. 121 恶意登录检测1_实现思路和代码框架

    继续在UserBehaviorAnalysis模块新建一个子模块LoginFailDetect,在此模块中引入flink CEP库来实现事件流的模式匹配。
    2s内连续登陆失败n次即恶意登录

    101. 122 恶意登录检测2_具体代码实现LoginFail.scala

    问题:如果2s内登陆一二十次成功了,没有报警不合适。如果1s登录失败多次不报警不合适等业务场景。

    102. 123 恶意登录检测3_具体代码改进LoginFailAdvance.scala

    chrome_OGWiGW2d6r.png
    chrome_OGWiGW2d6r.png

    如果事件戳42,43,44三次登录失败在43前插入登录成功45的时间戳事件。122,123两个代码只能检测出43-44两个时间戳范围的报警信息,只有一条报警。
    chrome_WJhWrHGZ1I.png

    103. 124 恶意登录检测4_CEP代码实现:LoginFailWithCep.scala 乱序等待(来的eventTime-等待时间=watermark到达)

    CEP:复杂事件处理

    104. 125 CEP简介1_cep介绍和Parttern API

    104-1. 什么是CEP

    • 复杂事件处理(Complex Event Processing)
    • Flink CEP是在flink中实现的复杂事件处理(CEP)库
    • CEP允许在无休止的事件流中国检测事件模式,让我们有机会掌握数据中重要的部分。
    • 一个或多个由简单事件构成的事件流(事件序列)通过一定的规则匹配,然后输出用户想得到的数据---满足规则的复杂事件。
      chrome_Xr5PwppRij.png

    104-2. Pattern

    104-2-1. 模式概念

    • 处理事件的规则,被叫做(模式)Pattern
    • Flink cep提供了Pattern api,用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列
    1. //定义一个Pattern 

    2. val pattern=Pattern.begin[Event].where(_.getId==42) 

    3. .next("middle").subtype(classOf[SubEvent]).where(_.getTemp >=10.0) 

    4. .followedBy("end").where(_.getName=="end") 

    5.  

    6. //将创建好的Pattern应用到输入事件流上 

    7. val patternStream=CEP.pattern(inputDataStream,pattern) 

    8. //获取事件序列,得到处理结果 

    9. val result:DataStream[Alert]=patternStream.select(createAlert(_)) 

    104-2-2. pattern api

    • 个体模式(Individual Patterns)

      • 组成复杂规则的每一个单独的模式定义,就是“个体模式”
        start.times(3).where(_.behavior.startWith("fav"))
    • 组合模式(Combining Patterns,也就奥模式序列)

      • 很多个体模式组合起来,就形成了整个的模式序列
      • 模式序列必须以一个“初始模式”开始
        val start=Pattern.begin("start")
    • 模式组(Groups of patterns)

      • 将一个模式序列作为条件嵌套在个体模式里,成为一组模式
        模式序列由个体模式组合而成,再嵌入到个体模式里做为组。=》逻辑上矛盾,倒像是依据个体模式做一个序列的声明。??

    105. 126 CEP简介2_个体模式

    • 个体模式可以包括“单例(singleton)模式”和"循环(looping)模式"
    • 单例模式只接收一个事件,而循环模式可以接收多个
      • 量词(Quantifier)
        • 可以在一个个体模式后追加量词,也就是指定循环次数
    1. //匹配四次 

    2. start.times(4)//一个事件 

    3. //匹配出现0或4次//2个事件 

    4. start.times(4).optional 

    5. //匹配出现2,3,或者4次 

    6. start.times(2,4) 

    7. //匹配出现2,3,或者4次,并且尽可能多地重复匹配 

    8. start.times(2,4).greedy 

    9. //匹配出现1次或多次 

    10. start.oneOrMore 

    11. //匹配出现0,2次或多次,并且尽肯能多地重复匹配 

    12. start.oneOrMore(2).optional.greedy 

    底层是NFA(包)机制

    105-1. 个体模式的条件

    • 条件(Condition)
      • 每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据
      • CEP 中的个体模式主要通过调动.where() .or()和.until()来指定条件
      • 按不同的调用方式,可以分成以下几类:
        • 简单条件(SimpleCondition)

          • 通过.where方法对事件中的字段进行判断是啊选,决定是否接受该事件
            start.where(event=>event.getName.startWith("foo"))
        • 组合条件(Combining Condition)

          • 将简单条件进行合并:or方法标识或逻辑相连,where的直接组合就是And
        • 终止条件(Stop Condition)

          • 如果使用了oneOrMore或者oneOrMore.optional,建议使用.until()作为终止条件,以便请权利状态。
        • 迭代条件(iterative Condition)

          • 能够对模式之前所有接收的事件进行处理:value是事件,ctx是上下文
          • 调用where((value,ctx)=>{...}),可以调用ctx.getEventsForPattern("name")

    106. 127 CEP简介3_模式序列

    106-1. 1.组合模式(模式序列)中的近邻模式

    chrome_VQU0siDIYq.png
    chrome_VQU0siDIYq.png

    • 严格近邻(Strict Contiguity)
      • 所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由.next()指定
      • 例如:对于模式“a next b”,事件序列[a,c,b1,b2]没有匹配
    • 宽松近邻(Relaxed Contiguity)
      • 允许中间出现不匹配的事件,由.followedBy()指定
      • 例如:对于模式“a followedBy b”,事件序列[a,c,b1,b2] 匹配为{a,b1}
    • 非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)
      • 进一步放宽条件,之前已经匹配过的事件也可以再次使用,由.followedByAny()指定
      • 例如对于模式“a followedByAny b”,事件序列[a,c,b1,b2]匹配为{a,b1},{a,b2}

    106-1-1. 其他模式序列

    • 除以上模式序列外,还可以定义“不希望出现某种近邻关系”
      • .notNext()---不想让某个事件严格紧邻前一个事件发生
      • .notFollowedBy()---不想让某个事件在两个事件之间发生
    • 需要注意:
      • 所有模式序列必须以.begin开始
      • 模式序列不能以.notFollowedBy结束
      • “not”类型模式不能被optional修饰
      • 此外,还可以为模式指定时间(.withIn())约束,用来要求在多长时间内匹配有效。

    107. 128 CEP简介3_模式的检测和事件处理

    107-1. 2.模式的检测

    • 指定要查找的模式序列后,就可以将其应用与输入流以检测潜在匹配
    • 调用CEP.pattern(),以给定输入流和模式参数,就能得到一个PatternStream

    107-2. 3.匹配事件的提取(map函数)

    • 创建PatternStream之后,就可以用select或者flatselect方法,从检测到的事件序列中提取事件了
    • select方法需要输入一个select function作为参数,每个成功匹配的事件序列都会调用它
    • select以一个Map[String,Iterable[IN]]来接收匹配到的事件序列,其中key就是每个模式的名称,而value就是所有接收到的事件的iterable类型。
    1. def selectFn(pattern:Map[String,Iterable[IN]]):OUT={ 

    2. val startEvent=pattern.get("start").get.next 

    3. val endEvent=pattern.get("end").get.next 

    4. OUT(startEvent,endEvent) 

    5. } 

    107-2-1. 超时事件的提取

    • 当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃;为了能够处理这些超时的部分匹配,select和flatSelect api调用允许指定的超时处理程序
    • 超时处理程序会接收到目前为止由模式匹配到是所有事件,由一个OutputTag定义接收到的超时事件序列
      chrome_jclJMvBySN.png
      PatternTimeoutFunction,PatternSelecFunction PTF,PSF

    108. 129 订单超时实时检测1_实现思路和程序架构

    测试数据

    chrome_S2jdr23EJG.png
    chrome_S2jdr23EJG.png

    109. 130 订单超时实时检测2_CEP具体代码实现OrderTimeout.scala

    110. 131 订单超时实时检测3_流失输入数据测试

    ###每一个事件都是时间戳找到watermark这个点时把watermark之前缓存的数据进行处理。
    chrome_GVuRKFcdPY.png
    chrome_pSDBvJOXwx.png

    111. 132 订单超时实时检测3_ProcessFunction代码实现:OrderTimeoutWithoutCep.scala

    112. 133 双流实时对账1_需求分析和整体架构

    对于支付事件,用户支付完成并不算完,还得确认平台账户上到账了,而这往往来自不同的日志信息,所以需要做双流合并处理。

    113. 134 双流实时对账2_合流代码实现:TxMatch.scala

    114. 135 双流实时对账3_JOINAPI-WindowJoin

    1. stream.join(otherStream) 

    2. .where(<KeySelector>)//stream的key 

    3. .equalTo(<KeySelector>)//otherStream的key 

    4. .window(<WindowAssigner>) 

    5. .apply(<JoinFunction>) 

    chrome_BqcCAxOk01.png
    chrome_BqcCAxOk01.png

    window里就三类方法:trigger,移除器,apply(各种function参数);除了前两者,window也只有一个apply方法调用

    同一个窗口内的数据:两两流做笛卡尔积。
    chrome_NPSDSWAfln.png

    以上是针对窗口的处理

    115. 136 双流实时对账4_JOINAPI-IntervalJoin(间隔join)

    chrome_O5eI5LKyJZ.png
    chrome_O5eI5LKyJZ.png

    chrome_3AkOM0P2O2.png

    间隔join针对数据(时间戳)的处理

    115-1. 源码阅读:

    IntervalJoin.png
    IntervalJoin.png

    116. 137 双流实时对账5_Join代码实现

    116-1. watermark跳变效果分析

    wartermark 周期性200ms生成一次,在200ms内文件读入输出都读入处理完了,处理过程中认为都正常匹配上了,
    watermark没有变定时器没有触发,但事实上当前时间戳已经超过了
    但事实上200ms之后watermark又一次触发的时候,会发现时间已经跳变很多了,会触发watermark跳变。但是之前warmark没有变已经匹配输出的就没法改变了,所以此处匹配上输出的记录比较多见:137截图
    wartmark.png

    money怎么来?
  • 相关阅读:
    在命令行下运行Matlab
    VMWare无法共享文件夹(Win7宿主机Ubuntu14.04客户机)
    [转] CVonline: Image Databases
    第二天
    第一天
    二宝软件的NABCD分析
    用c++实现环形数组的最大子数组之和
    返回一个二维整数数组中最大子数组的和
    求最大子数组之和
    四则运算
  • 原文地址:https://www.cnblogs.com/bchjazh/p/14509532.html
Copyright © 2011-2022 走看看