zoukankan      html  css  js  c++  java
  • 理解flink connector

    一:连接外部存储系统的方式

    flink是新一代的流式计算引擎,它需要从不同的第三方存储引擎读取数据,进行一定的处理,写出到不同的存储引擎,Connector就相当于是一个连接器,连接flink系统和外界存储系统。

    常用的连接方式有以下几种:

      flink内部预定义的source和sink

      flink内部提供了一些Boundled connector

      使用第三方Apache Bahir项目中的连接器

      通过异步IO的方式

    二:每种连接方式的简单说明

    2.1 预定义的source和sink

    大致分为以下几类:

    基于文件:

      source:readTextFile,readFile        sink:writeAsText,writeAsCsv

    基于socket

      source:socketTextStream          sink:writeToSocket

    基于Collection,iterators

      source:fromCollection,fromElement     sink:print,printoToError

    2.2 Boundled connector

      flink内部提供了一些source和sink,例如:kafka的source和sink,es的sink。

      常用的有以下几个:

        Apache Kafka(source/sink)

        Apache Cassandra(sink)

        ElasticSearch(sink)

        Hdfs(sink)

        RabbitMQ(source/sink)

      以上connector是flink的一部分,但是不在flink的二进制发布包中,需要从网上下载jar包或者使用Maven依赖。

    2.3 Apache Bahir中的连接器

      Apache Bahir 最初是从 Apache Spark 中独立出来项目提供,以提供不限于 Spark 相关的扩展/插件、连接器和其他可插入组件的实现。通过提供多样化的流连接器(streaming connectors)和 SQL 数据源扩展分析平台的覆盖面。如有需要写到 flume、redis 的需求的话,可以使用该项目提供的 connector。

      常用的有以下几个:

      Apache ActiveMQ(source/sink)

      Apache Flume(sink)

      Redis(sink)

      akka(sink)

      netty(source)

    2.4 Async I/O

      流计算中经常需要与外部存储系统交互,比如需要关联 MySQL 中的某个表。一般来说,如果用同步 I/O 的方式,会造成系统中出现大的等待时间,影响吞吐和延迟。为了解决这个问题,异步 I/O 可以并发处理多个请求,提高吞吐,减少延迟。

      主要用于读取外部数据库,例如mysql,oracle,hbase等。

    三:flink connect kafka

      我们比较常用的就是使用flink读取kafka,也就是消费kafka的数据,kafka是一个分布式的、分区的、多副本的、 支持高吞吐的、发布订阅消息系统。生产环境环境中也经常会跟 kafka 进行一些数据的交换,比如利用 kafka consumer 读取数据,然后进行一系列的处理之后,再将结果写出到 kafka 中。这里会主要分两个部分进行介绍,一是 Flink kafka Consumer,一个是 Flink kafka Producer。

    3.1 反序列化数据

      因为 kafka 中数据都是以二进制 byte 形式存储的。读到 Flink 系统中之后,需要将二进制数据转化为具体的 java、scala 对象。具体需要实现一个 schema 类,定义如何序列化和反序列数据。反序列化时需要实现 DeserializationSchema 接口,并重写 deserialize(byte[] message) 函数,如果是反序列化 kafka 中 kv 的数据时,需要实现 KeyedDeserializationSchema 接口,并重写 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 函数。

       另外 Flink 中也提供了一些常用的序列化反序列化的 schema 类。例如,SimpleStringSchema,按字符串方式进行序列化、反序列化。TypeInformationSerializationSchema,它可根据 Flink 的 TypeInformation 信息来推断出需要选择的 schema。JsonDeserializationSchema 使用 jackson 反序列化 json 格式消息,并返回 ObjectNode,可以使用 .get(“property”) 方法来访问相应字段。

    3.2 消费起始位置设置

    • setStartFromGroupOffsets,也是默认的策略,从 group offset 位置读取数据,group offset 指的是 kafka broker 端记录的某个 group 的最后一次的消费位置。但是 kafka broker 端没有该 group 信息,会根据 kafka 的参数”auto.offset.reset”的设置来决定从哪个位置开始消费。

    • setStartFromEarliest,从 kafka 最早的位置开始读取。

    • setStartFromLatest,从 kafka 最新的位置开始读取。

    • setStartFromTimestamp(long),从时间戳大于或等于指定时间戳的位置开始读取。Kafka 时戳,是指 kafka 为每条消息增加另一个时戳。该时戳可以表示消息在 proudcer 端生成时的时间、或进入到 kafka broker 时的时间。

    • setStartFromSpecificOffsets,从指定分区的 offset 位置开始读取,如指定的 offsets 中不存某个分区,该分区从 group offset 位置开始读取。此时需要用户给定一个具体的分区、offset 的集合。

    3.3 topic和partition的动态发现

       实际的生产环境中可能有这样一些需求,比如场景一,有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。场景二,作业从一个固定的 kafka topic 读数据,开始该 topic 有 10 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 10 个扩容到 20。该情况下如何在不重启作业情况下动态感知新扩容的 partition?

       针对上面的两种场景,首先需要在构建 FlinkKafkaConsumer 时的 properties 中设置 flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时 FlinkKafkaConsumer 内部会启动一个单独的线程定期去 kafka 获取最新的 meta 信息。针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的 pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的 partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。

    3.3 commit offset方式

      Flink kafka consumer commit offset 方式需要区分是否开启了 checkpoint。

       如果 checkpoint 关闭,commit offset 要依赖于 kafka 客户端的 auto commit。需设置 enable.auto.commit,auto.commit.interval.ms 参数到 consumer properties,就会按固定的时间间隔定期 auto commit offset 到 kafka。
       如果开启 checkpoint,这个时候作业消费的 offset 是 Flink 在 state 中自己管理和容错。此时提交 offset 到 kafka,一般都是作为外部进度的监控,想实时知道作业消费的位置和 lag 情况。此时需要 setCommitOffsetsOnCheckpoints 为 true 来设置当 checkpoint 成功时提交 offset 到 kafka。此时 commit offset 的间隔就取决于 checkpoint 的间隔,所以此时从 kafka 一侧看到的 lag 可能并非完全实时,如果 checkpoint 间隔比较长 lag 曲线可能会是一个锯齿状。

    3.4 flink kafka producer

       使用 FlinkKafkaProducer 往 kafka 中写数据时,如果不单独设置 partition 策略,会默认使用 FlinkFixedPartitioner,该 partitioner 分区的方式是 task 所在的并发 id 对 topic 总 partition 数取余:parallelInstanceId % partitions.length。

    • 此时如果 sink 为 4,paritition 为 1,则 4 个 task 往同一个 partition 中写数据。但当 sink task < partition 个数时会有部分 partition 没有数据写入,例如 sink task 为2,partition 总数为 4,则后面两个 partition 将没有数据写入。

    • 如果构建 FlinkKafkaProducer 时,partition 设置为 null,此时会使用 kafka producer 默认分区方式,非 key 写入的情况下,使用 round-robin 的方式进行分区,每个 task 都会轮循的写下游的所有 partition。该方式下游的 partition 数据会比较均衡,但是缺点是 partition 个数过多的情况下需要维持过多的网络连接,即每个 task 都会维持跟所有 partition 所在 broker 的连接。

  • 相关阅读:
    高精度模板(未完待续)
    $CH0201$ 费解的开关
    $POJ2288$ $Islands$ $and$ $Bridges$
    luoguP1445 [Violet]樱花
    P3694 邦邦的大合唱站队
    [NOI2009]管道取珠
    [AHOI2006]基因匹配
    luogu P3411 序列变换
    HNOI2001 产品加工
    牛客ACM赛 B [小a的旅行计划 ]
  • 原文地址:https://www.cnblogs.com/lyr999736/p/12097044.html
Copyright © 2011-2022 走看看