DataStream API编程指导
文档翻译自Flink DataStream API Programming Guide
-----------------------------------------------------------------------
Flink中的DataStream程序是实现在数据流上的transformation(如filtering,updating state, defining windows,aggregating)的普通程序。创建数据流的来源多种多样(如消息队列,socket流,文件等)。程序通过data sink返回结果,如将数据写入文件,或发送到标准输出(如命令行终端)。Flink程序可以在多种上下文中运行,如独立运行或是嵌入在其他程序中执行。程序的执行可以发生在本地JVM,或者在一个拥有许多设备的集群上。
有关介绍Flink API基础概念的文档,请见basic concepts
为了创建你自己的Flink DataStream程序,我们鼓励你从文档anatomy of a Flink Program开始,且欢迎你添加自己的transformations。该文档接下来的部分是额外的operation和进阶特性的参考文档。
一、示例程序
下面的程序是一个完整的流式窗口word count应用,它计算出在web socket的大小为5秒的窗口中的出现各个单词的数量。你可以复制 & 粘贴代码并在本地运行。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,
Integer>> dataStream = env
.socketTextStream("localhost",
9999)
.flatMap(new
Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public
static class Splitter implements
FlatMapFunction<String, Tuple2<String,
Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
要运行该示例程序,首先从终端运行netcat来开始输入流
nc -lk 9999
仅需要输入一些单词,这些将是word count程序的输入数据。如果你想看到count大于1的结果,在5秒内重复输入同一个单词。
二、DataStream Transformations
Data transformation会将一或多个DataStream转换成一个新的DataStream。程序可以将多个transformation结合形成复杂的拓扑结构(topology)。
本小节给出了所有可用的transformation的描述。
Transformation |
描述 |
Map DataStream -> DataStream |
获取一个element并产出一个element。下例是一个将输入*2的map方法: DataStream<Integer>
dataStream = //... |
FlapMap DataStream -> DataStream |
获取一个element,并产生出0、1或多个element。下例是一个为句子分词的flatmap方法
dataStream.flatMap(new FlatMapFunction<String, String>() { |
Filter DataStream -> DataStream |
在每个获取的element上运行一个boolean方法,留下那些方法返回true的element。下例是一个过滤掉0值的filter dataStream.filter(new FilterFunction<Integer>() { |
KeyBy |
将流逻辑分为不相交的分区,每个分区包含的都是具有相同key的element,该分区方法使用hash分区实现。定义key的方法见于Keys。下例是一个返回KeyedDataStream的transformation。 dataStream.keyBy("someKey") // Key by field
"someKey" |
Reduce KeyedStream -> DataStream |
一个在keyed data stream上“滚动”进行的reduce方法。将上一个reduce过的值和当前element结合,产生新的值并发送出。下例是一个创建部分和的reduce方法。 keyedStream.reduce(new ReduceFunction<Integer>() { |
Fold KeyedStream -> DataStream |
一个在带有初始值的数据流上“滚动”进行的fold方法。将上一个fold的值和当前element结合,产生新的值并发送出。下例是一个fold方法,当应用于序列{1, 2,
3, 4, 5}时,它发出序列{"start-1",
"start-1-2", "start-1-2-3" …}。 DataStream<String>
result = keyedStream.fold("start", new FoldFunction<Integer,
String>() { |
Aggregations KeyedStream -> DataStream |
在一个keyed DataStream上“滚动”进行聚合的方法。其中,min和minBy的区别在于min返回最小值,而minBy返回的是带有在此域中最小值的element(max和maxBy一样如此)。 keyedStream.sum(0); |
Window KeyedStream - > WindowedStream |
Window可以定义在已经分区的KeyedStream上。窗口将根据一些特征(如最近5秒到达的数据)将数据按其各自的key集合在一起。有关窗口的完整描述见于windows // Last 5 seconds of data dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); |
WindowAll DataStream -> AllWindowedStream |
Window可以定义在普通的DataStream上。窗口将根据一些特征(如最近5秒到达的数据)将所有Stream事件集合在一起。有关窗口的完整描述见于windows
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data |
Window Apply WindowedStream -> DataStream AllWindowedStream -> DataStream |
将一个一般函数应用到window整体上去,下面是一个人工计算window中所有element的总和的应用。 windowedStream.apply (new WindowFunction<Tuple2<String,Integer>,
Integer, Tuple, Window>() { // applying an AllWindowFunction on non-keyed window stream |
Window Reduce WindowedStream -> DataStream |
对窗口应用一个功能性reduce方法并返回reduce的结果 windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>()
{ |
Window Fold Windowed Stream -> DataStream |
对窗口应用一个功能性fold方法。下例代码在应用到序列(1, 2,
3, 4, 5)时,它将该序列fold成为字符串"start-1-2-3-4-5" windowedStream.fold("start-", new FoldFunction<Integer,
String>() { |
Aggregations on windows WindowedStream -> DataStream |
对窗口中的内容聚合。其中,min和minBy的区别在于min返回最小值,而minBy返回的是带有在此域中最小值的element(max和maxBy一样如此)。 windowedStream.sum(0); |
Union DataStream* -> DataStream |
将2个或多个data stream合并创建出一个新的包含所有stream的element的stream。注意:如果你对一个data
stream自己进行union操作,则在返回的结果中,每个element都会出现2个。 dataStream.union(otherStream1, otherStream2, ...); |
Window Join DataStream, DataStream -> DataStream |
在给定key和普通window中,将2个DataStream进行Join操作
dataStream.join(otherStream) |
Window CoGroup DataStream, DataStream -> DataStream |
在给定key和普通window中,对2个DataStream进行CoGroup操作。 dataStream.coGroup(otherStream) |
Connect DataStream, DataStream -> ConnectedStreams |
在保留两个DataStream的类型的情况下,将二者"连接"起来。Connect使我们可以共享两个Stream的状态 DataStream<Integer>
someStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream); |
CoMap, CoFlatMap ConnectedStreams -> DataStream |
该操作类似于map和flatMap针对连接的Data
Stream版本。Sd connectedStreams.map(new
CoMapFunction<Integer, String, Boolean>() { @Override
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override @Override |
Split DataStream -> SplitStream |
根据某些标准将Stream分割成2个或更多的stream SplitStream<Integer>
split = someDataStream.split(new OutputSelector<Integer>()
{ |
Select SplitStream -> DataStream |
从SplitStream中选择1个或多个stream
SplitStream<Integer>
split; |
Iterate DataStream -> IterativeStream -> DataStream |
通过将一个Operator的输出重定向到前面的某个Operator的方法,在数据流图中创建一个“反馈”循环。这在定义持续更新模型的算法时十分有用。下面的例子从一个Stream开始,并持续应用迭代体(Iteration
body)。大于0的element被送回到反馈通道,而其他的element则被转发到下游。相关完整描述请见Iterations IterativeStream<Long>
iteration = initialStream.iterate(); |
Extract Timestamps DataStream -> DataStream |
通过从数据中抽取时间戳来使得通过使用事件时间语义的窗口可以工作。详情见于Event Time。 stream.assignTimestamps (new TimeStampExtractor() {...}); |
接下来的Transformation是对Tuple类型的data stream可用的Transformation:
Transformation |
描述 |
Project DataStream -> DataStream |
从tuple中选择出域的子集而产生新的DataStream DataStream<Tuple3<Integer,
Double, String>> in = //
[...] |
物理级分割(Physical Partitioning)
如果需要,Flink同样提供了在进行一次transformation后针对精确stream分割的低层次的控制(low-level control),它们通过以下几个方法实现。
Transformations |
描述 |
Custom partitioning DataStream -> DataStream |
使用一个用户自定义的Partitioner来对每个element选择目标任务sd dataStream.partitionCustom(partitioner,
"someKey"); |
Random partitioning DataStream -> DataStream |
根据均匀分布来随机分割element dataStream.shuffle(); |
Rebalancing(轮询分割) DataStream -> DataStream |
轮询分割element,创建相同负荷的分割。对数据变形(data
skew)时的性能优化十分有用s dataStream.rebalance(); |
Rescaling DataStream -> DataStream |
将element轮询分割到下游Operator子集中去。这在你想流水线并行时十分有用,例如,需要从每个并行的source实例中将数据fan out到一个有着一些mapper来分发负载,但是又不想要函数rebalance()那样引起的完全rebalance的效果时。这就需要仅在本地传输数据,而不是需要从网络传输,这需要依赖其他诸如TaskManager的任务槽数量等等configuration值。
dataStream.rescale(); |
Broadcasting DataStream -> DataStream |
将element广播到每一个分割中去 dataStream.broadcast(); |
链接任务以及资源组(Task chaining & resource groups)
将两个transformation链接起来意味着将它们部署在一起(co-locating),共享同一个线程来获得更好的性能。Flink默认地尽可能地链接Operator(如两个连续的map transformation)。如有需要,API还给出了细粒度的链接控制:
使用StreamExecutionEnvironment.disableOperatorChaining()来关闭整个Job的链接操作。下面表格中的方法则是更加细粒度的控制函数,注意,由于这些函数引用的是前一个transformation,所以它们仅仅在一个DataStream的transformation后使用才是正确的,例如someStream.map( … ).startNewChain()是正确的,而someStream.startNewChain()是错误的。
一个资源组就是Flink中的一个任务槽,如有需要,你可以人工孤立某个Operator到一个独立的任务槽中。
Transformation |
描述 |
startNewChain() |
以当前Operator起点,开始一个新的链接。在下例中,两个mapper将会被链接而filter则不会与第一个mapper链接 someStream.filter(...).map(...).startNewChain().map(...); |
disableChaining() |
下例中,将不会链接mapOperator。 someStream.map(...).disableChaining(); |
slotSharingGroup() |
设置一个Operation的共享任务槽的分组。Flink将会把同一个任务槽共享组的Operation放到同一个任务槽中,而不在同一个任务槽共享组的Operation放到其他任务槽中。这可以用来孤立任务槽。如果所有的输入Operation都在同一个任务槽共享组中,则该任务槽共享组会继承下来。任务槽共享组的默认名为"default",Operation可以通过调用slotSharingGroup("default")来定义其名称。 someStream.filter(...).slotSharingGroup("name"); |
三、数据源
数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)来创建数据源。你可以使用Flink提供的source方法,也可以通过实现SourceFunction来编写自定义的非并行数据源,也可以通过实现ParallelSourceFunction接口或继承RichParallelSourceFunction来编写自定义并行数据源。
以下是几个预定义的数据流源,可以通过StreamExecutionEnvironment来访问:
1. 基于文件的:
· readTextFile(path) / TextInputFormat - 以行读取方式读文件并返回字符串
· readFile(path) / 任意输入格式 - 按用输入格式的描述读取文件
· readFileStream - 创建一个stream,在文件有改动时追加element
2. 基于Socket的:
· socketTextStream - 从socket读取,element可以通过分割符来分开
3. 基于Collection的:
· fromCollection(Collection) - 从Java.util.Collection创建一个数据流。collection中所有的element都必须是同一类型的。
· fromCollection(Iterator, Class) - 从一个迭代器中创建一个数据流。class参数明确了迭代器返回的element的类型。
· fromElement(T …) - 从一个给定的对象序列创建一个数据流。所有对象都必须是同一类型的。
· fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建一个并行数据流。class参数明确了迭代器返回的element的类型。
· generateSequence(from, to) - 从一个给定区间中生成一个并行数字序列。
4. 自定义:
· addSource - 附上一个新的source方法。例如,通过调用addSource(new FlinkKafkaConsumer08<>(…))来从Apache Kafka读取数据,更多信息见于connector
四、Data Sink
Data Sink消耗DataStream并将它们转发到文件、socket、外部系统或打印它们。Flink自带了许多内置的输出格式,封装为DataStream的operation中:
· writeAsText() / TextOutputFormat - 以行字符串的方式写文件,字符串通过调用每个element的toString()方法获得。
· writeAsCsv(…) / CsvOutputFormat - 以逗号分隔的值来讲Tuple写入文件,行和域的分隔符是可以配置的。每个域的值是通过调用object的toString()方法获得的。
· print() / printToErr() - 将每个element的toString()值打印在标准输出 / 标准错误流中。可以提供一个前缀(msg)作为输出的前缀,使得在不同print的调用可以互相区分。如果并行度大于1,输出也会以task的标识符(identifier)为产生的输出的前缀。
· writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出所用的方法和基类,支持自定义object到byte的转换。
· writeToSocket - 依据SerializationSchema将element写到socket中。
· addSink - 调用自定义sink方法,Flink自带连接到其他系统的connector(如Apache Kafka),这些connector都以sink方法的形式实现。
注意DataStream的write*()函数主要用于debug,它们不参与Flink的检查点,这意味着这些方法通常处于“至少一次(at-least-once)“的执行语义下。flush到目标系统的数据依赖于OutputFormat的实现,这意味着不是所有发送到OutputFormat的element都会立即出现在目标系统中,此外,在失效的情况下,这些数据很可能会丢失。
故为了可靠性以及将stream“恰好一次(exact once)”地传入文件系统,我们应当使用flink-connector-filesystem。此外,通过实现“.addSink(…)”的自定义内容会参加Flink的检查点机制,故会保证“恰好一次”的执行语义。
五、迭代(Iterations)
迭代流程序实现了一个阶段方法并将之嵌入到一个IterativeStream中。作为一个可能永远不会结束的程序,它没有最大迭代数,反之,你需要使用split或filter的transformation来明确流的哪一部分会被反馈到迭代中,哪一部分则继续转发到下游。这里,我们使用filter作为例子,我们定义IterativeStream:
IterativeStream<Integer> iteration = input.iterate();
然后,我们定义在循环中将要进行的逻辑处理,我们通过一系列transformation来实现(这里用了一个简单的map transformation):
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
我们可以调用IterativeStream的closeWith(feedbackStream)函数来关闭一个迭代并定义迭代尾。传递给closeWith方法的DataStream将会反馈回迭代头。分割出用来反馈的stream的部分和向前传播的stream部分通常的方法便是使用filter来进行分割。这些filter可以定义诸如"termination"逻辑,即element将会传播到下游,而不是被反馈回去。
iteration.closeWith(iterationBody.filter(/*
one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/*
some other part of the stream */);
默认地,反馈的那部分流将会自动设置为迭代头的输入,要想重载该行为,用户需要设置closeWith函数中的一个boolean参数。例如,下面是一个持续将整数序列中的数字减1知道它们变为0的程序:
DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate();
DataStream<Long>
minusOne = iteration.map(new
MapFunction<Long, Long>() {
@Override
public Long map(Long
value) throws Exception {
return value - 1
;
}
});
DataStream<Long>
stillGreaterThanZero = minusOne.filter(new
FilterFunction<Long>() {
@Override
public boolean
filter(Long
value) throws Exception {
return (value > 0);
}
});
iteration.closeWith(stillGreaterThanZero);
DataStream<Long>
lessThanZero = minusOne.filter(new
FilterFunction<Long>() {
@Override
public boolean
filter(Long
value) throws Exception {
return (value <= 0);
}
});
六、执行参数
StreamExecutionEnvironment包含ExecutionConfig,它可以使用户设置job的确切运行时配置值。
请参考execution configuration来查看参数的解释。特别的,以下这些参数仅适用于DataStream API:
enableTimestamps() / disableTimestamps():在每一个source发出的事件上附加上一个时间戳。函数areTimestampsEnabled()可以返回该状态的当前值。
setAutoWatermarkInterval(long milliseconds):设置自动水印发布(watermark emission)区间。你可以通过调用函数getAutoWatermarkInterval()来获取当前值。
6.1 容错
文档Fault Tolerance Documentation描述了打开并配置Flink的检查点机制的选项和参数
6.2 控制执行时间
默认的,element在网络传输时不是一个个单独传输的(这会导致不必要的网络流量),而是缓存后传输。缓存(是在设备间传输的实际单位)的大小可以在Flink的配置文件中设置。尽管该方法有益于优化吞吐量,他会在stream到达不够快时导致执行时间方面的问题。为了控制吞吐量和执行时间,你可以在执行环境(或独立的Operator)中调用env.setBufferTimeout(timeoutMillis)来设置等待装满buffer的最大等待时间,在这个时间过后,不管buffer是否已满,它都会自动发出。该默认超时时间是100ms。下例是设置API的用法:
LocalStreamEnvironment
env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
要最大化吞吐量,设置setBufferTimeout(-1)来去除超时时间,则buffer仅在它满后才会被flush。要最小化执行时间,设置timeout为一个接近0的数字(如5ms或10ms)。应当避免设置Timeout为0,因为它会造成严重的性能下降。
七、Debugging
在分布式集群上运行流程序之前,确保算法正确执行很重要。因此,实现数据分析程序通常需要递增的检查结果、debug、优化的过程。
Flink提供了可以显著简化数据分析程序的开发过程的特性,即可以在IDE中本地进行debug、注入测试数据、以及结果数据的收集等。本节对如何简化Flink程序开发提出几点建议。
7.1 本地执行环境
LocalStreamEnvironment在创建它的同一个JVM进程下创建Flink系统。如果你从IDE中启动一个LocalEnvironment,你可以在代码中设置断点来简单地debug你的程序。下例为LocalEnvironment是如何创建并使用的:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String>
lines = env.addSource(/*
some source */);
// build your program
env.execute();
7.2 Collection数据源
Flink提供基于Java collection的特殊数据源来方便测试。一旦程序测试之后,source和sink可以简单地替代为对外部系统的读取/写出的source和sink。Collection数据源使用方法如下:
// Create a DataStream from a list of
elements
DataStream<Integer> myInts = env.fromElements(1,
2, 3,
4, 5);
//
Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data =
...
DataStream<Tuple2<String, Integer>>
myTuples = env.fromCollection(data);
//
Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt,
Long.class);
注意:当前Collection数据源需要实现Serializable接口的数据类型和迭代器。此外,Collection数据源无法并行执行(并行度=1)
7.3 迭代器Data Sink
Flink同样提供了一个收集测试和debug的DataStream结果的sink,它的使用方式如下:
import org.apache.flink.contrib.streaming.DataStreamUtils
DataStream<Tuple2<String,
Integer>> myResult = ...
Iterator<Tuple2<String, Integer>>
myOutput = DataStreamUtils.collect(myResult)