zoukankan      html  css  js  c++  java
  • Flink -- Java Generics Programming

    Flink uses a lot of generics programming, which is an executor Framework with cluster of executor having a lot of thread for task by RPC communication(Actor System).

    The data and the process of data are defined by user.

    Event-Driven == Callback function registry : ListenableFuture,   SettableFuture, CompletableFuture 

    public SplitStream<T> split(OutputSelector<T> outputSelector) {
    		return new SplitStream<>(this, clean(outputSelector));
    	}
    
    public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
    		return new ConnectedStreams<>(environment, this, dataStream);
    	}
    
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
    		Preconditions.checkNotNull(key);
    		return new KeyedStream<>(this, clean(key));
    	}
    
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
    		Preconditions.checkNotNull(key);
    		Preconditions.checkNotNull(keyType);
    		return new KeyedStream<>(this, clean(key), keyType);
    	}
    
    public KeyedStream<T, Tuple> keyBy(String... fields) {
    		return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
    	}
    
    	private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
    		return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
    				getType(), getExecutionConfig())));
    	}
    
    public DataStream<T> shuffle() {
    		return setConnectionType(new ShufflePartitioner<T>());
    	}
    
    public DataStream<T> forward() {
    		return setConnectionType(new ForwardPartitioner<T>());
    	}
    
    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    
    		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
    				Utils.getCallLocationName(), true);
    
    		return transform("Map", outType, new StreamMap<>(clean(mapper)));
    	}
    
    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    
    		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
    				getType(), Utils.getCallLocationName(), true);
    
    		return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
    
    	}
    public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter) {
    		return transform("Filter", getType(), new StreamFilter<>(clean(filter)));
    
    	}
    

      

    Same as Template Programming in C++. Refer to  Boost 

    class image;
    
    class animation
    {
    public:
        void advance(int ms);
        bool inactive() const;
        void render(image & target) const;
    };
    
    std::vector<animation> anims;
    
    template<class C, class P> void erase_if(C & c, P pred)
    {
        c.erase(std::remove_if(c.begin(), c.end(), pred), c.end());
    }
    
    void update(int ms)
    {
        std::for_each(anims.begin(), anims.end(), boost::bind(&animation::advance, _1, ms));
        erase_if(anims, boost::mem_fn(&animation::inactive));
    }
    
    void render(image & target)
    {
        std::for_each(anims.begin(), anims.end(), boost::bind(&animation::render, _1, boost::ref(target)));
    }
    
    template<class R>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) () BOOST_BIND_NOEXCEPT, _bi::list0>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) () BOOST_BIND_NOEXCEPT)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) () BOOST_BIND_NOEXCEPT;
        typedef _bi::list0 list_type;
        return _bi::bind_t<R, F, list_type> (f, list_type());
    }
    
    template<class R, class B1, class A1>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1) BOOST_BIND_NOEXCEPT, typename _bi::list_av_1<A1>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1) BOOST_BIND_NOEXCEPT, A1 a1)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_1<A1>::type list_type;
        return _bi::bind_t<R, F, list_type> (f, list_type(a1));
    }
    
    template<class R, class B1, class B2, class A1, class A2>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2) BOOST_BIND_NOEXCEPT, typename _bi::list_av_2<A1, A2>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_2<A1, A2>::type list_type;
        return _bi::bind_t<R, F, list_type> (f, list_type(a1, a2));
    }
    
    template<class R,
        class B1, class B2, class B3,
        class A1, class A2, class A3>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3) BOOST_BIND_NOEXCEPT, typename _bi::list_av_3<A1, A2, A3>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_3<A1, A2, A3>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4,
        class A1, class A2, class A3, class A4>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4) BOOST_BIND_NOEXCEPT, typename _bi::list_av_4<A1, A2, A3, A4>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_4<A1, A2, A3, A4>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4, class B5,
        class A1, class A2, class A3, class A4, class A5>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4, B5) BOOST_BIND_NOEXCEPT, typename _bi::list_av_5<A1, A2, A3, A4, A5>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4, B5) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4, B5) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_5<A1, A2, A3, A4, A5>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4, a5));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4, class B5, class B6,
        class A1, class A2, class A3, class A4, class A5, class A6>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4, B5, B6) BOOST_BIND_NOEXCEPT, typename _bi::list_av_6<A1, A2, A3, A4, A5, A6>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4, B5, B6) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5, A6 a6)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4, B5, B6) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_6<A1, A2, A3, A4, A5, A6>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4, a5, a6));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4, class B5, class B6, class B7,
        class A1, class A2, class A3, class A4, class A5, class A6, class A7>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4, B5, B6, B7) BOOST_BIND_NOEXCEPT, typename _bi::list_av_7<A1, A2, A3, A4, A5, A6, A7>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4, B5, B6, B7) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5, A6 a6, A7 a7)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4, B5, B6, B7) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_7<A1, A2, A3, A4, A5, A6, A7>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4, a5, a6, a7));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4, class B5, class B6, class B7, class B8,
        class A1, class A2, class A3, class A4, class A5, class A6, class A7, class A8>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4, B5, B6, B7, B8) BOOST_BIND_NOEXCEPT, typename _bi::list_av_8<A1, A2, A3, A4, A5, A6, A7, A8>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4, B5, B6, B7, B8) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5, A6 a6, A7 a7, A8 a8)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4, B5, B6, B7, B8) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_8<A1, A2, A3, A4, A5, A6, A7, A8>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4, a5, a6, a7, a8));
    }
    
    template<class R,
        class B1, class B2, class B3, class B4, class B5, class B6, class B7, class B8, class B9,
        class A1, class A2, class A3, class A4, class A5, class A6, class A7, class A8, class A9>
        _bi::bind_t<R, BOOST_BIND_ST R (BOOST_BIND_CC *) (B1, B2, B3, B4, B5, B6, B7, B8, B9) BOOST_BIND_NOEXCEPT, typename _bi::list_av_9<A1, A2, A3, A4, A5, A6, A7, A8, A9>::type>
        BOOST_BIND(BOOST_BIND_ST R (BOOST_BIND_CC *f) (B1, B2, B3, B4, B5, B6, B7, B8, B9) BOOST_BIND_NOEXCEPT, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5, A6 a6, A7 a7, A8 a8, A9 a9)
    {
        typedef BOOST_BIND_ST R (BOOST_BIND_CC *F) (B1, B2, B3, B4, B5, B6, B7, B8, B9) BOOST_BIND_NOEXCEPT;
        typedef typename _bi::list_av_9<A1, A2, A3, A4, A5, A6, A7, A8, A9>::type list_type;
        return _bi::bind_t<R, F, list_type>(f, list_type(a1, a2, a3, a4, a5, a6, a7, a8, a9));
    }
    

      

      

    https://www.boost.org/

  • 相关阅读:
    【转载】opencvVS2019配置方法
    windows.h头文件中改变光标位置的函数——SetConsoleCursorPosition
    五行代码解决猴子选大王问题
    AtCoder Beginner Contest 192
    ACM做题注意事项
    数据库部分重点
    数据库7-11章期末复习
    数据库4-6章期末复习
    数据库1-3章期末复习
    ICPC Central Russia Regional Contest (CRRC 19)
  • 原文地址:https://www.cnblogs.com/iiiDragon/p/9747524.html
Copyright © 2011-2022 走看看