一:连接外部存储系统的方式
flink是新一代的流式计算引擎,它需要从不同的第三方存储引擎读取数据,进行一定的处理,写出到不同的存储引擎,Connector就相当于是一个连接器,连接flink系统和外界存储系统。
常用的连接方式有以下几种:
flink内部预定义的source和sink
flink内部提供了一些Boundled connector
使用第三方Apache Bahir项目中的连接器
通过异步IO的方式
二:每种连接方式的简单说明
2.1 预定义的source和sink
大致分为以下几类:
基于文件:
source:readTextFile,readFile sink:writeAsText,writeAsCsv
基于socket
source:socketTextStream sink:writeToSocket
基于Collection,iterators
source:fromCollection,fromElement sink:print,printoToError
2.2 Boundled connector
flink内部提供了一些source和sink,例如:kafka的source和sink,es的sink。
常用的有以下几个:
Apache Kafka(source/sink)
Apache Cassandra(sink)
ElasticSearch(sink)
Hdfs(sink)
RabbitMQ(source/sink)
以上connector是flink的一部分,但是不在flink的二进制发布包中,需要从网上下载jar包或者使用Maven依赖。
2.3 Apache Bahir中的连接器
Apache Bahir 最初是从 Apache Spark 中独立出来项目提供,以提供不限于 Spark 相关的扩展/插件、连接器和其他可插入组件的实现。通过提供多样化的流连接器(streaming connectors)和 SQL 数据源扩展分析平台的覆盖面。如有需要写到 flume、redis 的需求的话,可以使用该项目提供的 connector。
常用的有以下几个:
Apache ActiveMQ(source/sink)
Apache Flume(sink)
Redis(sink)
akka(sink)
netty(source)
2.4 Async I/O
流计算中经常需要与外部存储系统交互,比如需要关联 MySQL 中的某个表。一般来说,如果用同步 I/O 的方式,会造成系统中出现大的等待时间,影响吞吐和延迟。为了解决这个问题,异步 I/O 可以并发处理多个请求,提高吞吐,减少延迟。
主要用于读取外部数据库,例如mysql,oracle,hbase等。
三:flink connect kafka
我们比较常用的就是使用flink读取kafka,也就是消费kafka的数据,kafka是一个分布式的、分区的、多副本的、 支持高吞吐的、发布订阅消息系统。生产环境环境中也经常会跟 kafka 进行一些数据的交换,比如利用 kafka consumer 读取数据,然后进行一系列的处理之后,再将结果写出到 kafka 中。这里会主要分两个部分进行介绍,一是 Flink kafka Consumer,一个是 Flink kafka Producer。
3.1 反序列化数据
因为 kafka 中数据都是以二进制 byte 形式存储的。读到 Flink 系统中之后,需要将二进制数据转化为具体的 java、scala 对象。具体需要实现一个 schema 类,定义如何序列化和反序列数据。反序列化时需要实现 DeserializationSchema 接口,并重写 deserialize(byte[] message) 函数,如果是反序列化 kafka 中 kv 的数据时,需要实现 KeyedDeserializationSchema 接口,并重写 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 函数。
3.2 消费起始位置设置
-
setStartFromGroupOffsets,也是默认的策略,从 group offset 位置读取数据,group offset 指的是 kafka broker 端记录的某个 group 的最后一次的消费位置。但是 kafka broker 端没有该 group 信息,会根据 kafka 的参数”auto.offset.reset”的设置来决定从哪个位置开始消费。
-
setStartFromEarliest,从 kafka 最早的位置开始读取。
-
setStartFromLatest,从 kafka 最新的位置开始读取。
-
setStartFromTimestamp(long),从时间戳大于或等于指定时间戳的位置开始读取。Kafka 时戳,是指 kafka 为每条消息增加另一个时戳。该时戳可以表示消息在 proudcer 端生成时的时间、或进入到 kafka broker 时的时间。
-
setStartFromSpecificOffsets,从指定分区的 offset 位置开始读取,如指定的 offsets 中不存某个分区,该分区从 group offset 位置开始读取。此时需要用户给定一个具体的分区、offset 的集合。
3.3 topic和partition的动态发现
实际的生产环境中可能有这样一些需求,比如场景一,有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。场景二,作业从一个固定的 kafka topic 读数据,开始该 topic 有 10 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 10 个扩容到 20。该情况下如何在不重启作业情况下动态感知新扩容的 partition?
3.3 commit offset方式
Flink kafka consumer commit offset 方式需要区分是否开启了 checkpoint。
3.4 flink kafka producer
使用 FlinkKafkaProducer 往 kafka 中写数据时,如果不单独设置 partition 策略,会默认使用 FlinkFixedPartitioner,该 partitioner 分区的方式是 task 所在的并发 id 对 topic 总 partition 数取余:parallelInstanceId % partitions.length。
-
此时如果 sink 为 4,paritition 为 1,则 4 个 task 往同一个 partition 中写数据。但当 sink task < partition 个数时会有部分 partition 没有数据写入,例如 sink task 为2,partition 总数为 4,则后面两个 partition 将没有数据写入。
-
如果构建 FlinkKafkaProducer 时,partition 设置为 null,此时会使用 kafka producer 默认分区方式,非 key 写入的情况下,使用 round-robin 的方式进行分区,每个 task 都会轮循的写下游的所有 partition。该方式下游的 partition 数据会比较均衡,但是缺点是 partition 个数过多的情况下需要维持过多的网络连接,即每个 task 都会维持跟所有 partition 所在 broker 的连接。