zoukankan      html  css  js  c++  java
  • 【翻译】了解Flink-数据管道和ETL -- Learn Flink


    翻译来源-Learn Flink Data Pipelines & ETL

    Apache Flink的一种非常常见的用例是实现ETL(提取,转换,加载)管道,该管道从一个或多个源获取数据,执行一些转换和/或扩充,然后将结果存储在某个地方。在本节中,我们将研究如何使用Flink的DataStream API来实现这种应用程序。

    请注意,Flink的Table和SQL API 非常适合许多ETL用例。但是,无论您最终是否直接使用DataStream API,这里介绍的基础知识深刻的了解都会很有价值。

    无状态转换

    本节介绍map()和flatmap(),用于实现无状态转换的基本操作。本节中的示例假定您熟悉flink-training仓库中动手练习中使用的Taxi Ride数据 。

    map()

    在第一个练习中,您过滤了出租车事件流。在同一代码库中,有一个 GeoUtils类提供了一种静态方法GeoUtils.mapToGridCell(float lon, float lat),该方法将位置(经度,纬度)映射到网格单元,该网格单元指的是大小约为100x100米的区域。

    现在,通过向每个事件添加startCell和endCell字段来丰富我们的出租车乘车对象流。您可以创建一个EnrichedRide扩展对象TaxiRide,添加以下字段:

    public static class EnrichedRide extends TaxiRide {
        public int startCell;
        public int endCell;
    
        public EnrichedRide() {}
    
        public EnrichedRide(TaxiRide ride) {
            this.rideId = ride.rideId;
            this.isStart = ride.isStart;
            ...
            this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
            this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
        }
    
        public String toString() {
            return super.toString() + "," +
                Integer.toString(this.startCell) + "," +
                Integer.toString(this.endCell);
        }
    }
    

    然后,您可以创建一个转换这个流的应用程序

    DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
    
    DataStream<EnrichedRide> enrichedNYCRides = rides
        .filter(new RideCleansingSolution.NYCFilter())
        .map(new Enrichment());
    
    enrichedNYCRides.print();
    

    使用MapFunction:

    public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
    
        @Override
        public EnrichedRide map(TaxiRide taxiRide) throws Exception {
            return new EnrichedRide(taxiRide);
        }
    }
    

    flatmap()

    一个MapFunction仅在执行一对一转换时适用:对于每个进入的流元素,map()将发出一个转换后的元素。否则,您将要使用 flatmap()

    DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
    
    DataStream<EnrichedRide> enrichedNYCRides = rides
        .flatMap(new NYCEnrichment());
    
    enrichedNYCRides.print();
    

    使用FlatMapFunction:

    public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
    
        @Override
        public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
            FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
            if (valid.filter(taxiRide)) {
                out.collect(new EnrichedRide(taxiRide));
            }
        }
    }
    

    使用此接口中提供的Collector,该flatmap()方法可以发射任意数量的流元素,包括none。

    keyed 流

    keyBy()

    围绕流的一个属性对流进行分区往往很有用,以便属性相同的所有事件分组到一块。例如,假设您要查找从每个网格单元开始的最长的出租车车程。从SQL查询的角度考虑,这将意味着使用进行某种GROUP BY startCell,而在Flink中,这是通过 keyBy(KeySelector)

    rides
        .flatMap(new NYCEnrichment())
        .keyBy(enrichedRide -> enrichedRide.startCell)
    

    每一个keyBy都会导致网络混洗,从而对流进行重新分区。通常,这非常昂贵,因为它涉及网络通信以及序列化和反序列化。

    keys计算

    KeySelector不仅限于从事件中提取key。他们可以,而是计算在你想要的任何方式的key,只要所产生的key是确定的,并具有有效的hashCode()和equals()实现。此限制排除了生成随机数或返回Array或Enums的KeySelector,但是例如,只要它们的元素遵循这些相同的规则,就可以使用Tuples或POJO来使用复合键。

    keys必须以确定性的方式生成,因为它们会在需要时重新计算,而不是附加到流记录中。

    例如,不是创建一个新EnrichedRide类,而是用一个字段,我们通过以下代码使用startCell字段,

    keyBy(enrichedRide -> enrichedRide.startCell)
    

    我们按上面做,而不是按照下面:

    keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
    

    keyed流上的聚合

    此代码段创建一个新的元组流,其中包含startCell和每个乘坐结束事件的时长(以分钟为单位):

    import org.joda.time.Interval;
    
    DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
        .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
    
            @Override
            public void flatMap(EnrichedRide ride,
                                Collector<Tuple2<Integer, Minutes>> out) throws Exception {
                if (!ride.isStart) {
                    Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                    Minutes duration = rideInterval.toDuration().toStandardMinutes();
                    out.collect(new Tuple2<>(ride.startCell, duration));
                }
            }
        });
    

    现在,可以生成一个流,流中仅包含每一个startCell看到(到该点为止)的最长的乘坐。

    可以使用多种方式表示要用作key的字段。前面您看到了一个带有EnrichedRide POJO的示例,其中用作键的字段是用其名称指定的。下面的情况涉及Tuple2对象,并且元组中的索引(从0开始)用于指定key。

    minutesByStartCell
      .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
      .maxBy(1) // duration
      .print();
    

    输出流包含每个键的记录,在每个key的每次持续时间达到新的最大值时会输出,如此处50797格子中所示:

    ...
    4> (64549,5M)
    4> (46298,18M)
    1> (51549,14M)
    1> (53043,13M)
    1> (56031,22M)
    1> (50797,6M)
    ...
    1> (50797,8M)
    ...
    1> (50797,11M)
    ...
    1> (50797,12M)

    (Implicit)状态

    这是此手册中涉及有状态流的第一个示例。尽管状态是透明处理的,但是Flink必须跟踪每个不同key的最大持续时间。

    每当应用程序涉及到状态,都应该考虑该状态可能会变大。只要key空间是无界的,那么Flink需要的状态量也跟key空间一样。

    当使用流时,通常聚合在有限窗口而不是在整个流上多考虑。

    reduce() 和其他聚合器

    上面使用的maxBy()仅仅是Flink KeyedStream上许多可用的聚合器功能的一个示例。reduce()您还可以使用一个更通用的功能来实现自己的自定义聚合。

    有状态的Transformations

    为什么Flink参与管理状态?
    您的应用程序当然有能力使用状态,而无需Flink参与管理状态-但是Flink为它管理的状态提供了一些引人注目的功能:

    • 本地:Flink状态被保存在处理该状态的机器本地,并且可以以内存速度访问
    • 经久耐用:Flink状态是容错的,即定期自动保存检查点,并在发生故障时恢复
    • 垂直可扩展:Flink状态可以保留在嵌入式RocksDB实例中,该实例可以通过添加更多本地磁盘来扩容
    • 水平可伸缩:随着群集的增长和收缩,Flink状态将重新分配
    • queryable:可通过Queryable State API在外部查询Flink状态。
      在本节中,您将学习如何使用Flink的API管理keyed状态。

    丰富的功能

    此时,您已经看到Flink的几个功能接口,包括 FilterFunction,MapFunction,和FlatMapFunction。这些都是“单一抽象方法”模式的示例。
    对于每个接口,Flink还提供了一个称为“丰富”变种,例如 RichFlatMapFunction,它具有一些其他方法,包括:

    • open(Configuration c)
    • close()
    • getRuntimeContext()
      在算子初始化期间,open()被调用一次。例如,这是加载一些静态数据或打开与外部服务的连接的时机。
      getRuntimeContext() 提供对整套可能感兴趣东西的访问,尤其是如何创建和访问Flink管理的状态。

    keyed状态示例

    在这个例子中,假设有一个要删除重复事件的流,因此只保留每个键的第一个事件。这是一个使用RichFlatMapFunction的应用,Deduplicator是RichFlatMapFunction实现:

    private static class Event {
        public final String key;
        public final long timestamp;
        ...
    }
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
        env.addSource(new EventSource())
            .keyBy(e -> e.key)
            .flatMap(new Deduplicator())
            .print();
      
        env.execute();
    }
    

    为此,Deduplicator需要以某种方式记住对于每个键是否已存在该键的事件。它将使用Flink的keyed状态接口来记忆。

    当您使用这样的keyed流时,Flink将为所管理的每个状态项维护一个键/值存储。

    Flink支持几种不同类型的keyed状态,此示例使用最简单的一种,即ValueState。这意味着Flink将为每个键存储一个对象-在这个例子中,将存储一个类型对象Boolean。

    Deduplicator类有两个方法:open()和flatMap()。通过定义ValueStateDescriptor,open方法引用并管理状态。构造函数的参数指定了keyed状态项的名称("keyHasBeenSeen"),并提供可用于序列化状态对象的信息(在本例中为Types.BOOLEAN)。

    public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
        ValueState<Boolean> keyHasBeenSeen;
        @Override
        public void open(Configuration conf) {
            ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
            keyHasBeenSeen = getRuntimeContext().getState(desc);
        }
        @Override
        public void flatMap(Event event, Collector<Event> out) throws Exception {
            if (keyHasBeenSeen.value() == null) {
                out.collect(event);
                keyHasBeenSeen.update(true);
            }
        }
    }
    

    当flatMap方法调用keyHasBeenSeen.value()时,Flink在上下文中查找该key这部分状态的值,并且只有在这个值是null时,才会讲事件收集到输出中。在这种情况下,它也会更新keyHasBeenSeen的值为true。

    这种访问和更新keyed-partitioned状态的机制看起来似乎很神奇,因为在我们Deduplicator实现中该键不是显式可见的。当Flink运行时调用RichFlatMapFunction的open方法时并没有事件,此时上下文中也没有key。但当调用flatMap方法时,正在处理事件的key是可用的,并且key在后台用于确定操作Flink的状态后端中的哪一个entry。

    当部署到分布式集群时,将有很多Deduplicator实例,每个实例负责整个键空间的不相交子集。因此,当您看到ValueState的单个条目时,例如
    ValueState keyHasBeenSeen;
    请注意理解,这不仅代表单个布尔值,而且代表分布式的,分片的键/值存储。

    清除状态

    上面的示例存在一个可能的问题:如果key空间是无界的,将会发生什么?Flink将使用在某个地方为每个不同key的实例存储一个Boolean实例。如果有一组有限的keys,那会很好,但是在keys无限增长的应用程序中,有必要清除不再需要的keys的状态。这是通过在状态对象上调用clear()来完成的,如下所示:

    keyHasBeenSeen.clear()
    

    您可能要在某个key闲置一段时间后执行此操作。在事件驱动的应用程序部分中了解关于ProcessFunction的内容时,您将看到如何使用Timers来执行此操作。

    还可以使用状态生存时间(TTL)选项进行配置,该选项可以使用状态描述符进行配置,该描述符指定何时自动清除老的密钥状态。

    Non-keyed状态

    也可以在非keyed上下文中管理状态。有时称为算子状态。所涉及的接口稍有不同,并且由于non-keyed状态的用户定义函数是不常见,因此此处不进行介绍。此功能最常用于源和接收器的实现中。

    连接流

    有时,并不是像这样应用预定义的转换:

    你想能够动态修改转换的某些方面-通过流中的阈值,规则或其他参数。Flink中支持此功能的模式被称为“连接流”,其中单个运算符具有两个输入流,如下所示:

    被连接的流还可以用于实现流join。

    例子

    在此示例中,名称为control的流用于指定从streamOfWords流中过滤掉的单词。一个名称为ControlFunction的 RichCoFlatMapFunction应用于连接流以完成此操作。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
        DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
      
        control
            .connect(datastreamOfWords)
            .flatMap(new ControlFunction())
            .print();
    
        env.execute();
    }
    

    请注意,所连接的两个流必须以兼容的方式进行keyed设置。keyBy的作用是对流的数据进行分区,并且在连接keyed流时,必须以相同的方式对它们进行分区。这样可以确保两个流中具有相同key的事件都发送到同一实例。例如,这使得可以将两个流join到一个key上。

    在这种情况下,两个流都是类型DataStream,并且两个流都由字符串作为键。正如将在下面看到的,这个RichCoFlatMapFunction是在keyed状态下存储一个布尔值,并且该布尔值由两个流共享。

    public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
        private ValueState<Boolean> blocked;
          
        @Override
        public void open(Configuration config) {
            blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
        }
          
        @Override
        public void flatMap1(String control_value, Collector<String> out) throws Exception {
            blocked.update(Boolean.TRUE);
        }
          
        @Override
        public void flatMap2(String data_value, Collector<String> out) throws Exception {
            if (blocked.value() == null) {
                out.collect(data_value);
            }
        }
    }
    

    一个RichCoFlatMapFunction是FlatMapFunction的一种,可以应用于一对连接流,它可以访问丰富的函数接口。这意味着可以将其设置为有状态。

    名称为blocked的Boolean被用来记住在control流上所提及的键(在这个例子中是单词),并且记住从streamOfWords流过滤掉出的那些词。这是键控状态,并且在两个流之间共享,这就是两个流必须共享相同键空间的原因。

    flatMap1和flatMap2被Flink运行时调用,Flink带有来自每个连接流的元素-在这个例子下,来自control流的元素被传入入flatMap1,来自streamOfWords的元素被传递到flatMap2。这取决于control.connect(datastreamOfWords)代码对两个流连接的顺序。

    重要的是要认识到flatMap1和 flatMap2回调的调用顺序是无法控制的。这两个输入流彼此竞争,并且Flink运行时将就消费一个或另一个流中的事件做它想要做的功能。如果时间和/或顺序很重要,您可能会发现有必要在管理的Flink状态下缓冲事件,直到您的应用程序准备好处理它们。(注意:如果您真的很绝望,则可以使用实现InputSelectable 接口的自定义运算符,对双输入算子消费输入元素的的顺序进行一些有限的控制 。)

    动手

    本部分附带的动手练习是 乘车和票价练习

    进一步阅读

  • 相关阅读:
    Redis配置文件详解
    linux系统配置Apache虚拟主机实例
    nginx File not found 错误分析与解决方法
    svn配置使用
    linux下svn命令使用大全
    Kendo UI For ASP.NET MVC项目资源
    ReSharper 配置及用法
    SQL判断某列中是否包含中文字符、英文字符、纯数字 (转)
    Visual Studio最好用的快捷键
    19个必须知道的Visual Studio快捷键
  • 原文地址:https://www.cnblogs.com/qlxm/p/14513584.html
Copyright © 2011-2022 走看看