flink学习总结
1.Flink是什么?
Apache Flink 是一个框架和分布式处理引擎,用于处理无界和有界数据流的状态计算。
2.为什么选择Flink?
1.流数据更加真实的反映了我们的生活方式。
2.传统的数据架构是基于有限的数据集
3.Flink 可以做到 低延迟,高吞吐,结果的准确性和良好的容错性
3.Flink的主要特点:
1.事件驱动
2.基于流的世界观:在Flink中,一切都是流,离线数据是有界流,实时数据是无界流。
3.分层API:越顶层越抽象,表达简单,使用方便;越底越具体,表达丰富使用灵活。
1.SQL/Table API (dynamic tables)
2.DataStream API (streams,windows)
3.ProsessFunction(events,states,time)
4.支持事件时间(event-time)和处理时间(processing-time)语义
5.精确一次(exactly-once)的状态一致性保证
6.低延迟,每秒处理百万个事件,毫秒级延迟
7.与众多常用存储系统系统的连接
8.高可用,动态扩展,实现7*24小时全天候运行
4.Flink运行时组件:
1.JobManager:作业管理器
1.控制一个应用程序的主进程,即每个APP被一个不同的JobManager
2.JobMannger会接收到要执行的程序,包括:作业图(jobGraph),逻辑数据流图(logical dataflow graph) 和打包的所有类库,和其他jar包
3.JobManager会把JobGraph 转换为一个物理层的数据流图,叫做执行图(ExecutorGraph),包含所有可以并发执行的任务。
4.JobManager会向资源管理器请求执行任务必要的资源,也即是slot插槽。获得资源之后,就会将执行图分发到运行他们的taskManager上。运行过程中JobManager负责中央协调操作,比如检查点(checkpoints)的协调。
2.TaskManager:任务管理器
1.Flink工作进程。通常Flink有多个TaskManager运行,每个TaskManager有多个Slot,Slot数量限制了TaskManager能够执行的任务数量。
2.启动之后 ,TaskManager向资源管理器注册插槽,收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分匹配任务执行了。
3.执行中,一个TaskManager可以和其他运行同一应用程序的TaskManager交换数据
3.ResourceManager:资源管理器
1.主要负责管理TaskManager的插槽Slot。Slot是Flink定义处理资源单元
2.Flink为不同环境和资源管理工具给提供了不同的资源管理器,比如yarn,Mesos,K8s,以及standalone。
3.当JobManager申请插槽时,ResourceManager会将有空闲的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽,还可以向资源提供平台发起会话,一提供启动TaskManager进程的容器。
4.Dispacher:分发器
1.可以跨作业运行,作为应用提交提供REST接口
2.当一个APP被提交时,分发器会启动并将APP移交给一个JobManager.
3.Dispatcher也会启动一个WebUI ,方便展示监控信息。
4.DISPACher在架构中可能不是必须的,只取决于应用提交的方式。
5.任务提交流程:
1.standalone:
APP提交应用到Dispatchr,转交给JobManager,JM请求向RM申请Slot,TM向RM注册启动自己的Slot,RM发出提供
Slot的指令,TM向JM提供Slots, JM分配给TM要执行的任务,最后不同的TM之间还会有数据交换。
6.TM和Slots
1.Flink每个TM都是一个JVM进程,可能在独立的线程中执行一个或多个task
2.为控制每个TM接受的task数,TM通过Slot控制,一个TM至少1个slot
3.默认Flink允许任务共享Slot,即使是不同任务的子任务。这样的结果是,一个Slot可以保存作业的整个管道。
4.slot是静态的概念,是指TM具有并发执行能力。
7.程序和数据流(dataFlow)
1.所有的Flink都是由三部分组成:Source,Transformation和Sink
2.source负责读取数据源,Transformation负责利用各种算子转换加工,sink负责输出。
3.运行时,Flink运行程序会被映射成逻辑数据流DataFlow
8.并行度
1.一个特定算子的子任务的个数称之为并行度,Parallelism
2.一般来说,一个stream的并行度可以认为是算子中最大的并行度
3.一个程序,不同的算子可能有不同的并行度
4.算子之间传输数据的形式可能是one to one 也可能是redistributing。具体哪种取决于算子的种类。
5.one to one :stream 维护分区以及元素的顺序,这意味map算子的子任务看到的元素个数以及顺序和source
产生的元素顺序相同。map,filter,flatmap等都是one to one 。
6.Redistrbuting:stream的分区发生改变。每一个算子的子任务一句选择的transformation发动数据到不同的任务。例如,
keyBy基于hashCode重分区,而broadcast和rebalance会随机分区,就类似于Spark中的Shuffle。
9.任务链(operator Chains)
1.Flink采用一种任务链的优化技术,在特定情况下减少本地通信的开销,为了满足任务链的需求,必须将两个或多个
算子设为相同的进行度,另通过本地转发的方式进行连接
2.one to one 操作是,Flink连接的算子连接在一起形成一个task,原来的神算子成为里面的subtask
3.并行度相同,并且是one to one 操作,两个缺一不可
4.operator chains 的优点,在同一个chain里面的任务数据不用做数据传输,即序列化反序列化,以及通信开销
5.在代码中禁用这种任务链:env.disableOpetarorChaining()
10.Flink 流处理API
1.Environment:val env = StreamExecutionEnvironment.getExecutionEnvironment
2.createLocalEnvironment:返回本地执并行度行环境,设置调用并行度
3.createRemoteEnvironment:返回集群执行环境,将Jar包需要在提交都远程服务器,需要在调用时指定JobManager的IP
和端口号,并指定集群运行的Jar包。
11.Flink基本算子
1.map ;flatMap;filter;类似Spark
2.keyBy:DataStream -> KeyedStream:逻辑地将一个流拆成不相交的分区,每个分区包含相同key的元素,
在内部以hash形式实现。
3.滚动聚合算子(rolling Aggregation):必须针对keyBy 之后的算子:sum,min,max,minBy,MaxBy
12.split 和Select
1.DataStream -》 SplitStream:根据某些特征把DataStream拆分成两个或者多个DataStream。
2.SplitStream -> DataStream :从一个SplitStream中获取一个或多个DataStream。
13.Connect 和CoMap(只能两条流合并,必须先CoMap再使用)
DataStream,DataStream->ConnectedSteam:连接两个保持各自类型数据流,两个数据流被Connect后,只是被放在同一个
流中,内部依然保持各自的数据和形式不发生变化,各自独立。
ConnectedSteam-> DataStream:作用于ConnectStream上,功能与map,flatMa一样,对ConnectStream中的每一
个Stream进行map和flatMap处理
14.union(可以合并多条流):数据结构必须一样
1.DataStream->daStream:对两或两个以上的DataStream进行合并,产生一个包含所有DataStream元素的
新DataStream。
15.Flink支持的数据类型:
1.Java和Scala所有基础类型。
2.Java和Scala元组
3.Scala样例类
4.Java简单对象
5.其他Arrays,Lists,Maps,Enums
16.实现UDF函数--更细粒度的控制流
1.富函数(RichFunction):是DataStream API 提供的一个函数类接口,所有Flink函数类都有Rich版本。
可以获得运行时环境上下文,并拥有生命周期方法,所以可以实现更复杂的功能。
RichMapFunction。。。
2.生命周期:open()初始化方法
close()最后一个调用,清理工作
getRuntimeContext(),提供运行时上下文,例如并行度,任务的名称,以及state状态
17.Sink API:
所有Flink组件输出都要通过Sink组件来完成:stream.addSink(new MySink(xxxx))
1.Flink提供编译好的sink组件包括kafkaSink, redisSink,jdbcSink,等
18.window API:
1.处理无界流,可以进行切分,得到有界流。
2.窗口(window)就是将无界流切割成有节流的一种方式,他将数据流分发到有限的桶中进行分析(bucket)
19.window类型
1.时间窗口(time window)
1.滚动时间窗口(Tumbling Window)
1.将数据一句固定的窗口长度对数据进行切分
2.时间对齐,窗口长度固定,没有重叠
2.滑动时间窗口(sliding Windows)
1.滑动窗口是固定窗口更广义的一种形式,滑动窗口由固定的窗口长度和滑动步长组成
2.窗口长度对齐,可以重叠
3.会话窗口
1.由一系列事件组成一个指定时间长度的timeout间隙组成,也就是窗口长度和滑动间隔组成
2.特征:时间无对齐
2.计数窗口(Count Window)
1.滚动计数窗口
2.滑动计数窗口
20.窗口API
1.窗口分配器:window()
2.window方法只能用在keyBy后。
3.Flink提供更加简单的timeWindow和CountWindow
4.DataStream.map(r=>(r.id,r.temp)).keyBy(_._1).timeWindow(Time.seconds(15)).reduce((r1,r2)=>(r1._1.r1._2.min(r2._2)))
21.窗口分配器:(window assigner)
1.window() 方法接受的参数是一个WindowAssigner
2.WindowAssigner 负责将每条数据分发到正确的window中
3.Flink提供通用的WindowAssigner
1.滚动窗口(tumbling window)
2.滑动窗口(sliding window)
3.会话窗口(session window)
4.全局窗口(global window)
4.创建不同的窗口
1.滚动:.timeWindow(Time.seconds(15))
2.滑动:.timeWindow(Time.seconds(15),Time.seconds(5))
3.会话:.timeWindow(EventTImeSessionWindows.withGap(Time.minutes(10))
4.滚动计数:.countWindow(5)
5.滑动计数窗口:countWindow(10,2)
22.窗口函数:window function
1.增量聚合函数:incremental aggregation functions
1.每条数据到来就进行计算,保持一个简单的状态
2.ReduceFunction,AggregateFunction
2.全窗口函数:full window functions
1.先把窗口所有数据收集起来,等到计算的时候遍历所有数据
2.ProcessWindowFunction
3.其他可选API:
1.trigger():触发器:定义window什么时候关闭,触发计算结果
2.evitor:(移除器):定义移除某些数据的逻辑
3.allowedLateness():允许处理迟到的数据
4.sideOutPutLateData():将迟到的数据放入侧输出流
5.getSideOutPut():获得侧输出流
23.Flink的时间语义和waterMark
1.时间语义:
1.Event TIme:事件创建的时间
2.Ingestion TIme:数据进入Flink的时间
3.Processing TIme:执行算子的本地系统时间,与机器有关
4.不同的时间语义有着不同的应用场景,往往关注事件时间:Event TIme
5.Event TIme 可以在日志数据的时间戳中提取
2.在代码中设置时间语义:env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
3.乱序数据的影响:
1.当Flink以Event Time模式处理数据时,他会根据数据里的时间戳来处理时间的算子
2.由于网络,分布式等原因,会导致乱序数据的产生
3.乱序数据会让窗口计算不准确
4.waterMark
1.遇到一个时间戳达到窗口关闭时间,不应该立即触发窗口计算,而是应该等待一段时间,等迟到的数据来了才关闭窗口
2.watermark是衡量 Event time进展的机制,可以设定延迟触发
3.watermark 是用于处理乱序数据,而正确的处理乱序数据,通常用waterMark机制结合window来实现
4.数据流中的watermark用于表示timeStamp小于watermark的数据,都已经到达,因此window的执行也是由Watermark触发的。
5.watermark用于让程序自己平衡延迟和结果正确性
5.watermark的特点
1.watermark是一条特殊的记录数据
2.watermark 必须单调递增,以确保任务的事件时间钟在向前推进,而不是在后退
3.waterMark与数据的时间戳相关
6.watermark的引入:对于排序好的数据只需要指定时间戳就够了,不需要延迟触发
1.DataStream.assignAscendingTimestamps(_.timestamp * 1000)
2.Flink 暴露了TImestampAssigner接口供我们实现,使我们可以自定义如何从事件数据抽取时间戳和生成watermark
dataStream.assignTimestampsAndWatermarks(new MyAssigner())
7.TimestampAssigner
1.定义抽取时间戳,以及生成waterMark的方法,有两种
2.AssignerWithPeriodicWatermarks
1.周期生成waterMark:系统周期性的将waterMark插入到流中
2.默认周期200ms,可以使用ExecutionConfig.setAutoWatermarkInterval()方法设置
3.升序和乱序的处理数据BoundedOutofOrderness,都是基于周期性的watermark。
3.AssignerWithPunctuatedWatermarks
1.没有时间周期规律,可生成watermark
8.watermark的设定
1.watermark由应用程序人员生成,需要对业务有一定了解
2.如果waterMark设置太久,收到结果的速度可能会很慢,解决办法是在水位线到达之前先输出一个近似值
3.而如果waterMark达到的太早,则可能收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题
24.ProcessFunction API 底层API
1.DataStream API 提供一系列底层API,可以访问时间戳,watermark以及注册定时事件,还可以输出一些事件。
2.Flink提供8个Process Function
ProcessFunction;KeyedProcessFunction;CoProcessFunction;ProcessJoinFunction;BroadcastProcessFunction
ProcessWindowFunction;keyedBroadcastFunction;ProcessAllWindowFunction
25.keyedProcessFunction:
1.用来操作KeyedStream。可以处理流的每一个元素,输出为0 ,1或多个元素。继承自RichFunction,所以都有
open(),close(),getRuntimeContext()z等方法。
2.keyedProcessFunction额外提供两个方法:
1.processElement,流中每个元素都会调用该方法,调用结果放在Collector
数据类型中输出,Context可以访问时间戳,元素的key以及TImeService时间服务,Context还可以将输出结果输出到其他的流
Sideoutputs。
2.onTImer:是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的时间戳。Collector
为输出结果集合。
3.侧输出流:process function的侧输出流可以产生多条流,并且流的数据类型可以不一样。一个side output可以定义为
OutputTag[X]对象,process function可以通过Context对象发射一个事件或者多个side outputs
26.Flink 中的状态
1.由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态。
2.可以认为状态就是本地本地变量,可以被任务的业务逻辑访问
3.Flink会进行状态管理,包括状态一致性,故障处理以及高效存储和访问,以便开发人员专注于应用程序的逻辑
4.在Flink中,状态始终与特定的算子相关联
5.为了使运行时的Flink了解算子的状态,算子需要预选注册其状态
6.总体来说,有两种类型的状态:
1.算子状态:作用范围为算子任务,同一并行处理的所有数据都能访问到相同的状态;
状态对于同一个任务是共享的
算子状态不能由相同或者不同的算子的另一个任务访问
2.键控状态:根据数据流中的Key来维护状态(常用)
Flink为每个key维护一个状态实例,并且具有相同Key的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个Key对应的状态
当任务处理一条数据时,会自动将状态的访问范围限定在当前的key
7.算子状态数据结构:
1.列表结构(list state):将状态表示为一组数据的列表
2.联合列表状态(union list state):也是列表。区别是在发生故障时,或者从保存点启动程序时如何恢复
3.广播状态(Broadcast state):如果一个算子有多个任务,而且这些任务的状态相同,则这种情况最适合广播状态
8.键控状态数据结构:
1.值状态:(Value state):将状态表示为单个的值
2.列表状态:(list state):将状态表示为一组数据的列表
3.映射状态:(Map state):将状态表示为key-value
4.聚合状态:(reducing state & Aggregating state):将状态表示为一个用于聚合操作的列表
9.键控状态的使用:
1.声明一个键控状态
lazy val lastTemp:ValueState[Double] = getRuntimeContext.getState[Double](new ValueStateDescriptor[Double]("lasttemp",classOf[Double]))
2.读取状态: val preTemp = lastTemp.value()
3.对状态进行赋值:lastTemp.update(value.temperature)
27.状态后端(state Backends)
1.每传入一条数据,有状态的算子都会读取和更新状态
2.由于有状态访问对于处理数据的低延迟至关重要,因此每个并行的任务都会在本地维护其状态,以确保快速的状态访问
3.状态的存储,访问和维护,由一个可插入的组件决定,这就是状态后端。
4.状态后端主要负责两件事:本地的状态管理,和将检查点checkPoint状态写入远程存储
28.选择一个状态后端:
1.MemoryStatebackend:内存级,会将键控状态作为内存中的对象进行管理,将他们存储在TaskManager的JVM堆上,而将checkPoint存储在JobManager
的内存中
2.FsStateBackend:将checkPoint存在远程的持久化文件系统中,对于本地状态,和MemoryStateBackend一样,也会存在TaskManager的JVM堆上,同时
拥有内存级别的本地访问速度和更好的容错
3.RocksBDStateBackend:将所有状态序列化以后,存入本地的RocksDB
29.一致性检查点(CheckPoints)
1.Flink 故障恢复机制的核心,就是应用状态的一致性检查点
2.有状态流应用的一致性检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(快照),
这个时间点,应该是所有任务都恰好处理完成一个相同的输入数据的时候
3.在执行流应用程序期间,Flink会定期保存状态的一致性检查点
4.如果发生故障,Flink将会使用最近的检查点来一致恢复应用的程序的状态,并重新启动处理流程
5.遇到故障以后,第一步应该重启应用
6.第二步是从checkPoint中读取状态,将状态重置,从检查点重新启动应用后,其内状态与检查点完成时的
状态完全相同
7.第3步:开始消费并处理检查点到发生故障之间的所有数据,这种检查点的保存和恢复机制可以为
应用程序状态提供精确一次(exactly-once)的一致性,因为所有的算子都会保存检查点并恢复所有状态,这样一来
所有的输入流就都会被重置到检查点完成的位置。
30.检查点的实现算法:
1.简单的想法:暂停应用,保存状态到检查点,再重新恢复应用
2.Flink的改进实现:基于Chandy-lamport算法的分布式快照,将检查点的保存和数据处理分开,不暂停整个应用
3.检查点分界线(Checkpoint Barrier):
1.Flink 检查点算法用到了一种称为分界线的特殊数据形式,用来把一条流上的数据按照不同的检查点分开
2.分界线之前到来的数据导致的状态改变,都会被包含在当前的分界线所属的检查点中,而基于分界线之后的
数据所导致的所有改变,就会被包含在之后的检查点中
3.Jobmanager回向每个source任务发送一条带有新检查点ID的消息,通过这种方式启动检查点
4.数据源将他们的状态写入检查点,并发出一个检查点barrier
5.状态后端在状态存入检查点之后,会返回通知给source任务,source任务就会向JobManager确认检查点完成
6.分界线对齐:barrier向下游传递,sum任务会等待所有输入分区的barrier到达
7.对于barrier已经到达的分区,继续到达的数据会被缓存
8.而barrier未到达的分区,数据会被正常处理
9.当收到所有输入分区的barrier时,任务就会将其任务状态保存到状态后端的检查点,然后将barrier继续向下游转发
10.向下游转发检查点barrier后,任务继续正常的数据处理
11.sink任务向JobManager确认状态保存到checkpoint完毕
12.当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了
31.savepoint(保存点)
1.Flink还可以自定义镜像保存点,就是savepoints
2.原则是,创建保存点使用算法和检查点完全相同,因此可以看成是具有一些额外元数据的检查点
3.Flink不会自动创建保存点,因此用户必须明确地触发创建操作
4.保存点是一个强大的功能。除了可以故障修复外,保存点可以用于:有计划地手动备份,更新应用程序,版本迁移,暂停和重启应用
32.状态一致性:
1.有状态的流处理,内部每个算子任务都有自己的状态
2.对于流处理器内部来说,所谓状态一致性,就是计算结果保持正确
3.一条数据不应该丢失,也不应该重复计算
4.在遇到故障是可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的
33.状态一致性分类:
1.at-most-once(最多一次):即什么都不干,既不恢复丢失的状态,也不重播丢失的数据。即最多处理一次事件
2.at-least-once(至少一次):大多数场景下我们不希望丢失事件。,即所有事件都得到了处理,而且一些事件可能被多次处理
3.exactly-once(精确一次):恰好处理一次,是最严格的保证,也是最难实现的。不仅事件不丢,而且对每一个数据,内保状态只更新一次
34.一致性检查点(checkPoints)
1.Flink使用一种轻量级的快照机制--检查点来保证exactly-once语义
2.有状态流应用的一致检查点,就是所有任务的状态在某个事件点的一份拷贝(快照),二这个时间,应该是所以任务都恰好处理完一个相同的输入数据的时候。
3.应用状态的一致检查点,是Flink故障恢复机制的核心
35.端到端的状态一致性(end to end)
1.真正应用中,流处理应用除了流处理器外还包含数据源如Kafka和输出到持久化系统
2.整个端到端的一致性级别取决于所有组件中一致性最弱的组件
36.exactly-once:
1.内保保证--checkPoint
2.source端--可重设数据的读取位置
3.sink端--从故障恢复时,数据不会重复写入外部系统
1.幂等写入:就是说一个操作,可以执行很多次,但只导致一次结果改变,即,后面的重复操作不起作用
2.事务写入:应用程序中一系列严密的操作都必须成功,否则每个操作中所作出的更改都会被撤销
原子性:要么都成功,要么都不做
4.two-phase-commit,2pc
1.对于每个checkpoint,sink任务会启动一个事务,并将接下来的数据添加到事务里。
2.然后将这些数据写入外部的sink系统,但不提交,只是预提交
3.当收到checkpoint完成的通知时,才正式提交事务,实现结果的写入
4.这种真正实现了exactly-once,他需要一个提供事务支持的外部sink系统,Flink提供了twoPhaseCommitSinkFunction接口
5.2PC对外部sink系统的要求
1.必须支持事务,或者sink任务必须能够模拟外部系统上的事务。
2.在checkpoint间隔期间里,必须能够开启一个事物并接受数据写入
3.在收到checkpoint完成通知之前,事务必须是等待提交的状态,在故障恢复的情况下,这可能需要一点时间。如果这时sink系统关闭了(超时),那么未提交的数据会丢失
4.sink任务必须 能在进程失败后恢复事务
5.提交事务必须是幂等操作
37.Flink+Kafka端到端一致性的保证
1.内部--checkpoint机制
2.source:kafka consumer 作为source,可以将偏移量保存下来,如果后续任务出现故障,可以由连接器重置偏移量,重新消费
3.sink:kafka producer作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction
38.Flink tableAPI
1.略
39.Flink CEP
1.复杂事件处理
2.Flink CEP是在Flink中实现复杂事件处理库
3.允许在无休止的事件流检测事件模式,让我们有机会掌握数据的重要部分
4.有一个活多个简单时间构成的时间通过一定的规则匹配,然后输出用户想要的数据
5.CEP的特点:
目标:.从简单的事件流中发现一些高级特征
输入:一个或多个有简单事件构成的事件流
处理:识别简单事件流之间的内在联系,多个符合一定简单规则的事件构成复杂事件
输出:满足规则的复杂事件
40:Pattern API
1.exmaple:
val pattern = Pattern.begin[Event]("start").where(_.getId ==42).next("middle").subtype(classOf[SubEvent]
.where(_.getTemp >=10.0).followedBy("end").where(_.getName == "end")
val patternStream = CEP.pattern(inputDataStream,pattern)
val result : DataStream[Alert] = patternStream.select(createAlert(_))
2.个体模式:组成复杂规则的每一个单独模式定义
3.组合模式:很多个体模式组合起来,就形成了整个模式序列 --一个模式必须以.start()开始
4.模式组:将一个模式序列作为条件嵌套在个体模式里,成为一组模式
41.个体模式
1.个体模式可以包括单例模式和循环模式(singleton && looping)
2.单例模式只接受一个事件,而循环模式可以接受多个
3.量词:可以在一个个体模式后面追加量词,也就是循环次数
start.times(4) //匹配4次 start.times(2,4).greedy //匹配2,3,或4并且尽可能多的重复匹配
start.times(4).optional // 0次或4次 start.oneOrMore //1次或多次
4.条件
1.每个模式都需要指定触发条件,作为是否接受事件进入的判断依据
2.CEP中的个体模式主要通过调用.where(),.or(),.until()来指定条件
3.按照不同调用方式分为
1.简单条件:通过.where()筛选
2.组合条件:将简单条件组合:pattern.where().or()
3.终止条件:如果用了oneOrMore或者oneOrMore.optional,建议使用.until()作为终止条件
4.迭代条件:能够对之前所有的事件处理:调用where((value,ctx)=>{}),可调用ctx.getEventsForPattern("name")
start.times(2,4) //出现2,3或4次 start.timesOrMore(2).optioanl.greedy //出现0次,2次或多次,且重复匹配
42.模式序列
1.不同的近邻模式:
1.严格近邻:所有事件严格按照顺序出现,中间没有任何不符合的时间,由next(),指定
2.宽松近邻:允许中间出现不匹配的事件,由followedBy指定
3.非确定性宽松近邻:进一步放宽条件,之前匹配过的也可以再次使用,由followByAny()指定。
2.不希望出现某种近邻:
1.notNext() -不想让某种时间严格近邻前一个事件
2.notFollowedBy:--不想让某个事件发生在两个事件之间。末尾不能
3.注意:
1.所有模式必须以begin()开始
2.所有序列不能以notFollowedBy结束
3.not 类型模式不能别optional修饰
4.可以指定模式指定时间约束,用来要求在多长时间内匹配有效:next.within(Time.seconds(10))
43.匹配事件的提取:
1.创建PatternStream后,就可以应用select或者flatselect方法,从检测到的事件里提取事件。
2.select方法需要传入一个select function作为参数,每个匹配到的事件序列都会调用他
select 以一个Map【string iterable】来接受匹配到的事件序列,其中key就是每个模式的名称,而value就是接受到所有事件的iterable类型
44.超时事件的提取
1.当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃,为了能够处理这部分的匹配,select
和flatselect 允许处理超时序列
2.超时处理程序会接受到目前为止由模式匹配到的所有事件。由一个OutputTag定义接收到的超时序列
3.例:val patternStream = CEP.pattern(input,pattern)
val outputTag = OutputTag[String]("side-out-put")
val result = patternStream.select(outputTag){...}