zoukankan      html  css  js  c++  java
  • Flink Program Guide (2) -- 综述 (DataStream API编程指导 -- For Java)

    DataStream API编程指导

    文档翻译自Flink DataStream API Programming Guide

    -----------------------------------------------------------------------

    Flink中的DataStream程序是实现在数据流上的transformation(filteringupdating state defining windowsaggregating)的普通程序。创建数据流的来源多种多样(如消息队列,socket流,文件等)。程序通过data sink返回结果,如将数据写入文件,或发送到标准输出(如命令行终端)。Flink程序可以在多种上下文中运行,如独立运行或是嵌入在其他程序中执行。程序的执行可以发生在本地JVM,或者在一个拥有许多设备的集群上。

     

    有关介绍Flink API基础概念的文档,请见basic concepts

     

    为了创建你自己的Flink DataStream程序,我们鼓励你从文档anatomy of a Flink Program开始,且欢迎你添加自己的transformations。该文档接下来的部分是额外的operation和进阶特性的参考文档。

    一、示例程序

    下面的程序是一个完整的流式窗口word count应用,它计算出在web socket的大小为5秒的窗口中的出现各个单词的数量。你可以复制 & 粘贴代码并在本地运行。

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import
    org.apache.flink.api.java.tuple.Tuple2;
    import
    org.apache.flink.streaming.api.datastream.DataStream;
    import
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import
    org.apache.flink.streaming.api.windowing.time.Time;
    import
    org.apache.flink.util.Collector;

    public class WindowWordCount {

    public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = env
      .
    socketTextStream("localhost", 9999)
      .
    flatMap(new Splitter())
      .
    keyBy(0)
      .
    timeWindow(Time.seconds(5))
      .
    sum(1);

    dataStream.print();

    env.execute("Window WordCount");

    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override

    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {

    for (String word: sentence.split(" ")) {

    out.collect(new Tuple2<String, Integer>(word, 1));

    }

    }

    }

    }

    要运行该示例程序,首先从终端运行netcat来开始输入流

    nc -lk 9999

     

    仅需要输入一些单词,这些将是word count程序的输入数据。如果你想看到count大于1的结果,在5秒内重复输入同一个单词。

     

    二、DataStream Transformations

    Data transformation会将一或多个DataStream转换成一个新的DataStream。程序可以将多个transformation结合形成复杂的拓扑结构(topology)。

     

    本小节给出了所有可用的transformation的描述。

    Transformation

    描述

    Map

    DataStream -> DataStream

    获取一个element并产出一个element。下例是一个将输入*2map方法:
     

    DataStream<Integer> dataStream = //...
    dataStream.
    map(new MapFunction<Integer, Integer>() {
      @Override
      public Integer
    map(Integer value) throws Exception {
        return
    2 * value;
      }
    });

    FlapMap

    DataStream -> DataStream

    获取一个element,并产生出01或多个element。下例是一个为句子分词的flatmap方法

     

    dataStream.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public
    void flatMap(String value, Collector<String> outthrows Exception {
        for(String word: value.
    split(" ")){
        out.
    collect(word);
        }
      }
    });

    Filter

    DataStream -> DataStream

    在每个获取的element上运行一个boolean方法,留下那些方法返回trueelement。下例是一个过滤掉0值的filter
     

    dataStream.filter(new FilterFunction<Integer>() {
      @Override
      public
    boolean filter(Integer value) throws Exception {
        return value !=
    0;
      }
    });

    KeyBy
    DataStream -> KeyedStream

    将流逻辑分为不相交的分区,每个分区包含的都是具有相同keyelement,该分区方法使用hash分区实现。定义key的方法见于Keys。下例是一个返回KeyedDataStreamtransformation
     

    dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.
    keyBy(0) // Key by the first element of a Tuple

    Reduce

    KeyedStream -> DataStream

    一个在keyed data stream滚动进行的reduce方法。将上一个reduce过的值和当前element结合,产生新的值并发送出。下例是一个创建部分和的reduce方法。
     

    keyedStream.reduce(new ReduceFunction<Integer>() {
      @Override
      public Integer
    reduce(Integer value1, Integer value2throws Exception {
        return value1 + value2;
      }
    });

    Fold

    KeyedStream -> DataStream

    一个在带有初始值的数据流上滚动进行的fold方法。将上一个fold的值和当前element结合,产生新的值并发送出。下例是一个fold方法,当应用于序列{1, 2, 3, 4, 5}时,它发出序列{"start-1", "start-1-2", "start-1-2-3" …}
     

    DataStream<String> result keyedStream.fold("start", new FoldFunction<Integer, String>() {
      @Override
      public String
    fold(String current, Integer value) {
        return current +
    "-" + value;
      }
    });

    Aggregations

    KeyedStream -> DataStream

    在一个keyed DataStream滚动进行聚合的方法。其中,minminBy的区别在于min返回最小值,而minBy返回的是带有在此域中最小值的elementmaxmaxBy一样如此)。
     

    keyedStream.sum(0);
    keyedStream.
    sum("key");
    keyedStream.
    min(0);
    keyedStream.
    min("key");
    keyedStream.
    max(0);
    keyedStream.
    max("key");
    keyedStream.
    minBy(0);
    keyedStream.
    minBy("key");
    keyedStream.
    maxBy(0);
    keyedStream.
    maxBy("key");

    Window

    KeyedStream - > WindowedStream

    Window可以定义在已经分区的KeyedStream上。窗口将根据一些特征(如最近5秒到达的数据)将数据按其各自的key集合在一起。有关窗口的完整描述见于windows
     

    // Last 5 seconds of data

    dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));

    WindowAll

    DataStream -> AllWindowedStream

    Window可以定义在普通的DataStream上。窗口将根据一些特征(如最近5秒到达的数据)将所有Stream事件集合在一起。有关窗口的完整描述见于windows
    警告:该transformation在很多情况下都不是并行化的,所有数据将被收集到一个运行windowAll Operator的任务上。

     

    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

    Window Apply

    WindowedStream -> DataStream

    AllWindowedStream -> DataStream

    将一个一般函数应用到window整体上去,下面是一个人工计算window中所有element的总和的应用。
    注意:如果你正在使用一个windowAlltransformation,你需要使用AllWindowFunction来代替下例中的参数。
     

    windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public
    void apply (Tuple tuple,
      Window window,
      Iterable<Tuple2<String, Integer>> values,
      Collector<Integer> out) throws Exception {
        
    int sum = 0;
        for (value t: values) {
          sum += t.
    f1;
        }
        out.
    collect (new Integer(sum));
      }
    });

    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.
    apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public
    void apply (Window window,
      Iterable<Tuple2<String, Integer>> values,
      Collector<Integer> out) throws Exception {
        
    int sum = 0;
        for (value t: values) {
          sum += t.
    f1;
        }
        out.
    collect (new Integer(sum));
      }
    });

    Window Reduce

    WindowedStream -> DataStream

    对窗口应用一个功能性reduce方法并返回reduce的结果
     

    windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>() {
      public Tuple2<String, Integer>
    reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
      return new Tuple2<String,Integer>(value1.
    f0, value1.f1 + value2.f1);
      }
    };

    Window Fold

    Windowed Stream -> DataStream

    对窗口应用一个功能性fold方法。下例代码在应用到序列(1, 2, 3, 4, 5)时,它将该序列fold成为字符串"start-1-2-3-4-5"
     

    windowedStream.fold("start-", new FoldFunction<Integer, String>() {
      public String
    fold(String current, Integer value) {
        return current +
    "-" + value;
      }
    };

    Aggregations on windows

    WindowedStream -> DataStream

    对窗口中的内容聚合。其中,minminBy的区别在于min返回最小值,而minBy返回的是带有在此域中最小值的elementmaxmaxBy一样如此)。
     

    windowedStream.sum(0);
    windowedStream.
    sum("key");
    windowedStream.
    min(0);
    windowedStream.
    min("key");
    windowedStream.
    max(0);
    windowedStream.
    max("key");
    windowedStream.
    minBy(0);
    windowedStream.
    minBy("key");
    windowedStream.
    maxBy(0);
    windowedStream.
    maxBy("key");

    Union

    DataStream* -> DataStream

    2个或多个data stream合并创建出一个新的包含所有streamelementstream。注意:如果你对一个data stream自己进行union操作,则在返回的结果中,每个element都会出现2个。
     

    dataStream.union(otherStream1, otherStream2, ...);

    Window Join

    DataStream, DataStream -> DataStream

    在给定key和普通window中,将2DataStream进行Join操作

     

    dataStream.join(otherStream)
    .
    where(0).equalTo(1)
    .
    window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .
    apply (new JoinFunction () {...});

    Window CoGroup

    DataStream, DataStream -> DataStream

    在给定key和普通window中,对2DataStream进行CoGroup操作。
     

    dataStream.coGroup(otherStream)
    .
    where(0).equalTo(1)
    .
    window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .
    apply (new CoGroupFunction () {...});

    Connect

    DataStream, DataStream -> ConnectedStreams

    在保留两个DataStream的类型的情况下,将二者"连接"起来。Connect使我们可以共享两个Stream的状态
     

    DataStream<Integer> someStream = //...
    DataStream<String> otherStream =
    //...

    ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

    CoMap, CoFlatMap

    ConnectedStreams -> DataStream

    该操作类似于mapflatMap针对连接的Data Stream版本。Sd
     

    connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
      @Override
      public Boolean
    map1(Integer value) {
        return true;
      }

      @Override
      public Boolean
    map2(String value) {
        return false;
      }
    });

     

    connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

      @Override
      public
    void flatMap1(Integer value, Collector<String> out) {
        out.
    collect(value.toString());
      }

      @Override
      public
    void flatMap2(String value, Collector<String> out) {
        for (String word: value.
    split(" ")) {
          out.
    collect(word);
        }
      }
    });

    Split

    DataStream -> SplitStream

    根据某些标准将Stream分割成2个或更多的stream
     

    SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
      @Override
      public Iterable<String>
    select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value %
    2 == 0) {
          output.
    add("even");
        }
        else {
          output.
    add("odd");
        }
        return output;
      }
    });

    Select

    SplitStream -> DataStream

    SplitStream中选择1个或多个stream

     

    SplitStream<Integer> split;
    DataStream<Integer> even = split.
    select("even");
    DataStream<Integer> odd = split.
    select("odd");
    DataStream<Integer> all = split.
    select("even","odd");

    Iterate

    DataStream -> IterativeStream -> DataStream

    通过将一个Operator的输出重定向到前面的某个Operator的方法,在数据流图中创建一个反馈循环。这在定义持续更新模型的算法时十分有用。下面的例子从一个Stream开始,并持续应用迭代体(Iteration body)。大于0element被送回到反馈通道,而其他的element则被转发到下游。相关完整描述请见Iterations
     

    IterativeStream<Long> iteration = initialStream.iterate();
    DataStream<Long> iterationBody = iteration.
    map (/*do something*/);
    DataStream<Long> feedback = iterationBody.
    filter(new FilterFunction<Long>(){
      @Override
      public
    boolean filter(Integer value) throws Exception {
        return value >
    0;
      }
    });
    iteration.
    closeWith(feedback);
    DataStream<Long> output = iterationBody.
    filter(new FilterFunction<Long>(){
      @Override
      public
    boolean filter(Integer value) throws Exception {
        return value <=
    0;
      }
    });

    Extract Timestamps

    DataStream -> DataStream

    通过从数据中抽取时间戳来使得通过使用事件时间语义的窗口可以工作。详情见于Event Time
     

    stream.assignTimestamps (new TimeStampExtractor() {...});

     

    接下来的Transformation是对Tuple类型的data stream可用的Transformation

    Transformation

    描述

    Project

    DataStream -> DataStream

    tuple中选择出域的子集而产生新的DataStream
     

    DataStream<Tuple3<Integer, Double, String>> in = // [...]
    DataStream<Tuple2<String, Integer>> out = in.
    project(2,0);

     

    物理级分割(Physical Partitioning

    如果需要,Flink同样提供了在进行一次transformation后针对精确stream分割的低层次的控制(low-level control),它们通过以下几个方法实现。

     

    Transformations

    描述

    Custom partitioning

    DataStream -> DataStream

    使用一个用户自定义的Partitioner来对每个element选择目标任务sd
     

    dataStream.partitionCustom(partitioner, "someKey");
    dataStream.
    partitionCustom(partitioner, 0);

    Random partitioning

    DataStream -> DataStream

    根据均匀分布来随机分割element
     

    dataStream.shuffle();

    Rebalancing(轮询分割)

    DataStream -> DataStream

    轮询分割element,创建相同负荷的分割。对数据变形(data skew)时的性能优化十分有用s
     

    dataStream.rebalance();

    Rescaling

    DataStream -> DataStream

    element轮询分割到下游Operator子集中去。这在你想流水线并行时十分有用,例如,需要从每个并行的source实例中将数据fan out到一个有着一些mapper来分发负载,但是又不想要函数rebalance()那样引起的完全rebalance的效果时。这就需要仅在本地传输数据,而不是需要从网络传输,这需要依赖其他诸如TaskManager的任务槽数量等等configuration值。
    上游Operation发送element的下游Operation子集同时依赖于上游和下游两方Operation的并行度。例如,若上游Operation的并行度为2,下游Operation并行度为4,则1个上游Operation将会把它的element分发给2个下游Operation。另一方面,若下游并行度为2而上游并行度为4,则2个上游Operation将会把它们的element分发给1个下游Operation,而另外两个上游Operation则分发给另一个下游Operation
    当一个或是多个上下游Operation的并行度不是倍数关系时,下游的Operation将拥有不同的从上游获得的输入的数量。
    下图是上面例子的连接模式图:
     

    dataStream.rescale();

    Broadcasting

    DataStream -> DataStream

    element广播到每一个分割中去
     

    dataStream.broadcast();

     

    链接任务以及资源组(Task chaining & resource groups

    将两个transformation链接起来意味着将它们部署在一起(co-locating),共享同一个线程来获得更好的性能。Flink默认地尽可能地链接Operator(如两个连续的map transformation)。如有需要,API还给出了细粒度的链接控制:

     

    使用StreamExecutionEnvironment.disableOperatorChaining()来关闭整个Job的链接操作。下面表格中的方法则是更加细粒度的控制函数,注意,由于这些函数引用的是前一个transformation,所以它们仅仅在一个DataStreamtransformation后使用才是正确的,例如someStream.map( … ).startNewChain()是正确的,而someStream.startNewChain()错误的。

     

    一个资源组就是Flink中的一个任务槽,如有需要,你可以人工孤立某个Operator到一个独立的任务槽中。

    Transformation

    描述

    startNewChain()

    以当前Operator起点,开始一个新的链接。在下例中,两个mapper将会被链接而filter则不会与第一个mapper链接
     

    someStream.filter(...).map(...).startNewChain().map(...);

    disableChaining()

    下例中,将不会链接mapOperator
     

    someStream.map(...).disableChaining();

    slotSharingGroup()

    设置一个Operation的共享任务槽的分组。Flink将会把同一个任务槽共享组的Operation放到同一个任务槽中,而不在同一个任务槽共享组的Operation放到其他任务槽中。这可以用来孤立任务槽。如果所有的输入Operation都在同一个任务槽共享组中,则该任务槽共享组会继承下来。任务槽共享组的默认名为"default"Operation可以通过调用slotSharingGroup("default")来定义其名称。
     

    someStream.filter(...).slotSharingGroup("name");

     

    三、数据源

    数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)来创建数据源。你可以使用Flink提供的source方法,也可以通过实现SourceFunction来编写自定义的非并行数据源,也可以通过实现ParallelSourceFunction接口或继承RichParallelSourceFunction来编写自定义并行数据源。

    以下是几个预定义的数据流源,可以通过StreamExecutionEnvironment来访问:

    1.    基于文件的:

    ·        readTextFile(path) / TextInputFormat - 以行读取方式读文件并返回字符串

    ·        readFile(path) / 任意输入格式 - 按用输入格式的描述读取文件

    ·        readFileStream - 创建一个stream,在文件有改动时追加element

    2.    基于Socket的:

    ·        socketTextStream - socket读取,element可以通过分割符来分开

    3.    基于Collection的:

    ·        fromCollection(Collection) - Java.util.Collection创建一个数据流。collection中所有的element都必须是同一类型的。

    ·        fromCollection(Iterator, Class) - 从一个迭代器中创建一个数据流。class参数明确了迭代器返回的element的类型。

    ·        fromElement(T …) - 从一个给定的对象序列创建一个数据流。所有对象都必须是同一类型的。

    ·        fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建一个并行数据流。class参数明确了迭代器返回的element的类型。

    ·        generateSequence(from, to) - 从一个给定区间中生成一个并行数字序列。

    4.    自定义:

    ·        addSource - 附上一个新的source方法。例如,通过调用addSource(new FlinkKafkaConsumer08<>(…))来从Apache Kafka读取数据,更多信息见于connector

     

    四、Data Sink

    Data Sink消耗DataStream并将它们转发到文件、socket、外部系统或打印它们。Flink自带了许多内置的输出格式,封装为DataStreamoperation中:

    ·        writeAsText() / TextOutputFormat - 以行字符串的方式写文件,字符串通过调用每个elementtoString()方法获得。

    ·        writeAsCsv(…) / CsvOutputFormat - 以逗号分隔的值来讲Tuple写入文件,行和域的分隔符是可以配置的。每个域的值是通过调用objecttoString()方法获得的。

    ·        print() / printToErr() - 将每个elementtoString()值打印在标准输出 / 标准错误流中。可以提供一个前缀(msg)作为输出的前缀,使得在不同print的调用可以互相区分。如果并行度大于1,输出也会以task的标识符(identifier)为产生的输出的前缀。

    ·        writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出所用的方法和基类,支持自定义objectbyte的转换。

    ·        writeToSocket - 依据SerializationSchemaelement写到socket中。

    ·        addSink - 调用自定义sink方法,Flink自带连接到其他系统的connector(如Apache Kafka),这些connector都以sink方法的形式实现。

     

    注意DataStreamwrite*()函数主要用于debug,它们不参与Flink的检查点,这意味着这些方法通常处于至少一次(at-least-once的执行语义下。flush到目标系统的数据依赖于OutputFormat的实现,这意味着不是所有发送到OutputFormatelement都会立即出现在目标系统中,此外,在失效的情况下,这些数据很可能会丢失。

     

    故为了可靠性以及将stream“恰好一次(exact once地传入文件系统,我们应当使用flink-connector-filesystem。此外,通过实现.addSink(…)的自定义内容会参加Flink的检查点机制,故会保证恰好一次的执行语义。

     

    五、迭代(Iterations

    迭代流程序实现了一个阶段方法并将之嵌入到一个IterativeStream中。作为一个可能永远不会结束的程序,它没有最大迭代数,反之,你需要使用splitfiltertransformation来明确流的哪一部分会被反馈到迭代中,哪一部分则继续转发到下游。这里,我们使用filter作为例子,我们定义IterativeStream

    IterativeStream<Integer> iteration = input.iterate();

    然后,我们定义在循环中将要进行的逻辑处理,我们通过一系列transformation来实现(这里用了一个简单的map transformation):

    DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

     

    我们可以调用IterativeStreamcloseWith(feedbackStream)函数来关闭一个迭代并定义迭代尾。传递给closeWith方法的DataStream将会反馈回迭代头。分割出用来反馈的stream的部分和向前传播的stream部分通常的方法便是使用filter来进行分割。这些filter可以定义诸如"termination"逻辑,即element将会传播到下游,而不是被反馈回去。

    iteration.closeWith(iterationBody.filter(/* one part of the stream */));
    DataStream<Integer> output = iterationBody.
    filter(/* some other part of the stream */);

     

    默认地,反馈的那部分流将会自动设置为迭代头的输入,要想重载该行为,用户需要设置closeWith函数中的一个boolean参数。例如,下面是一个持续将整数序列中的数字减1知道它们变为0的程序:

    DataStream<Long> someIntegers = env.generateSequence(0, 1000);

    IterativeStream<Long> iteration = someIntegers.iterate();

    DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
      @Override
      public Long
    map(Long value) throws Exception {
        return value -
    1 ;
      }
    });

    DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
      @Override
      public
    boolean filter(Long value) throws Exception {
        return (value >
    0);
      }
    });

    iteration.closeWith(stillGreaterThanZero);

    DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
      @Override
      public
    boolean filter(Long value) throws Exception {
        return (value <=
    0);
      }
    });

     

    六、执行参数

    StreamExecutionEnvironment包含ExecutionConfig,它可以使用户设置job的确切运行时配置值。

    请参考execution configuration来查看参数的解释。特别的,以下这些参数仅适用于DataStream API

    enableTimestamps() / disableTimestamps():在每一个source发出的事件上附加上一个时间戳。函数areTimestampsEnabled()可以返回该状态的当前值。

    setAutoWatermarkInterval(long milliseconds):设置自动水印发布(watermark emission)区间。你可以通过调用函数getAutoWatermarkInterval()来获取当前值。

     

    6.1 容错

    文档Fault Tolerance Documentation描述了打开并配置Flink的检查点机制的选项和参数

     

    6.2 控制执行时间

    默认的,element在网络传输时不是一个个单独传输的(这会导致不必要的网络流量),而是缓存后传输。缓存(是在设备间传输的实际单位)的大小可以在Flink的配置文件中设置。尽管该方法有益于优化吞吐量,他会在stream到达不够快时导致执行时间方面的问题。为了控制吞吐量和执行时间,你可以在执行环境(或独立的Operator)中调用env.setBufferTimeout(timeoutMillis)来设置等待装满buffer的最大等待时间,在这个时间过后,不管buffer是否已满,它都会自动发出。该默认超时时间是100ms。下例是设置API的用法:

    LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.
    setBufferTimeout(timeoutMillis);

    env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

     

    要最大化吞吐量,设置setBufferTimeout(-1)来去除超时时间,则buffer仅在它满后才会被flush。要最小化执行时间,设置timeout为一个接近0的数字(如5ms10ms)。应当避免设置Timeout0,因为它会造成严重的性能下降。

     

    七、Debugging

    在分布式集群上运行流程序之前,确保算法正确执行很重要。因此,实现数据分析程序通常需要递增的检查结果、debug、优化的过程。

    Flink提供了可以显著简化数据分析程序的开发过程的特性,即可以在IDE中本地进行debug、注入测试数据、以及结果数据的收集等。本节对如何简化Flink程序开发提出几点建议。

     

    7.1 本地执行环境

    LocalStreamEnvironment在创建它的同一个JVM进程下创建Flink系统。如果你从IDE中启动一个LocalEnvironment,你可以在代码中设置断点来简单地debug你的程序。下例为LocalEnvironment是如何创建并使用的:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    DataStream<String> lines = env.addSource(/* some source */);
    // build your program

    env.execute();

     

    7.2 Collection数据源

    Flink提供基于Java collection的特殊数据源来方便测试。一旦程序测试之后,sourcesink可以简单地替代为对外部系统的读取/写出的sourcesinkCollection数据源使用方法如下:

    // Create a DataStream from a list of elements
    DataStream<Integer> myInts = env.
    fromElements(1, 2, 3, 4, 5);

    // Create a DataStream from any Java collection
    List<Tuple2<String, Integer>> data = ...
    DataStream<Tuple2<String, Integer>> myTuples = env.
    fromCollection(data);

    // Create a DataStream from an Iterator
    Iterator<Long> longIt = ...
    DataStream<Long> myLongs = env.
    fromCollection(longIt, Long.class);

     

    注意:当前Collection数据源需要实现Serializable接口的数据类型和迭代器。此外,Collection数据源无法并行执行(并行度=1

     

    7.3 迭代器Data Sink

    Flink同样提供了一个收集测试和debugDataStream结果的sink,它的使用方式如下:

    import org.apache.flink.contrib.streaming.DataStreamUtils

    DataStream<Tuple2<String, Integer>> myResult = ...
    Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.
    collect(myResult)

  • 相关阅读:
    在阿里云“专有网络”网络类型中配置vsftpd
    Ubuntu 16.04下开启Mysql 3306端口远程访问
    .net core 2.0 报错:error NU1102: Unable to find package ...
    .NET Core Runtime ARM32 builds now available
    .NET Core on Raspberry Pi
    .NET Core 跨平台发布(dotnet publish)
    使用vscode开发调试.net core应用程序并部署到Linux跨平台
    docker容器镜像删除
    使用阿里docker镜像加速器加速
    树莓派3b基于UbuntuMate下载中文输入法
  • 原文地址:https://www.cnblogs.com/lanyun0520/p/5730403.html
Copyright © 2011-2022 走看看