zoukankan      html  css  js  c++  java
  • Flink -- Java Generics Programming

    Flink uses a lot of generics programming, which is an executor Framework with cluster of executor having a lot of thread for task by RPC communication(Actor System).

    The data and the process of data are defined by user.

    Event-Driven == Callback function registry : ListenableFuture,   SettableFuture, CompletableFuture 

    public SplitStream<T> split(OutputSelector<T> outputSelector) {
    		return new SplitStream<>(this, clean(outputSelector));
    	}
    
    public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
    		return new ConnectedStreams<>(environment, this, dataStream);
    	}
    
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
    		Preconditions.checkNotNull(key);
    		return new KeyedStream<>(this, clean(key));
    	}
    
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
    		Preconditions.checkNotNull(key);
    		Preconditions.checkNotNull(keyType);
    		return new KeyedStream<>(this, clean(key), keyType);
    	}
    
    public KeyedStream<T, Tuple> keyBy(String... fields) {
    		return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
    	}
    
    	private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
    		return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
    				getType(), getExecutionConfig())));
    	}
    
    public DataStream<T> shuffle() {
    		return setConnectionType(new ShufflePartitioner<T>());
    	}
    
    public DataStream<T> forward() {
    		return setConnectionType(new ForwardPartitioner<T>());
    	}
    
    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    
    		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
    				Utils.getCallLocationName(), true);
    
    		return transform("Map", outType, new StreamMap<>(clean(mapper)));
    	}
    
    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    
    		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
    				getType(), Utils.getCallLocationName(), true);
    
    		return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
    
    	}
    public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter) {
    		return transform("Filter", getType(), new StreamFilter<>(clean(filter)));
    
    	}
    

      

    Same as Template Programming in C++. Refer to  Boost 

    class image;
    
    class animation
    {
    public:
        void advance(int ms);
        bool inactive() const;
        void render(image & target) const;
    };
    
    std::vector<animation> anims;
    
    template<class C, class P> void erase_if(C & c, P pred)
    {
        c.erase(std::remove_if(c.begin(), c.end(), pred), c.end());
    }
    
    void update(int ms)
    {
        std::for_each(anims.begin(), anims.end(), boost::bind(&animation::advance, _1, ms));
        erase_if(anims, boost::mem_fn(&animation::inactive));
    }
    
    void render(image & target)
    {
        std::for_each(anims.begin(), anims.end(), boost::bind(&animation::render, _1, boost::ref(target)));
    }
    
    template<class R>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) () BOOST_BIND_NOEXCEPT, _bi::list0>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) () BOOST_BIND_NOEXCEPT)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) () BOOST_BIND_NOEXCEPT;
        typedef _bi::list0 list_type;
        return _bi::bind_t<R, F, list_type> (f, list_type());
    }
    
    template<class R, class B1, class A1>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1) BOOST_BIND_NOEXCEPT, typename _bi::list_av_1<A1>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1) BOOST_BIND_NOEXCEPT, A1 a1)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_1<A1>::type list_type;
        return _bi::bind_t<R, F, list_type> (f, list_type(a1));
    }
    
    template<class R, class B1, class B2, class A1, class A2>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2) BOOST_BIND_NOEXCEPT, typename _bi::list_av_2<A1, A2>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_2<A1, A2>::type list_type;
        return _bi::bind_t<R, F, list_type> (f, list_type(a1, a2));
    }
    
    template<class R,
        class B1, class B2, class B3,
        class A1, class A2, class A3>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3) BOOST_BIND_NOEXCEPT, typename _bi::list_av_3<A1, A2, A3>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_3<A1, A2, A3>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4,
        class A1, class A2, class A3, class A4>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4) BOOST_BIND_NOEXCEPT, typename _bi::list_av_4<A1, A2, A3, A4>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_4<A1, A2, A3, A4>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4, class B5,
        class A1, class A2, class A3, class A4, class A5>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4, B5) BOOST_BIND_NOEXCEPT, typename _bi::list_av_5<A1, A2, A3, A4, A5>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4, B5) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4, B5) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_5<A1, A2, A3, A4, A5>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4, a5));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4, class B5, class B6,
        class A1, class A2, class A3, class A4, class A5, class A6>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4, B5, B6) BOOST_BIND_NOEXCEPT, typename _bi::list_av_6<A1, A2, A3, A4, A5, A6>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4, B5, B6) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5, A6 a6)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4, B5, B6) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_6<A1, A2, A3, A4, A5, A6>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4, a5, a6));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4, class B5, class B6, class B7,
        class A1, class A2, class A3, class A4, class A5, class A6, class A7>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4, B5, B6, B7) BOOST_BIND_NOEXCEPT, typename _bi::list_av_7<A1, A2, A3, A4, A5, A6, A7>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4, B5, B6, B7) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5, A6 a6, A7 a7)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4, B5, B6, B7) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_7<A1, A2, A3, A4, A5, A6, A7>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4, a5, a6, a7));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4, class B5, class B6, class B7, class B8,
        class A1, class A2, class A3, class A4, class A5, class A6, class A7, class A8>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4, B5, B6, B7, B8) BOOST_BIND_NOEXCEPT, typename _bi::list_av_8<A1, A2, A3, A4, A5, A6, A7, A8>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4, B5, B6, B7, B8) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5, A6 a6, A7 a7, A8 a8)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4, B5, B6, B7, B8) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_8<A1, A2, A3, A4, A5, A6, A7, A8>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4, a5, a6, a7, a8));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4, class B5, class B6, class B7, class B8, class B9,
        class A1, class A2, class A3, class A4, class A5, class A6, class A7, class A8, class A9>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4, B5, B6, B7, B8, B9) BOOST_BIND_NOEXCEPT, typename _bi::list_av_9<A1, A2, A3, A4, A5, A6, A7, A8, A9>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4, B5, B6, B7, B8, B9) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5, A6 a6, A7 a7, A8 a8, A9 a9)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4, B5, B6, B7, B8, B9) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_9<A1, A2, A3, A4, A5, A6, A7, A8, A9>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4, a5, a6, a7, a8, a9));
    }
    

      

      

    https://www.boost.org/

  • 相关阅读:
    关于CSDN指针讨论的心得
    VC++ 6.0 与VS2008 C++ DEBUG工具(Windows)介绍
    VC++ 申明静态变量的注意事项
    大家好,我是新的blue1000~
    [讨论]当我采用动态sql绑定datagrid分页的时候,遇到的问题
    我与Google有个对话
    [BK专访]一切以客户为中心,其它一切纷至沓来
    [译]在.net中使用GDI+来提高gif图片的保存画质
    微软能不能别这样坑爹啊,即使不中毒,也伤不起啊
    长见识!1021字节javascript写成的3D圣诞树
  • 原文地址:https://www.cnblogs.com/iiiDragon/p/9747524.html
Copyright © 2011-2022 走看看