  • Flink

    Usage:

    dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new CoGroupFunction () {...});

     

    As you can see, coGroup itself only creates a CoGroupedStreams object:

        public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
            return new CoGroupedStreams<>(this, otherStream);
        }

     

    where and equalTo only register the key selectors. One has to be specified for each of the two streams: keySelector1 and keySelector2.

     

    window sets the window assigner shared by both streams, which is easy to understand.

     

    apply:

            /**
             * Completes the co-group operation with the user function that is executed
             * for windowed groups.
             *
             * <p>Note: This method's return type does not support setting an operator-specific parallelism.
             * Due to binary backwards compatibility, this cannot be altered. Use the
             * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific parallelism.
             */
            public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
                //clean the closure
                function = input1.getExecutionEnvironment().clean(function);
    
                UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
                UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
    
                // wrap input1 in a TaggedUnion; this simply assigns the element to the "one" field
                DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
                        .map(new Input1Tagger<T1, T2>())
                        .setParallelism(input1.getParallelism())
                        .returns(unionType);
                // wrap input2 in a TaggedUnion
                DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
                        .map(new Input2Tagger<T1, T2>())
                        .setParallelism(input2.getParallelism())
                        .returns(unionType);
    
                // both streams now have the TaggedUnion type, so they can be unioned into one stream, which simplifies the problem
                DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
    
                // we explicitly create the keyed stream to manually pass the key type information in
                // create the window
                WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp =
                        new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                        .window(windowAssigner);
    
                if (trigger != null) { // apply the trigger and evictor if they were set
                    windowOp.trigger(trigger);
                }
                if (evictor != null) {
                    windowOp.evictor(evictor);
                }
    
                // delegate to WindowedStream.apply
                return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
            }

    The key idea: it turns the two streams into a single stream, which makes the problem much simpler.
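
    The tag-and-union idea can be sketched in plain Java without any Flink dependency. This is only an illustration under stated assumptions: Tagged and unionAndKey are hypothetical names standing in for Flink's TaggedUnion and for the union followed by keying with a union key selector. They are not Flink APIs, and windows are left out entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class TagAndUnionSketch {

    // Hypothetical stand-in for TaggedUnion: exactly one of the two fields is set.
    static final class Tagged<A, B> {
        final A one;
        final B two;
        final boolean isOne;

        private Tagged(A one, B two, boolean isOne) {
            this.one = one;
            this.two = two;
            this.isOne = isOne;
        }

        static <A, B> Tagged<A, B> one(A a) {
            return new Tagged<>(a, null, true);
        }

        static <A, B> Tagged<A, B> two(B b) {
            return new Tagged<>(null, b, false);
        }
    }

    // Tags both inputs, unions them into one list, then groups by a key that is
    // computed with key1 for "one" elements and key2 for "two" elements.
    static <A, B, K> Map<K, List<Tagged<A, B>>> unionAndKey(
            List<A> in1, List<B> in2, Function<A, K> key1, Function<B, K> key2) {
        List<Tagged<A, B>> union = new ArrayList<>();
        for (A a : in1) {
            union.add(Tagged.<A, B>one(a));
        }
        for (B b : in2) {
            union.add(Tagged.<A, B>two(b));
        }
        Map<K, List<Tagged<A, B>>> byKey = new LinkedHashMap<>();
        for (Tagged<A, B> t : union) {
            K k = t.isOne ? key1.apply(t.one) : key2.apply(t.two);
            byKey.computeIfAbsent(k, x -> new ArrayList<>()).add(t);
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Strings are keyed by their length, integers by their own value.
        Map<Integer, List<Tagged<String, Integer>>> groups = unionAndKey(
                Arrays.asList("a", "bb", "cc"), Arrays.asList(1, 2, 2),
                String::length, n -> n);
        for (Map.Entry<Integer, List<Tagged<String, Integer>>> e : groups.entrySet()) {
            System.out.println("key " + e.getKey() + " has " + e.getValue().size() + " tagged elements");
        }
    }
}
```

    Once both inputs share one element type and one key function, everything downstream only has to deal with a single keyed stream.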

    In the end this calls WindowedStream.apply. Unlike reduce, apply has to retain all of the raw elements in the window.
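
    To illustrate why that matters, here is a rough plain-Java sketch of the two kinds of per-window state. ReducingState and BufferingState are hypothetical names used only for this illustration, not Flink's actual state classes: a reduce-style window folds each element into one running value, while an apply-style window must buffer every element until it fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

class WindowStateSketch {

    // Reduce-style: only one running value is kept per key and window.
    static final class ReducingState<T> {
        private final BinaryOperator<T> reducer;
        private T current;

        ReducingState(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(T value) {
            current = (current == null) ? value : reducer.apply(current, value);
        }

        T get() {
            return current;
        }

        int storedElements() {
            return current == null ? 0 : 1;
        }
    }

    // Apply-style: every raw element is kept so the function can see all of them.
    static final class BufferingState<T> {
        private final List<T> elements = new ArrayList<>();

        void add(T value) {
            elements.add(value);
        }

        List<T> get() {
            return elements;
        }

        int storedElements() {
            return elements.size();
        }
    }

    public static void main(String[] args) {
        ReducingState<Integer> reducing = new ReducingState<>(Integer::sum);
        BufferingState<Integer> buffering = new BufferingState<>();
        for (int i = 1; i <= 3; i++) {
            reducing.add(i);
            buffering.add(i);
        }
        System.out.println("reduce keeps " + reducing.storedElements() + " value(s)");
        System.out.println("apply keeps " + buffering.storedElements() + " value(s)");
    }
}
```

    A co-group needs the buffering variant, because the user function receives the complete contents of both sides for a key.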

    The logic passed to apply is CoGroupWindowFunction:

     

    private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
                extends WrappingFunction<CoGroupFunction<T1, T2, T>>
                implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
    
            private static final long serialVersionUID = 1L;
    
            public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
                super(userFunction);
            }
    
            @Override
            public void apply(KEY key,
                    W window,
                    Iterable<TaggedUnion<T1, T2>> values,
                    Collector<T> out) throws Exception {
    
                List<T1> oneValues = new ArrayList<>();
                List<T2> twoValues = new ArrayList<>();
    
                for (TaggedUnion<T1, T2> val: values) {
                    if (val.isOne()) {
                        oneValues.add(val.getOne());
                    } else {
                        twoValues.add(val.getTwo());
                    }
                }
                wrappedFunction.coGroup(oneValues, twoValues, out);
            }
        }
    }

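    The logic here is also very simple: the values in the window for the given key are split into two lists, oneValues and twoValues.

    Finally, the user-defined wrappedFunction.coGroup is called with those two lists.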

     

    DataStream.join is implemented on top of coGroup:

                return input1.coGroup(input2)
                        .where(keySelector1)
                        .equalTo(keySelector2)
                        .window(windowAssigner)
                        .trigger(trigger)
                        .evictor(evictor)
                        .apply(new FlatJoinCoGroupFunction<>(function), resultType);

     

    FlatJoinCoGroupFunction

    private static class FlatJoinCoGroupFunction<T1, T2, T>
                extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
                implements CoGroupFunction<T1, T2, T> {
            private static final long serialVersionUID = 1L;
    
            public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
                super(wrappedFunction);
            }
    
            @Override
            public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
                for (T1 val1: first) {
                    for (T2 val2: second) {
                        wrappedFunction.join(val1, val2, out);
                    }
                }
            }
        }

    This shows that the current join is an inner join: the user's join function is only called when both first and second contain elements.
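
    The difference is easy to see in a small plain-Java sketch. innerJoin below mirrors the nested-loop shape of FlatJoinCoGroupFunction. leftOuterJoin is a hypothetical variant, not part of Flink, showing how a co-group style function could still emit left-side elements when the right side is empty. The class name, method names and the "a=b" output format are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class JoinSemanticsSketch {

    // Same nested-loop shape as FlatJoinCoGroupFunction:
    // nothing is emitted when either side is empty.
    static List<String> innerJoin(List<String> first, List<Integer> second) {
        List<String> out = new ArrayList<>();
        for (String a : first) {
            for (Integer b : second) {
                out.add(a + "=" + b);
            }
        }
        return out;
    }

    // Hypothetical left outer join: an empty right side yields "a=null"
    // for each left element instead of dropping it.
    static List<String> leftOuterJoin(List<String> first, List<Integer> second) {
        List<String> out = new ArrayList<>();
        for (String a : first) {
            if (second.isEmpty()) {
                out.add(a + "=null");
                continue;
            }
            for (Integer b : second) {
                out.add(a + "=" + b);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("x", "y");
        List<Integer> noMatch = Collections.<Integer>emptyList();
        System.out.println("inner:      " + innerJoin(left, noMatch));
        System.out.println("left outer: " + leftOuterJoin(left, noMatch));
    }
}
```

    Because coGroup hands the user function both sides even when one of them is empty, outer-join behavior of this kind has to be written against coGroup rather than join.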

  • Original article: https://www.cnblogs.com/fxjwind/p/7216981.html