  • Flink

    Let's start with an example:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple2<Long, Long>> stream = env.addSource(...);
    stream
        .keyBy(0)
        .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
        .reduce(new SummingReducer())
        .addSink(new SinkFunction<Tuple2<Long, Long>>() {...});
    
    env.execute();

    The biggest difference from the batch API is that we work with a DataStream instead of a DataSet:

    /**
     * A DataStream represents a stream of elements of the same type. A DataStream
     * can be transformed into another DataStream by applying a transformation as
     * for example:
     * <ul>
     * <li>{@link DataStream#map},
     * <li>{@link DataStream#filter}, or
     * </ul>
     *
     * @param <T> The type of the elements in this Stream
     */
    public class DataStream<T> {
        
        protected final StreamExecutionEnvironment environment;
        
        protected final StreamTransformation<T> transformation;
        
        /**
         * Create a new {@link DataStream} in the given execution environment with
         * partitioning set to forward by default.
         *
         * @param environment The StreamExecutionEnvironment
         */
        public DataStream(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
            this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
            this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
        }    
        
        // various operations on DataStream ...
        // map, reduce, keyBy, ...
    }

    The core of a DataStream is its field

    StreamTransformation<T> transformation, which describes how the data stream is produced.
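To make the idea concrete, here is a toy model in plain Java with no Flink dependency. The names `Transformation`, `Stream` and `lineage` are hypothetical stand-ins, not Flink's API; the sketch only shows that an operation such as map records a new node pointing at its input instead of processing any data:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model with hypothetical names: a stream is only a handle on the
// transformation that produces it; calling map() builds a graph, nothing runs.
public class LazyStreamSketch {

    /** Hypothetical stand-in for StreamTransformation: a named node with an optional input. */
    static final class Transformation {
        final String name;
        final Transformation input; // null for a source

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Hypothetical stand-in for DataStream: holds nothing but its transformation. */
    static final class Stream<T> {
        final Transformation transformation;

        Stream(Transformation transformation) {
            this.transformation = transformation;
        }

        <R> Stream<R> map(String name, Function<T, R> fn) {
            // fn is not applied here; we only record a new node whose input is our node
            return new Stream<>(new Transformation(name, this.transformation));
        }
    }

    /** Walks from a stream back to its source and returns node names, source first. */
    static List<String> lineage(Stream<?> stream) {
        List<String> names = new ArrayList<>();
        for (Transformation t = stream.transformation; t != null; t = t.input) {
            names.add(0, t.name);
        }
        return names;
    }

    public static void main(String[] args) {
        Stream<Long> source = new Stream<>(new Transformation("Source", null));
        Stream<String> result = source
                .map("Map", x -> x * 2)
                .map("ToString", x -> "v" + x);
        System.out.println(lineage(result)); // [Source, Map, ToString]
    }
}
```

Because map() never calls fn, nothing is computed while the pipeline is being built; in Flink the graph is only translated and run once env.execute() is called.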

     

    StreamTransformation

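    A StreamTransformation represents the operation that creates a DataStream.
    It does not necessarily correspond to an actual physical operation at runtime; it may be only a logical concept, as in the example in the Javadoc below: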

    /**
     * A {@code StreamTransformation} represents the operation that creates a
     * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
     * {@link org.apache.flink.streaming.api.datastream.DataStream} has an underlying
     * {@code StreamTransformation} that is the origin of said DataStream.
     *
     * <p>
     * API operations such as {@link org.apache.flink.streaming.api.datastream.DataStream#map} create
     * a tree of {@code StreamTransformation}s underneath. When the stream program is to be executed this
     * graph is translated to a {@link StreamGraph} using
     * {@link org.apache.flink.streaming.api.graph.StreamGraphGenerator}.
     *
     * <p>
     * A {@code StreamTransformation} does not necessarily correspond to a physical operation
     * at runtime. Some operations are only logical concepts. Examples of this are union,
     * split/select data stream, partitioning.
     *
     * <p>
     * The following graph of {@code StreamTransformations}:
     *
     * <pre>{@code
     *   Source              Source        
     *      +                   +           
     *      |                   |           
     *      v                   v           
     *  Rebalance          HashPartition    
     *      +                   +           
     *      |                   |           
     *      |                   |           
     *      +------>Union<------+           
     *                +                     
     *                |                     
     *                v                     
     *              Split                   
     *                +                     
     *                |                     
     *                v                     
     *              Select                  
     *                +                     
     *                v                     
     *               Map                    
     *                +                     
     *                |                     
     *                v                     
     *              Sink 
     * }</pre>
     *
     * Would result in this graph of operations at runtime:
     *
     * <pre>{@code
     *  Source              Source
     *    +                   +
     *    |                   |
     *    |                   |
     *    +------->Map<-------+
     *              +
     *              |
     *              v
     *             Sink
     * }</pre>
     *
     * The information about partitioning, union, split/select end up being encoded in the edges
     * that connect the sources to the map operation.
     *
     * @param <T> The type of the elements that result from this {@code StreamTransformation}
     */
    public abstract class StreamTransformation<T>

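    StreamTransformation itself only defines the output, i.e. the result stream that the transformation produces.
    It is an abstract class and cannot be used directly; the logic that actually produces the stream is encapsulated in a concrete operator.

    The following subclasses show the difference between a transformation and an operator; the design here is a bit convoluted.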

     

    OneInputTransformation adds an input on top of StreamTransformation:

    /**
     * This Transformation represents the application of a
     * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} to one input
     * {@link org.apache.flink.streaming.api.transformations.StreamTransformation}.
     *
     * @param <IN> The type of the elements in the input {@code StreamTransformation}
     * @param <OUT> The type of the elements that result from this {@code OneInputTransformation}
     */
    public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {
    
        private final StreamTransformation<IN> input;
    
        private final OneInputStreamOperator<IN, OUT> operator;
    
        private KeySelector<IN, ?> stateKeySelector;
        
        private TypeInformation<?> stateKeyType;
    }

    So it contains
    StreamTransformation<IN> input, which produces the input stream, and
    OneInputStreamOperator<IN, OUT> operator, which produces the output from that input.

    For comparison, here is TwoInputTransformation:

    public class TwoInputTransformation<IN1, IN2, OUT> extends StreamTransformation<OUT> {
    
        private final StreamTransformation<IN1> input1;
        private final StreamTransformation<IN2> input2;
    
        private final TwoInputStreamOperator<IN1, IN2, OUT> operator;
    }

     

    Next, compare SourceTransformation with SinkTransformation:

    public class SourceTransformation<T> extends StreamTransformation<T> {
    
        private final StreamSource<T> operator;
    }
    
    public class SinkTransformation<T> extends StreamTransformation<Object> {
    
        private final StreamTransformation<T> input;
    
        private final StreamSink<T> operator;
    }

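    This makes the role of a transformation easier to see:
    a source has no input, so there is no transformation representing one;
    a sink does have an input, but its operator is not an ordinary stream operator: it is a StreamSink, the end point of the stream.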
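A minimal sketch of the three shapes, assuming a toy world where a source is a fixed list and operators are plain java.util.function objects. `SourceT`, `OneInputT`, `SinkT` and `evaluate` are hypothetical names; the point is that a transformation only records where its input comes from, while the operator it carries holds the per-record logic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model with hypothetical names: transformations are graph nodes that know
// their input; the operator each one carries holds the actual per-record logic.
public class TransformationKindsSketch {

    interface Transformation<T> { }

    /** Like SourceTransformation: no input, only a source (here, a fixed list). */
    static final class SourceT<T> implements Transformation<T> {
        final List<T> elements;
        SourceT(List<T> elements) { this.elements = elements; }
    }

    /** Like OneInputTransformation: one input plus an operator turning IN into OUT. */
    static final class OneInputT<IN, OUT> implements Transformation<OUT> {
        final Transformation<IN> input;
        final Function<IN, OUT> operator;
        OneInputT(Transformation<IN> input, Function<IN, OUT> operator) {
            this.input = input;
            this.operator = operator;
        }
    }

    /** Like SinkTransformation: has an input, but a terminal operator that emits nothing. */
    static final class SinkT<T> {
        final Transformation<T> input;
        final Consumer<T> operator;
        SinkT(Transformation<T> input, Consumer<T> operator) {
            this.input = input;
            this.operator = operator;
        }
    }

    /** Naive local evaluation: materialize the input first, then apply this node's operator. */
    @SuppressWarnings("unchecked")
    static <T> List<T> evaluate(Transformation<T> t) {
        if (t instanceof SourceT) {
            return new ArrayList<>(((SourceT<T>) t).elements);
        }
        OneInputT<Object, T> one = (OneInputT<Object, T>) t;
        List<T> out = new ArrayList<>();
        for (Object in : evaluate(one.input)) {
            out.add(one.operator.apply(in));
        }
        return out;
    }

    static <T> void run(SinkT<T> sink) {
        evaluate(sink.input).forEach(sink.operator);
    }

    public static void main(String[] args) {
        SourceT<Integer> source = new SourceT<>(List.of(1, 2, 3));
        OneInputT<Integer, Integer> doubled = new OneInputT<>(source, x -> x * 2);
        List<Integer> collected = new ArrayList<>();
        run(new SinkT<>(doubled, collected::add));
        System.out.println(collected); // [2, 4, 6]
    }
}
```

The recursive evaluate() is only for illustration; Flink does not execute transformations directly but first translates them into a StreamGraph.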

     

    transform

    This method uses a user-defined operator to turn the current stream into a stream of the type the user specifies.

    /**
     * Method for passing user defined operators along with the type
     * information that will transform the DataStream.
     *
     * @param operatorName
     *            name of the operator, for logging purposes
     * @param outTypeInfo
     *            the output type of the operator
     * @param operator
     *            the object containing the transformation logic
     * @param <R>
     *            type of the return stream
     * @return the data stream constructed
     */
    public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    
        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();
    
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<T, R>(
                this.transformation,
                operatorName,
                operator,
                outTypeInfo,
                environment.getParallelism());
    
        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    
        getExecutionEnvironment().addOperator(resultTransform);
    
        return returnStream;
    }

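    So the user supplies the operator name (for logging), the output TypeInformation, and the OneInputStreamOperator.

    The implementation creates a OneInputTransformation with this.transformation as its input and the given operator as its OneInputStreamOperator,
    so resultTransform is what turns the current stream into the target stream. It also registers resultTransform with the environment through addOperator.

    The result is then wrapped in a SingleOutputStreamOperator. What is that?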

    /**
     * The SingleOutputStreamOperator represents a user defined transformation
     * applied on a {@link DataStream} with one predefined output type.
     *
     * @param <T> The type of the elements in this Stream
     * @param <O> Type of the operator.
     */
    public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {
    
        protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
            super(environment, transformation);
        }
    }

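    Put simply, it just wraps the user-defined transformation. Despite its name, SingleOutputStreamOperator extends DataStream: it is a stream, not an operator.

    The naming in this part of Flink is somewhat confusing; "operator" and "transformation" are easy to mix up.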
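The transform() pattern, together with the KeyedStream override that appears later, can be modeled as below. Everything here is hypothetical and simplified (no types, no real operator, and keyBy's own partitioning node is skipped): a new node points at the current one, gets wrapped in a fresh stream handle and registered with the environment, and a keyed subclass calls super.transform() before injecting its key selector:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model, hypothetical names throughout: mirrors the shape of DataStream.transform()
// and the KeyedStream override, not Flink's real classes or signatures.
public class TransformPatternSketch {

    /** Stand-in for StreamExecutionEnvironment: a parallelism and the registered nodes. */
    static final class Env {
        final int parallelism;
        final List<Node> transformations = new ArrayList<>(); // what execute() would later walk

        Env(int parallelism) {
            this.parallelism = parallelism;
        }

        void addOperator(Node node) {
            transformations.add(node);
        }
    }

    /** Stand-in for OneInputTransformation (the operator itself is omitted). */
    static final class Node {
        final String name;
        final Node input;
        final int parallelism;
        Function<Object, Object> stateKeySelector; // stays null for non-keyed operators

        Node(String name, Node input, int parallelism) {
            this.name = name;
            this.input = input;
            this.parallelism = parallelism;
        }
    }

    /** Stand-in for DataStream. */
    static class Stream {
        final Env env;
        final Node transformation;

        Stream(Env env, Node transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        Stream transform(String operatorName) {
            Node resultTransform = new Node(operatorName, transformation, env.parallelism);
            Stream returnStream = new Stream(env, resultTransform); // plays the SingleOutputStreamOperator role
            env.addOperator(resultTransform);
            return returnStream;
        }
    }

    /** Stand-in for KeyedStream: same transform, plus key selector injection. */
    static final class Keyed extends Stream {
        final Function<Object, Object> keySelector;

        Keyed(Env env, Node transformation, Function<Object, Object> keySelector) {
            super(env, transformation);
            this.keySelector = keySelector;
        }

        @Override
        Stream transform(String operatorName) {
            Stream returnStream = super.transform(operatorName);
            returnStream.transformation.stateKeySelector = keySelector; // inject the key selector
            return returnStream;
        }
    }

    public static void main(String[] args) {
        Env env = new Env(4);
        Stream source = new Stream(env, new Node("Source", null, 1));
        Keyed keyed = new Keyed(env, source.transformation, t -> ((long[]) t)[0]);
        Stream windowed = keyed.transform("Window");
        System.out.println(env.transformations.size());                        // 1
        System.out.println(windowed.transformation.input.name);                // Source
        System.out.println(windowed.transformation.stateKeySelector != null); // true
    }
}
```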

     

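    In the example above, keyBy(0) produces a KeyedStream.

    KeyedStream
    What matters most for a KeyedStream are the keySelector and the keyType: how the key is extracted from each element, and what type the key has.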
    /**
     * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
     * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
     * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
     * partitioning methods such as shuffle, forward and keyBy.
     *
     * <p>
     * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
     * that have the same key.
     *
     * @param <T> The type of the elements in the Keyed Stream.
     * @param <KEY> The type of the key in the Keyed Stream.
     */
    public class KeyedStream<T, KEY> extends DataStream<T> {
    
        /** The key selector that can get the key by which the stream is partitioned from the elements */
        private final KeySelector<T, KEY> keySelector;
    
        /** The type of the key by which the stream is partitioned */
        private final TypeInformation<KEY> keyType;
    }
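As a rough illustration of what the keySelector provides, the sketch below extracts field 0 of a pair (the equivalent of keyBy(0) on a Tuple2) and routes each record to a channel. `Pair`, `KEY_SELECTOR`, `channelFor` and `route` are hypothetical; the routing is a plain hashCode modulo the parallelism, a simplification that is not necessarily Flink's exact hashing scheme:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Toy illustration with hypothetical names; not Flink's partitioner.
public class KeyByRoutingSketch {

    /** Stand-in for Tuple2<Long, Long>. */
    static final class Pair {
        final long f0;
        final long f1;

        Pair(long f0, long f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    /** Same idea as keyBy(0): the key is the first field. */
    static final Function<Pair, Long> KEY_SELECTOR = p -> p.f0;

    /** Simplified routing: hashCode modulo parallelism. */
    static int channelFor(Long key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Groups records by the channel their key is routed to. */
    static Map<Integer, List<Pair>> route(List<Pair> records, int parallelism) {
        Map<Integer, List<Pair>> byChannel = new TreeMap<>();
        for (Pair p : records) {
            int channel = channelFor(KEY_SELECTOR.apply(p), parallelism);
            byChannel.computeIfAbsent(channel, c -> new ArrayList<>()).add(p);
        }
        return byChannel;
    }

    public static void main(String[] args) {
        List<Pair> records = List.of(new Pair(1, 10), new Pair(2, 20), new Pair(1, 30), new Pair(5, 40));
        System.out.println(route(records, 2)); // {0=[(2,20)], 1=[(1,10), (1,30), (5,40)]}
    }
}
```

Whatever the exact hash function, the property that matters is that equal keys always land on the same channel, so per-key state (such as a window's running sum) lives in exactly one parallel instance.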
     
    Now look at transform: it calls DataStream.transform and then sets the keySelector and keyType on the resulting transformation:
    // ------------------------------------------------------------------------
    //  basic transformations
    // ------------------------------------------------------------------------
    
    @Override
    public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
            TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    
        SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
    
        // inject the key selector and key type
        OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
        transform.setStateKeySelector(keySelector);
        transform.setStateKeyType(keyType);
        
        return returnStream;
    }

     

    An important role of KeyedStream is to act as the stepping stone to WindowedStream,

    so it provides a set of methods that create windowed streams:

    // ------------------------------------------------------------------------
    //  Windowing
    // ------------------------------------------------------------------------
    
    /**
     * Windows this {@code KeyedStream} into tumbling time windows.
     *
     * <p>
     * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
     * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
     * set using
     * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
     *
     * @param size The size of the window.
     */
    public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
        return window(TumblingTimeWindows.of(size));
    }

     

    WindowedStream

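    In the example, the call
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
    passes both a window size and a slide, so it goes through the two-argument timeWindow overload and produces sliding windows 2500 ms long that advance every 500 ms; the single-argument version shown above produces tumbling windows.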
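The windows created by that call overlap. Below is a plain-Java sketch (class and method names are hypothetical, no Flink dependency) of how a sliding assigner can compute the windows an element belongs to, assuming windows aligned to timestamp 0 with no offset:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding time-window assignment, e.g. size = 2500 ms, slide = 500 ms.
// Each window is [start, end); an element belongs to size / slide = 5 windows.
public class SlidingWindowSketch {

    /** Returns the [start, end) windows that contain the timestamp, latest start first. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide); // latest start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : assignWindows(3200, 2500, 500)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // [3000, 5500)
        // [2500, 5000)
        // [2000, 4500)
        // [1500, 4000)
        // [1000, 3500)
    }
}
```

This also explains the pre-aggregation remark in the reduce Javadoc further down: with overlapping windows, each element contributes to several windows, so state is kept per slide interval rather than once per key.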

     

    /**
     * A {@code WindowedStream} represents a data stream where elements are grouped by
     * key, and for each key, the stream of elements is split into windows based on a
     * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
     * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
     *
     * <p>
     * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
     * different points for each key.
     *
     * <p>
     * If an {@link Evictor} is specified it will be used to evict elements from the window after
     * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
     * When using an evictor window performance will degrade significantly, since
     * pre-aggregation of window results cannot be used.
     *
     * <p>
     * Note that the {@code WindowedStream} is purely an API construct, during runtime
     * the {@code WindowedStream} will be collapsed together with the
     * {@code KeyedStream} and the operation over the window into one single operation.
     * 
     * @param <T> The type of elements in the stream.
     * @param <K> The type of the key by which elements are grouped.
     * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
     */
    public class WindowedStream<T, K, W extends Window> {
    
        /** The keyed data stream that is windowed by this stream */
        private final KeyedStream<T, K> input;
    
        /** The window assigner */
        private final WindowAssigner<? super T, W> windowAssigner;
    
        /** The trigger that is used for window evaluation/emission. */
        private Trigger<? super T, ? super W> trigger;
    
        /** The evictor that is used for evicting elements before window evaluation. */
        private Evictor<? super T, ? super W> evictor;

        // ...
    }

    Note that WindowedStream does not extend DataStream;

    instead, it takes a KeyedStream as its input.

    And, of course, the components every window needs (WindowAssigner, Trigger and Evictor) are all there.

     

    Continuing the example: .reduce(new SummingReducer())

    Here is WindowedStream's reduce operation:

    /**
     * Applies a reduce function to the window. The window function is called for each evaluation
     * of the window for each key individually. The output of the reduce function is interpreted
     * as a regular non-windowed stream.
     * <p>
     * This window will try and pre-aggregate data as much as the window policies permit. For example,
     * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
     * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
     * so a few elements are stored per key (one per slide interval).
     * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
     * aggregation tree.
     * 
     * @param function The reduce function.
     * @return The data stream that is the result of applying the reduce function to the window. 
     */
    public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
        //clean the closure
        function = input.getExecutionEnvironment().clean(function);
    
        String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
        KeySelector<T, K> keySel = input.getKeySelector();
    
        OneInputStreamOperator<T, T> operator;
    
        boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
    
        if (evictor != null) {
            operator = new EvictingWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    new HeapWindowBuffer.Factory<T>(),
                    new ReduceWindowFunction<K, W, T>(function),
                    trigger,
                    evictor).enableSetProcessingTime(setProcessingTime);
    
        } else {
            operator = new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    new PreAggregatingHeapWindowBuffer.Factory<>(function), // pre-aggregating: stores only the aggregated value instead of caching every element, which saves space
                    new ReduceWindowFunction<K, W, T>(function),
                    trigger).enableSetProcessingTime(setProcessingTime);
        }
    
        return input.transform(opName, input.getType(), operator);
    }

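    The key step is choosing which window operator to create: an EvictingWindowOperator with a HeapWindowBuffer if an evictor is set, otherwise a WindowOperator with a pre-aggregating buffer.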
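The practical difference between the two branches is how much state each window keeps. The toy classes below are hypothetical and only mirror the idea behind PreAggregatingHeapWindowBuffer and the evicting path; the count-based "keep the last N elements" evictor is an assumption chosen for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Toy contrast with hypothetical classes: a pre-aggregating buffer keeps one running
// value, while an evicting path must keep every element so some can be dropped first.
public class WindowBufferSketch {

    /** Pre-aggregating idea: O(1) state per key and window. */
    static final class PreAggregatingBuffer<T> {
        private final BinaryOperator<T> reducer;
        private T value; // null until the first element arrives

        PreAggregatingBuffer(BinaryOperator<T> reducer) { this.reducer = reducer; }

        void add(T element) { value = (value == null) ? element : reducer.apply(value, element); }

        T result() { return value; }

        int storedElements() { return value == null ? 0 : 1; }
    }

    /** Evicting idea: keep all elements, evict, then reduce at evaluation time. */
    static final class EvictingBuffer<T> {
        private final BinaryOperator<T> reducer;
        private final int keepLast; // hypothetical count evictor: keep only the last N
        private final List<T> elements = new ArrayList<>();

        EvictingBuffer(BinaryOperator<T> reducer, int keepLast) {
            this.reducer = reducer;
            this.keepLast = keepLast;
        }

        void add(T element) { elements.add(element); }

        T result() {
            List<T> kept = elements.subList(Math.max(0, elements.size() - keepLast), elements.size());
            return kept.stream().reduce(reducer).orElse(null);
        }

        int storedElements() { return elements.size(); }
    }

    public static void main(String[] args) {
        PreAggregatingBuffer<Long> pre = new PreAggregatingBuffer<>(Long::sum);
        EvictingBuffer<Long> evicting = new EvictingBuffer<>(Long::sum, 2);
        for (long v = 1; v <= 5; v++) {
            pre.add(v);
            evicting.add(v);
        }
        System.out.println(pre.result() + " stored=" + pre.storedElements());           // 15 stored=1
        System.out.println(evicting.result() + " stored=" + evicting.storedElements()); // 9 stored=5
    }
}
```

With five elements the pre-aggregating buffer holds one value while the evicting buffer holds all five, which matches the WindowedStream Javadoc's warning that evictors degrade window performance.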

    Then input.transform is called, turning the WindowedStream into a SingleOutputStreamOperator.

    Here input is the KeyedStream, whose transform is:

    // ------------------------------------------------------------------------
    //  basic transformations
    // ------------------------------------------------------------------------
    
    @Override
    public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
            TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    
        SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
    
        // inject the key selector and key type
        OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
        transform.setStateKeySelector(keySelector);
        transform.setStateKeyType(keyType);
        
        return returnStream;
    }

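    The parameter here is a OneInputStreamOperator, an interface that WindowOperator implements.

    On top of the general StreamOperator contract (usually inherited from AbstractStreamOperator), a OneInputStreamOperator only adds two methods, processElement and processWatermark, which focus on how each input element is handled: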

    /**
     * Interface for stream operators with one input. Use
     * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
     * you want to implement a custom operator.
     * 
     * @param <IN> The input type of the operator
     * @param <OUT> The output type of the operator
     */
    public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
    
        /**
         * Processes one element that arrived at this operator.
         * This method is guaranteed to not be called concurrently with other methods of the operator.
         */
        void processElement(StreamRecord<IN> element) throws Exception;
    
        /**
         * Processes a {@link Watermark}.
         * This method is guaranteed to not be called concurrently with other methods of the operator.
         *
         * @see org.apache.flink.streaming.api.watermark.Watermark
         */
        void processWatermark(Watermark mark) throws Exception;
    }

    KeyedStream.transform in turn calls super.transform, i.e. DataStream.transform, shown earlier.
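To see what the two callbacks are for, here is a toy event-time operator with the same shape as OneInputStreamOperator but simplified, hypothetical types (a key/value entry plus a timestamp instead of StreamRecord, a long instead of Watermark). processElement only updates per-window state; results are emitted from processWatermark. The firing rule, emitting a window once the watermark reaches its last timestamp (end - 1), is this sketch's own choice:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy operator with hypothetical, simplified types: same two callbacks as
// OneInputStreamOperator, but not Flink's interface.
public class WatermarkOperatorSketch {

    interface OneInputOperator<IN> {
        void processElement(IN value, long timestamp);

        void processWatermark(long watermark);
    }

    /** Tumbling event-time sum per String key; fired results are appended to a list. */
    static final class TumblingSumOperator implements OneInputOperator<Map.Entry<String, Long>> {
        private final long size;
        private final List<String> output;
        // window start -> (key -> running sum); TreeMap keeps windows ordered by start
        private final TreeMap<Long, TreeMap<String, Long>> state = new TreeMap<>();

        TumblingSumOperator(long size, List<String> output) {
            this.size = size;
            this.output = output;
        }

        @Override
        public void processElement(Map.Entry<String, Long> value, long timestamp) {
            long start = timestamp - Math.floorMod(timestamp, size); // window [start, start + size)
            state.computeIfAbsent(start, s -> new TreeMap<>())
                 .merge(value.getKey(), value.getValue(), Long::sum);
        }

        @Override
        public void processWatermark(long watermark) {
            // Emit and drop every window whose last timestamp (end - 1) is <= the watermark.
            while (!state.isEmpty() && state.firstKey() + size - 1 <= watermark) {
                Map.Entry<Long, TreeMap<String, Long>> fired = state.pollFirstEntry();
                long start = fired.getKey();
                fired.getValue().forEach((key, sum) ->
                        output.add("[" + start + "," + (start + size) + ") " + key + "=" + sum));
            }
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        TumblingSumOperator op = new TumblingSumOperator(1000, out);
        op.processElement(Map.entry("a", 1L), 100);
        op.processElement(Map.entry("a", 2L), 900);
        op.processElement(Map.entry("b", 5L), 1500);
        op.processWatermark(998);  // nothing fires: [0,1000) may still receive elements
        op.processWatermark(999);  // fires [0,1000)
        System.out.println(out);   // [[0,1000) a=3]
    }
}
```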

     

    The example ends with

    .addSink(new SinkFunction<Tuple2<Long, Long>>() {...});

    which actually calls

    SingleOutputStreamOperator.addSink, i.e. the addSink inherited from DataStream:

    /**
     * Adds the given sink to this DataStream. Only streams with sinks added
     * will be executed once the {@link StreamExecutionEnvironment#execute()}
     * method is called.
     *
     * @param sinkFunction
     *            The object containing the sink's invoke function.
     * @return The closed DataStream.
     */
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
    
        StreamSink<T> sinkOperator = new StreamSink<T>(clean(sinkFunction));
    
        DataStreamSink<T> sink = new DataStreamSink<T>(this, sinkOperator);
    
        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }

     

    The SinkFunction interface:

    public interface SinkFunction<IN> extends Function, Serializable {
    
        /**
         * Function for standard sink behaviour. This function is called for every record.
         *
         * @param value The input record.
         * @throws Exception
         */
        void invoke(IN value) throws Exception;
    }

     

    StreamSink is a OneInputStreamOperator, so what matters is its processElement method:

    public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
            implements OneInputStreamOperator<IN, Object> {
    
        public StreamSink(SinkFunction<IN> sinkFunction) {
            super(sinkFunction);
            chainingStrategy = ChainingStrategy.ALWAYS;
        }
    
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            userFunction.invoke(element.getValue());
        }
    
        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // ignore it for now, we are a sink, after all
        }
    }
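StreamSink is essentially an adapter from the operator interface to the user's SinkFunction. A stripped-down version with hypothetical names (`SinkFn`, `Record`, `SinkOperator`) and no Flink dependency:

```java
import java.util.ArrayList;
import java.util.List;

// Toy version with hypothetical names of the adapter pattern in StreamSink: the
// operator receives wrapped records and forwards only the payload to the user
// function; watermarks are ignored because nothing is downstream of a sink.
public class SinkAdapterSketch {

    /** Stand-in for SinkFunction<IN>. */
    interface SinkFn<IN> {
        void invoke(IN value) throws Exception;
    }

    /** Stand-in for StreamRecord<T>: a value plus its timestamp. */
    static final class Record<T> {
        final T value;
        final long timestamp;

        Record(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Stand-in for StreamSink<IN>. */
    static final class SinkOperator<IN> {
        private final SinkFn<IN> userFunction;

        SinkOperator(SinkFn<IN> userFunction) { this.userFunction = userFunction; }

        void processElement(Record<IN> element) throws Exception {
            userFunction.invoke(element.value); // the timestamp is dropped, only the value reaches the user
        }

        void processWatermark(long watermark) {
            // nothing to do: a sink has no downstream operator to forward the watermark to
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> written = new ArrayList<>();
        SinkOperator<String> sink = new SinkOperator<>(written::add);
        sink.processElement(new Record<>("x", 1L));
        sink.processWatermark(10L);
        sink.processElement(new Record<>("y", 2L));
        System.out.println(written); // [x, y]
    }
}
```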

     

    DataStreamSink is just a wrapper around a SinkTransformation:

    /**
     * A Stream Sink. This is used for emitting elements from a streaming topology.
     *
     * @param <T> The type of the elements in the Stream
     */
    public class DataStreamSink<T> {
    
        SinkTransformation<T> transformation;
    
        @SuppressWarnings("unchecked")
        protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
            this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
        }
    }

     

    Finally,

    the SinkTransformation is added to the environment's List<StreamTransformation<?>> transformations through addOperator.

     

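    From there, everything comes together in env.execute(), where the collected transformations are translated into a StreamGraph by StreamGraphGenerator, as the StreamTransformation Javadoc describes.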
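A rough sketch of that translation step, with hypothetical names and heavily simplified compared with StreamGraphGenerator: each registered transformation is translated after its input, already-visited nodes are memoized, and a purely logical node such as a rebalance gets no vertex of its own but becomes a label on the edge, in line with the Javadoc's remark that partitioning information ends up encoded in the edges:

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Toy graph generator with hypothetical names; a heavy simplification of
// StreamGraphGenerator, written only to show the recursion and memoization.
public class GraphGeneratorSketch {

    /** Stand-in for a StreamTransformation with at most one input. */
    static final class Node {
        final String name;
        final Node input;       // null for sources
        final boolean logical;  // true for partitioning-only nodes such as rebalance

        Node(String name, Node input, boolean logical) {
            this.name = name;
            this.input = input;
            this.logical = logical;
        }
    }

    final List<String> vertices = new ArrayList<>();
    final List<String> edges = new ArrayList<>();
    private final Map<Node, String> alreadyTransformed = new IdentityHashMap<>();

    /** Returns the physical vertex that produces t's output, translating inputs first. */
    String transform(Node t) {
        String done = alreadyTransformed.get(t);
        if (done != null) {
            return done; // shared inputs are translated only once
        }
        String vertex;
        if (t.logical) {
            vertex = transform(t.input); // no vertex of its own: reuse the upstream one
        } else {
            String upstream = (t.input == null) ? null : transform(t.input);
            vertex = t.name;
            vertices.add(vertex);
            if (upstream != null) {
                // a logical input becomes the edge's partitioning label
                String partitioner = t.input.logical ? t.input.name : "forward";
                edges.add(upstream + " -[" + partitioner + "]-> " + vertex);
            }
        }
        alreadyTransformed.put(t, vertex);
        return vertex;
    }

    public static void main(String[] args) {
        Node source = new Node("Source", null, false);
        Node rebalance = new Node("rebalance", source, true);
        Node map = new Node("Map", rebalance, false);
        Node sink = new Node("Sink", map, false);

        GraphGeneratorSketch generator = new GraphGeneratorSketch();
        for (Node registered : List.of(map, sink)) { // like the env's transformations list
            generator.transform(registered);
        }
        System.out.println(generator.vertices); // [Source, Map, Sink]
        System.out.println(generator.edges);    // [Source -[rebalance]-> Map, Map -[forward]-> Sink]
    }
}
```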

  • Original article: https://www.cnblogs.com/fxjwind/p/5706295.html