zoukankan      html  css  js  c++  java
  • Flink-v1.12官方网站翻译-P009-Event-driven Applications

    事件驱动的应用

    处理函数

    简介

    ProcessFunction将事件处理与定时器和状态结合起来,使其成为流处理应用的强大构件。这是用Flink创建事件驱动应用的基础。它与RichFlatMapFunction非常相似,但增加了定时器。

    例子 

    如果你做过 "流分析 "培训中的实战练习,你会记得它使用TumblingEventTimeWindow来计算每个司机在每个小时内的小费总和,像这样。

    // compute the sum of the tips per hour for each driver
    DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
            .keyBy((TaxiFare fare) -> fare.driverId)
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .process(new AddTips());
    

      

    用KeyedProcessFunction做同样的事情是相当直接的,也是很有教育意义的。让我们先把上面的代码替换成这样。

    // compute the sum of the tips per hour for each driver
    DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
            .keyBy((TaxiFare fare) -> fare.driverId)
            .process(new PseudoWindow(Time.hours(1)));
    

      

    在这段代码中,一个名为PseudoWindow的KeyedProcessFunction被应用于一个键控流,其结果是一个DataStream<Tuple3<Long,Long,Float>>(就是使用Flink内置时间窗口的实现所产生的那种流)。

    PseudoWindow的整体轮廓是这样的形状。

    // Compute the sum of the tips for each driver in hour-long windows.
    // The keys are driverIds.
    public static class PseudoWindow extends 
            KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
    
        private final long durationMsec;
    
        public PseudoWindow(Time duration) {
            this.durationMsec = duration.toMilliseconds();
        }
    
        @Override
        // Called once during initialization.
        public void open(Configuration conf) {
            . . .
        }
    
        @Override
        // Called as each fare arrives to be processed.
        public void processElement(
                TaxiFare fare,
                Context ctx,
                Collector<Tuple3<Long, Long, Float>> out) throws Exception {
    
            . . .
        }
    
        @Override
        // Called when the current watermark indicates that a window is now complete.
        public void onTimer(long timestamp, 
                OnTimerContext context, 
                Collector<Tuple3<Long, Long, Float>> out) throws Exception {
    
            . . .
        }
    }
    

      

    需要注意的事情。
    - ProcessFunctions有好几种类型--这是一个KeyedProcessFunctions,但还有CoProcessFunctions、BroadcastProcessFunctions等。
    - KeyedProcessFunction是RichFunction的一种。作为一个RichFunction,它可以访问在管理键控状态下工作所需的open和getRuntimeContext方法。
    - 有两个回调要实现:processElement和onTimer。"processElement "在每次传入事件时被调用;"onTimer "在定时器发射时被调用。这些定时器可以是事件时间,也可以是处理时间定时器。processElement和onTimer都提供了一个上下文对象,该对象可以用来与TimerService交互(除其他外)。这两个回调也都传递了一个可以用来发出结果的Collector。

    open()方法

    // Keyed, managed state, with an entry for each window, keyed by the window's end time.
    // There is a separate MapState object for each driver.
    private transient MapState<Long, Float> sumOfTips;
    
    @Override
    public void open(Configuration conf) {
    
        MapStateDescriptor<Long, Float> sumDesc =
                new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
        sumOfTips = getRuntimeContext().getMapState(sumDesc);
    }
    

      

    由于票价事件可能会不按顺序到达,所以有时需要处理一个小时的事件,然后再完成前一个小时的结果计算。事实上,如果水印延迟比窗口长度长得多,那么可能会有许多窗口同时打开,而不是只有两个。本实现通过使用MapState来支持这一点,MapState将每个窗口结束的时间戳映射到该窗口的提示之和。

    processElement()方法

    public void processElement(
            TaxiFare fare,
            Context ctx,
            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
    
        long eventTime = fare.getEventTime();
        TimerService timerService = ctx.timerService();
    
        if (eventTime <= timerService.currentWatermark()) {
            // This event is late; its window has already been triggered.
        } else {
            // Round up eventTime to the end of the window containing this event.
            long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
    
            // Schedule a callback for when the window has been completed.
            timerService.registerEventTimeTimer(endOfWindow);
    
            // Add this fare's tip to the running total for that window.
            Float sum = sumOfTips.get(endOfWindow);
            if (sum == null) {
                sum = 0.0F;
            }
            sum += fare.tip;
            sumOfTips.put(endOfWindow, sum);
        }
    }
    

      

    要考虑的事情。

    • 迟到的事件会怎样?在水印后面的事件(即迟到)会被丢弃。如果你想做一些比这更好的事情,可以考虑使用侧面输出,这将在下一节解释。
    • 这个例子使用了一个MapState,其中键是时间戳,并为同一个时间戳设置一个Timer。这是一种常见的模式;它使得在定时器发射时查找相关信息变得简单而高效。

    onTimer()方法

    public void onTimer(
            long timestamp, 
            OnTimerContext context, 
            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
    
        long driverId = context.getCurrentKey();
        // Look up the result for the hour that just ended.
        Float sumOfTips = this.sumOfTips.get(timestamp);
    
        Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips);
        out.collect(result);
        this.sumOfTips.remove(timestamp);
    }
    

      

    观察。

    • 传递给onTimer的OnTimerContext上下文可以用来确定当前的密钥。
    • 我们的伪窗口是在当前水印到达每个小时结束时被触发的,此时调用onTimer。这个onTimer方法从sumOfTips中删除了相关的条目,这样做的效果是无法容纳迟到的事件。这相当于在使用Flink的时间窗口时,将allowLateness设置为零。

    性能考虑因素

    Flink提供了针对RocksDB优化的MapState和ListState类型。在可能的情况下,应该使用这些类型来代替持有某种集合的ValueState对象。RocksDB状态后端可以追加到ListState,而不需要经过(去)序列化,对于MapState,每个键/值对都是一个独立的RocksDB对象,因此MapState可以有效地被访问和更新。

    侧面输出

    简介

    有几个很好的理由可以让Flink操作者有一个以上的输出流,比如报告。

    • 异常
    • 畸形事件
    • 后事
    • 操作警报,如与外部服务的连接超时。

    侧输出是一种方便的方式。除了错误报告之外,侧输出也是实现流的n路分割的好方法。

    例子

    现在,您可以对上一节中被忽略的晚期事件做些什么了。

    一个侧输出通道与一个 OutputTag<T>相关联。这些标签具有与侧输出的DataStream的类型相对应的通用类型,它们具有名称。

    private static final OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};
    

      

    上面展示的是一个静态的OutputTag<TaxiFare>,它既可以在PseudoWindow的processElement方法中发出晚期事件时被引用。

    if (eventTime <= timerService.currentWatermark()) {
        // This event is late; its window has already been triggered.
        ctx.output(lateFares, fare);
    } else {
        . . .
    }
    

      

    并在访问这边的流时,在作业的主方法中输出。

    // compute the sum of the tips per hour for each driver
    SingleOutputStreamOperator hourlyTips = fares
            .keyBy((TaxiFare fare) -> fare.driverId)
            .process(new PseudoWindow(Time.hours(1)));
    
    hourlyTips.getSideOutput(lateFares).print();
    

      

    另外,您也可以使用两个具有相同名称的OutputTags来引用同一侧输出,但如果您这样做,它们必须具有相同的类型。

     结束语

    在这个例子中,你已经看到了如何使用ProcessFunction来重新实现一个直接的时间窗口。当然,如果Flink内置的窗口API满足你的需求,无论如何,请继续使用它。但如果你发现自己在考虑用Flink的窗口做一些变形,不要害怕推出自己的窗口。

    此外,ProcessFunctions对于计算分析之外的许多其他用例也很有用。下面的实践练习提供了一个完全不同的例子。

    ProcessFunctions的另一个常见用例是用于过期的陈旧状态。如果你回想一下Rides和Fares练习,其中使用RichCoFlatMapFunction来计算一个简单的连接,示例解决方案假设TaxiRides和TaxiFares是完美匹配的,每个rideId是一对一的。如果一个事件丢失了,同一乘车ID的其他事件将永远保持在状态。这可以替换为一个KeyedCoProcessFunction来实现,并且可以使用一个定时器来检测和清除任何陈旧的状态。

     实践

    与本节配套的实战练习是长乘警报练习。

    下一步阅读什么

     

  • 相关阅读:
    处理i18n国际电话区号的代码实践
    iOS 万能跳转界面方法 (runtime实用篇一)
    URI跳转方式地图导航的代码实践
    iOS @IBDesignable和@IBInspectable
    我的iOS高效编程秘诀—坚持编程习惯
    如何手动解析CrashLog
    小心别让圆角成了你列表的帧数杀手
    iOS开发之使用Runtime给Model类赋值
    POP介绍与使用实践(快速上手动画)
    NSString&NSMutableString常用操作梳理
  • 原文地址:https://www.cnblogs.com/lukairui/p/14204009.html
Copyright © 2011-2022 走看看