zoukankan      html  css  js  c++  java
  • 【翻译】了解Flink-数据管道和ETL -- Learn Flink


    翻译来源-Learn Flink Data Pipelines & ETL

    Apache Flink的一种非常常见的用例是实现ETL(提取,转换,加载)管道,该管道从一个或多个源获取数据,执行一些转换和/或扩充,然后将结果存储在某个地方。在本节中,我们将研究如何使用Flink的DataStream API来实现这种应用程序。

    请注意,Flink的Table和SQL API 非常适合许多ETL用例。但是,无论您最终是否直接使用DataStream API,这里介绍的基础知识深刻的了解都会很有价值。

    无状态转换

    本节介绍map()和flatmap(),用于实现无状态转换的基本操作。本节中的示例假定您熟悉flink-training仓库中动手练习中使用的Taxi Ride数据 。

    map()

    在第一个练习中,您过滤了出租车事件流。在同一代码库中,有一个 GeoUtils类提供了一种静态方法GeoUtils.mapToGridCell(float lon, float lat),该方法将位置(经度,纬度)映射到网格单元,该网格单元指的是大小约为100x100米的区域。

    现在,通过向每个事件添加startCell和endCell字段来丰富我们的出租车乘车对象流。您可以创建一个EnrichedRide扩展对象TaxiRide,添加以下字段:

    public static class EnrichedRide extends TaxiRide {
        public int startCell;
        public int endCell;
    
        public EnrichedRide() {}
    
        public EnrichedRide(TaxiRide ride) {
            this.rideId = ride.rideId;
            this.isStart = ride.isStart;
            ...
            this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
            this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
        }
    
        public String toString() {
            return super.toString() + "," +
                Integer.toString(this.startCell) + "," +
                Integer.toString(this.endCell);
        }
    }
    

    然后,您可以创建一个转换这个流的应用程序

    DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
    
    DataStream<EnrichedRide> enrichedNYCRides = rides
        .filter(new RideCleansingSolution.NYCFilter())
        .map(new Enrichment());
    
    enrichedNYCRides.print();
    

    使用MapFunction:

    public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
    
        @Override
        public EnrichedRide map(TaxiRide taxiRide) throws Exception {
            return new EnrichedRide(taxiRide);
        }
    }
    

    flatmap()

    一个MapFunction仅在执行一对一转换时适用:对于每个进入的流元素,map()将发出一个转换后的元素。否则,您将要使用 flatmap()

    DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
    
    DataStream<EnrichedRide> enrichedNYCRides = rides
        .flatMap(new NYCEnrichment());
    
    enrichedNYCRides.print();
    

    使用FlatMapFunction:

    public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
    
        @Override
        public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
            FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
            if (valid.filter(taxiRide)) {
                out.collect(new EnrichedRide(taxiRide));
            }
        }
    }
    

    使用此接口中提供的Collector,该flatmap()方法可以发射任意数量的流元素,包括none。

    keyed 流

    keyBy()

    围绕流的一个属性对流进行分区往往很有用,以便属性相同的所有事件分组到一块。例如,假设您要查找从每个网格单元开始的最长的出租车车程。从SQL查询的角度考虑,这将意味着使用进行某种GROUP BY startCell,而在Flink中,这是通过 keyBy(KeySelector)

    rides
        .flatMap(new NYCEnrichment())
        .keyBy(enrichedRide -> enrichedRide.startCell)
    

    每一个keyBy都会导致网络混洗,从而对流进行重新分区。通常,这非常昂贵,因为它涉及网络通信以及序列化和反序列化。

    keys计算

    KeySelector不仅限于从事件中提取key。他们可以,而是计算在你想要的任何方式的key,只要所产生的key是确定的,并具有有效的hashCode()和equals()实现。此限制排除了生成随机数或返回Array或Enums的KeySelector,但是例如,只要它们的元素遵循这些相同的规则,就可以使用Tuples或POJO来使用复合键。

    keys必须以确定性的方式生成,因为它们会在需要时重新计算,而不是附加到流记录中。

    例如,不是创建一个新EnrichedRide类,而是用一个字段,我们通过以下代码使用startCell字段,

    keyBy(enrichedRide -> enrichedRide.startCell)
    

    我们按上面做,而不是按照下面:

    keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
    

    keyed流上的聚合

    此代码段创建一个新的元组流,其中包含startCell和每个乘坐结束事件的时长(以分钟为单位):

    import org.joda.time.Interval;
    
    DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
        .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
    
            @Override
            public void flatMap(EnrichedRide ride,
                                Collector<Tuple2<Integer, Minutes>> out) throws Exception {
                if (!ride.isStart) {
                    Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                    Minutes duration = rideInterval.toDuration().toStandardMinutes();
                    out.collect(new Tuple2<>(ride.startCell, duration));
                }
            }
        });
    

    现在,可以生成一个流,流中仅包含每一个startCell看到(到该点为止)的最长的乘坐。

    可以使用多种方式表示要用作key的字段。前面您看到了一个带有EnrichedRide POJO的示例,其中用作键的字段是用其名称指定的。下面的情况涉及Tuple2对象,并且元组中的索引(从0开始)用于指定key。

    minutesByStartCell
      .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
      .maxBy(1) // duration
      .print();
    

    输出流包含每个键的记录,在每个key的每次持续时间达到新的最大值时会输出,如此处50797格子中所示:

    ...
    4> (64549,5M)
    4> (46298,18M)
    1> (51549,14M)
    1> (53043,13M)
    1> (56031,22M)
    1> (50797,6M)
    ...
    1> (50797,8M)
    ...
    1> (50797,11M)
    ...
    1> (50797,12M)

    (Implicit)状态

    这是此手册中涉及有状态流的第一个示例。尽管状态是透明处理的,但是Flink必须跟踪每个不同key的最大持续时间。

    每当应用程序涉及到状态,都应该考虑该状态可能会变大。只要key空间是无界的,那么Flink需要的状态量也跟key空间一样。

    当使用流时,通常聚合在有限窗口而不是在整个流上多考虑。

    reduce() 和其他聚合器

    上面使用的maxBy()仅仅是Flink KeyedStream上许多可用的聚合器功能的一个示例。reduce()您还可以使用一个更通用的功能来实现自己的自定义聚合。

    有状态的Transformations

    为什么Flink参与管理状态?
    您的应用程序当然有能力使用状态,而无需Flink参与管理状态-但是Flink为它管理的状态提供了一些引人注目的功能:

    • 本地:Flink状态被保存在处理该状态的机器本地,并且可以以内存速度访问
    • 经久耐用:Flink状态是容错的,即定期自动保存检查点,并在发生故障时恢复
    • 垂直可扩展:Flink状态可以保留在嵌入式RocksDB实例中,该实例可以通过添加更多本地磁盘来扩容
    • 水平可伸缩:随着群集的增长和收缩,Flink状态将重新分配
    • queryable:可通过Queryable State API在外部查询Flink状态。
      在本节中,您将学习如何使用Flink的API管理keyed状态。

    丰富的功能

    此时,您已经看到Flink的几个功能接口,包括 FilterFunction,MapFunction,和FlatMapFunction。这些都是“单一抽象方法”模式的示例。
    对于每个接口,Flink还提供了一个称为“丰富”变种,例如 RichFlatMapFunction,它具有一些其他方法,包括:

    • open(Configuration c)
    • close()
    • getRuntimeContext()
      在算子初始化期间,open()被调用一次。例如,这是加载一些静态数据或打开与外部服务的连接的时机。
      getRuntimeContext() 提供对整套可能感兴趣东西的访问,尤其是如何创建和访问Flink管理的状态。

    keyed状态示例

    在这个例子中,假设有一个要删除重复事件的流,因此只保留每个键的第一个事件。这是一个使用RichFlatMapFunction的应用,Deduplicator是RichFlatMapFunction实现:

    private static class Event {
        public final String key;
        public final long timestamp;
        ...
    }
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
        env.addSource(new EventSource())
            .keyBy(e -> e.key)
            .flatMap(new Deduplicator())
            .print();
      
        env.execute();
    }
    

    为此,Deduplicator需要以某种方式记住对于每个键是否已存在该键的事件。它将使用Flink的keyed状态接口来记忆。

    当您使用这样的keyed流时,Flink将为所管理的每个状态项维护一个键/值存储。

    Flink支持几种不同类型的keyed状态,此示例使用最简单的一种,即ValueState。这意味着Flink将为每个键存储一个对象-在这个例子中,将存储一个类型对象Boolean。

    Deduplicator类有两个方法:open()和flatMap()。通过定义ValueStateDescriptor,open方法引用并管理状态。构造函数的参数指定了keyed状态项的名称("keyHasBeenSeen"),并提供可用于序列化状态对象的信息(在本例中为Types.BOOLEAN)。

    public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
        ValueState<Boolean> keyHasBeenSeen;
        @Override
        public void open(Configuration conf) {
            ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
            keyHasBeenSeen = getRuntimeContext().getState(desc);
        }
        @Override
        public void flatMap(Event event, Collector<Event> out) throws Exception {
            if (keyHasBeenSeen.value() == null) {
                out.collect(event);
                keyHasBeenSeen.update(true);
            }
        }
    }
    

    当flatMap方法调用keyHasBeenSeen.value()时,Flink在上下文中查找该key这部分状态的值,并且只有在这个值是null时,才会讲事件收集到输出中。在这种情况下,它也会更新keyHasBeenSeen的值为true。

    这种访问和更新keyed-partitioned状态的机制看起来似乎很神奇,因为在我们Deduplicator实现中该键不是显式可见的。当Flink运行时调用RichFlatMapFunction的open方法时并没有事件,此时上下文中也没有key。但当调用flatMap方法时,正在处理事件的key是可用的,并且key在后台用于确定操作Flink的状态后端中的哪一个entry。

    当部署到分布式集群时,将有很多Deduplicator实例,每个实例负责整个键空间的不相交子集。因此,当您看到ValueState的单个条目时,例如
    ValueState keyHasBeenSeen;
    请注意理解,这不仅代表单个布尔值,而且代表分布式的,分片的键/值存储。

    清除状态

    上面的示例存在一个可能的问题:如果key空间是无界的,将会发生什么?Flink将使用在某个地方为每个不同key的实例存储一个Boolean实例。如果有一组有限的keys,那会很好,但是在keys无限增长的应用程序中,有必要清除不再需要的keys的状态。这是通过在状态对象上调用clear()来完成的,如下所示:

    keyHasBeenSeen.clear()
    

    您可能要在某个key闲置一段时间后执行此操作。在事件驱动的应用程序部分中了解关于ProcessFunction的内容时,您将看到如何使用Timers来执行此操作。

    还可以使用状态生存时间(TTL)选项进行配置,该选项可以使用状态描述符进行配置,该描述符指定何时自动清除老的密钥状态。

    Non-keyed状态

    也可以在非keyed上下文中管理状态。有时称为算子状态。所涉及的接口稍有不同,并且由于non-keyed状态的用户定义函数是不常见,因此此处不进行介绍。此功能最常用于源和接收器的实现中。

    连接流

    有时,并不是像这样应用预定义的转换:

    你想能够动态修改转换的某些方面-通过流中的阈值,规则或其他参数。Flink中支持此功能的模式被称为“连接流”,其中单个运算符具有两个输入流,如下所示:

    被连接的流还可以用于实现流join。

    例子

    在此示例中,名称为control的流用于指定从streamOfWords流中过滤掉的单词。一个名称为ControlFunction的 RichCoFlatMapFunction应用于连接流以完成此操作。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
        DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
      
        control
            .connect(datastreamOfWords)
            .flatMap(new ControlFunction())
            .print();
    
        env.execute();
    }
    

    请注意,所连接的两个流必须以兼容的方式进行keyed设置。keyBy的作用是对流的数据进行分区,并且在连接keyed流时,必须以相同的方式对它们进行分区。这样可以确保两个流中具有相同key的事件都发送到同一实例。例如,这使得可以将两个流join到一个key上。

    在这种情况下,两个流都是类型DataStream,并且两个流都由字符串作为键。正如将在下面看到的,这个RichCoFlatMapFunction是在keyed状态下存储一个布尔值,并且该布尔值由两个流共享。

    public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
        private ValueState<Boolean> blocked;
          
        @Override
        public void open(Configuration config) {
            blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
        }
          
        @Override
        public void flatMap1(String control_value, Collector<String> out) throws Exception {
            blocked.update(Boolean.TRUE);
        }
          
        @Override
        public void flatMap2(String data_value, Collector<String> out) throws Exception {
            if (blocked.value() == null) {
                out.collect(data_value);
            }
        }
    }
    

    一个RichCoFlatMapFunction是FlatMapFunction的一种,可以应用于一对连接流,它可以访问丰富的函数接口。这意味着可以将其设置为有状态。

    名称为blocked的Boolean被用来记住在control流上所提及的键(在这个例子中是单词),并且记住从streamOfWords流过滤掉出的那些词。这是键控状态,并且在两个流之间共享,这就是两个流必须共享相同键空间的原因。

    flatMap1和flatMap2被Flink运行时调用,Flink带有来自每个连接流的元素-在这个例子下,来自control流的元素被传入入flatMap1,来自streamOfWords的元素被传递到flatMap2。这取决于control.connect(datastreamOfWords)代码对两个流连接的顺序。

    重要的是要认识到flatMap1和 flatMap2回调的调用顺序是无法控制的。这两个输入流彼此竞争,并且Flink运行时将就消费一个或另一个流中的事件做它想要做的功能。如果时间和/或顺序很重要,您可能会发现有必要在管理的Flink状态下缓冲事件,直到您的应用程序准备好处理它们。(注意:如果您真的很绝望,则可以使用实现InputSelectable 接口的自定义运算符,对双输入算子消费输入元素的的顺序进行一些有限的控制 。)

    动手

    本部分附带的动手练习是 乘车和票价练习

    进一步阅读

  • 相关阅读:
    Hdu 5396 Expression (区间Dp)
    Lightoj 1174
    codeforces 570 D. Tree Requests (dfs)
    codeforces 570 E. Pig and Palindromes (DP)
    Hdu 5385 The path
    Hdu 5384 Danganronpa (AC自动机模板)
    Hdu 5372 Segment Game (树状数组)
    Hdu 5379 Mahjong tree (dfs + 组合数)
    Hdu 5371 Hotaru's problem (manacher+枚举)
    Face The Right Way---hdu3276(开关问题)
  • 原文地址:https://www.cnblogs.com/qlxm/p/14513584.html
Copyright © 2011-2022 走看看