zoukankan      html  css  js  c++  java
  • 【转载】使用Flink低级处理函数ProcessFunction

    转载链接:https://zhuanlan.zhihu.com/p/130708277

    Flink的转换操作是无法访问事件的时间戳信息和水印信息的。例如我们常用的MapFunction转换操作就无法访问时间戳或者当前事件的事件时间。而这在一些应用场景下,极为重要。

    因此,Flink DataStream API提供了一系列的Low-Level转换操作,可以访问时间戳、水印以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。这一类的低级API,被称为"Process Function"。

    ProcessFunction

    ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。

    ProcessFunction是一个低级的流处理操作,允许访问所有(非循环)流应用程序的基本构件:

    • events:数据流中的元素
    • state:状态,用于容错和一致性,仅用于keyed stream
    • timers:定时器,支持事件时间和处理时间,仅用于keyed stream

    Flink提供了8个Process Function:

    • ProcessFunction:dataStream
    • KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理
    • CoProcessFunction:用于connect连接的流
    • ProcessJoinFunction:用于join流操作
    • BroadcastProcessFunction:用于广播
    • KeyedBroadcastProcessFunction:keyBy之后的广播
    • ProcessWindowFunction:窗口增量聚合
    • ProcessAllWindowFunction:全窗口聚合

    可以将ProcessFunction看作是一个具有key state和定时器(timer)访问权的FlatMapFunction。对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件。

    对于容错状态,ProcessFunction 可以通过 RuntimeContext 访问Flink的keyed state,这与其他有状态函数访问keyed state的方式类似。

    定时器可让应用程序对在处理时间和事件时间中的变化进行响应。每次调用 processElement(...)函数时都可以获得一个Context对象,通过该对象可以访问元素的事件时间(event time)时间戳以及 TimerService。可以使用TimerService为将来的事件时间/处理时间实例注册回调。对于事件时间计时器,当当前水印被提升到或超过计时器的时间戳时,将调用onTimer(…)方法,而对于处理时间计时器,当挂钟时间达到指定时间时,将调用onTimer(…)方法。在调用期间,所有状态的范围再次限定为创建定时器所用的key,从而允许定时器操作keyed state。

    如果想要在流处理过程中访问keyed state和定时器,就必须在一个keyed stream上应用ProcessFunction函数,代码如下:

    stream.keyBy(...).process(new MyProcessFunction())

    KeyedProcessFunction使用示例

    作为ProcessFunction的扩展(即子类),KeyedProcessFunction在其onTimer(…)方法中提供对计时器key的访问。其模板代码如下所示:

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
        K key = ctx.getCurrentKey();
        // ...
    }

    在下面的示例中,KeyedProcessFunction维护每个key的计数,并在每过一分钟(以事件时间)而未更新该key时,发出一个key/count对:

    • 把计数、key和最后修改时间戳(last-modification-timestamp)存储在一个ValueState中, ValueState的作用域是通过key隐式确定的。
    • 对于每个记录,KeyedProcessFunction递增计数器并设置最后修改时间戳。
    • 该函数还安排了一个一分钟后的回调(以事件时间)。
    • 在每次回调时,它根据存储的计数的最后修改时间检查回调的事件时间时间戳,并在它们匹配时发出key/count(即,在该分钟内没有进一步的更新)。

    【示例】维护数据流中每个key的计数,并在每过一分钟(以事件时间)而未更新该key时,发出一个key/count对。

    1)首先导入必须所依赖包

    package com.xueai8;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.util.Collector;

    2)定义存储状态数据的数据结构(数据模型)

    /**
     * 存储在状态中的数据类型
     */
    public class CountWithTimestamp {
    
        public String key;           // 存储key
        public long count;           // 存储计数值
        public long lastModified;    // 最后一次修改时间
    }

    3)自定义ProcessFunction,继承自KeyedProcessFunction:

    public class CountWithTimeoutFunction
            extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {
    
        /** 由这个处理函数负责维护的状态 */
        private ValueState<CountWithTimestamp> state;
    
        // 首先获得由这个处理函数(process function)维护的状态
            // 通过 RuntimeContext 访问Flink的keyed state
        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }
    
        // 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
        // 对于每个记录,KeyedProcessFunction递增计数器并设置最后修改时间戳
        @Override
        public void processElement(
                Tuple2<String, String> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
    
            // 获取当前的计数
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }
    
            // 更新状态计数值
            current.count++;
    
            // 设置该状态的时间戳为记录的分配的事件时间时间时间戳
                    if (ctx != null) {
                        current.lastModified = ctx.timestamp();
                    }
    
                    // 将状态写回
            state.update(current);
    
            // 从当前事件时间开始安排下一个计时器60秒
            ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
            }
    
        // 如果一分钟内没有进一步的更新,则发出 key/count对
        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
    
            // 获取调度此计时器的key的状态
            CountWithTimestamp result = state.value();
    
            // 检查这是一个过时的计时器还是最新的计时器
            if (timestamp == result.lastModified + 60000) {
                // 超时时发出状态
                out.collect(new Tuple2<String, Long>(result.key, result.count));
            }
        }
    }

    4)在流处理的主方法中应用自定义的处理函数

    public class StreamingJob {
        public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // 默认情况下,Flink将使用处理时间。要改变这个,可以设置时间特征:
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            // 源数据流
            DataStream<Tuple2<String, String>> stream = env
                    .fromElements("good good study","day day up","you see see you")
                    .flatMap(new FlatMapFunction<String, Tuple2<String,String>>() {
                        @Override
                        public void flatMap(String line, Collector<Tuple2<String, String>> collector) throws Exception {
                            for(String word : line.split("\W+")){
                                collector.collect(new Tuple2<>(word,"1"));
                            }
                        }
                    });
    
        // 因为模拟数据没有时间戳,所以用此方法添加时间戳和水印
            DataStream<Tuple2<String, String>> withTimestampsAndWatermarks =
                    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, String>>() {
                        @Override
                        public long extractAscendingTimestamp(Tuple2<String, String> element) {
                            return System.currentTimeMillis();
                        }
                    });
    
        // 在keyed stream上应用该处理函数
        DataStream<Tuple2<String, Long>> result = withTimestampsAndWatermarks.keyBy(0).process(new CountWithTimeoutFunction());
    
        // 输出查看
            result.print();
    
        // 执行流程序
        env.execute("Process Function");
        }
    }
  • 相关阅读:
    报错:Failed to create BuildConfig class
    emulator control无法使用问题
    the import android cannot be resolved
    报错:init: Could not find wglGetExtensionsStringARB!
    Android SDK升级后报错error when loading the sdk 发现了元素 d:skin 开头无效内容
    Eclipse Android环境搭建
    android中导入低版本project可能会遇到的编译问题(转自: Victor@Beijing)
    22.9
    GIT文档
    机器学习的几个问题探讨
  • 原文地址:https://www.cnblogs.com/carsonwuu/p/14926695.html
Copyright © 2011-2022 走看看