  • Flink Examples (49): Operators (9): Multi-Stream Transformation Operators (4): join

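    Join is a very common operation. It is similar to the inner join familiar from databases and focuses on pairs of records: it takes the records from two streams or data sets that match a given condition and passes them downstream for further processing or output.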

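    On DataStreams, join (like coGroup) must be combined with a window. The interval join described below is the exception: it works on two keyed streams without a window.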

    1. Operating on DataStream

    Corresponding to the three time-window types (tumbling, sliding and session), there are three window joins; in addition, there is the interval join:

    1. Tumbling Window Join
    2. Sliding Window Join
    3. Session Window Join
    4. Interval Join

    The programming model for window joins is as follows:

    stream.join(otherStream)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)

    Tumbling Window Join
    [Figure: Tumbling Window Join]
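    As the figure above shows, the join operates on the data of each window: elements of the two streams that have the same key and fall into the same window are joined inner-join style, and the resulting pairs are processed by the JoinFunction or FlatJoinFunction passed to apply() and sent downstream.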
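The pairing rule just described can be modeled without Flink. The following is a minimal plain-Java sketch, not Flink code: the `Event` class and the `join` method are hypothetical, timestamps are assumed to be non-negative milliseconds, and windows are assumed to have no offset, so a window starts at `ts - ts % size`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of a tumbling window inner join (hypothetical, not the Flink API). */
public class TumblingJoinSketch {

    /** Hypothetical element: join key, payload and event timestamp in milliseconds. */
    public static final class Event {
        final String key;
        final String value;
        final long ts;

        public Event(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    /** Start of the tumbling window that contains ts (assumes ts >= 0 and no window offset). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Emits "leftValue rightValue" for every pair with the same key in the same window. */
    public static List<String> join(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key.equals(r.key) && windowStart(l.ts, size) == windowStart(r.ts, size)) {
                    out.add(l.value + " " + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(new Event("1", "a", 1_000), new Event("1", "b", 6_000));
        List<Event> right = Arrays.asList(new Event("1", "x", 4_000), new Event("1", "y", 7_000));
        // With 5 s windows, a and x share [0, 5000) while b and y share [5000, 10000)
        System.out.println(join(left, right, 5_000)); // prints [a x, b y]
    }
}
```

Note that `a` and `y` have the same key but are not joined, because they fall into different windows.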

    Sliding Window Join
    [Figure: Sliding Window Join]
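    This is the same as the tumbling window join above except for the window type: again, a JoinFunction or FlatJoinFunction passed to apply() processes the joined pairs and sends them downstream.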
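One consequence of the different window type: with sliding windows an element belongs to several overlapping windows, so the same pair of elements is joined once for every window the two share. The plain-Java sketch below (hypothetical names, not Flink code) enumerates the windows containing a timestamp, assuming zero window offset and non-negative timestamps, and emits a pair once per shared window.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a sliding window join for one pair of same-key elements (hypothetical). */
public class SlidingJoinSketch {

    /** Starts of all windows [start, start + size) that contain ts (assumes ts >= 0, no offset). */
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = ts - (ts % slide); start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Emits "left right @start" once for every window that holds both timestamps. */
    public static List<String> joinPair(String left, long leftTs, String right, long rightTs,
                                        long size, long slide) {
        List<String> out = new ArrayList<>();
        List<Long> rightStarts = windowStarts(rightTs, size, slide);
        for (long start : windowStarts(leftTs, size, slide)) {
            if (rightStarts.contains(start)) {
                out.add(left + " " + right + " @" + start);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 10 s windows sliding every 5 s: 6000 and 8000 share [5000, 15000) and [0, 10000)
        System.out.println(joinPair("a", 6_000, "x", 8_000, 10_000, 5_000)); // prints [a x @5000, a x @0]
        // 2000 and 12000 never fall into the same 10 s window
        System.out.println(joinPair("a", 2_000, "x", 12_000, 10_000, 5_000)); // prints []
    }
}
```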

    Session Window Join
    [Figure: Session Window Join]
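In a session window join, sessions are formed per key from the elements of both streams together, so an element of one stream can extend a session that then captures elements of the other. The plain-Java sketch below models this for the elements of a single key; the `Tagged` class and `join` method are hypothetical, and the rule that a new session starts only when the next timestamp lies beyond the previous timestamp plus the gap is a simplifying assumption about boundary handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Plain-Java model of a session window join for the elements of ONE key (hypothetical). */
public class SessionJoinSketch {

    /** Hypothetical element tagged with its origin: left (true) or right (false) stream. */
    public static final class Tagged {
        final boolean fromLeft;
        final String value;
        final long ts;

        public Tagged(boolean fromLeft, String value, long ts) {
            this.fromLeft = fromLeft;
            this.value = value;
            this.ts = ts;
        }
    }

    /** Splits the merged, time-ordered elements into sessions and joins left x right per session. */
    public static List<String> join(List<Tagged> elements, long gap) {
        List<Tagged> sorted = new ArrayList<>(elements);
        sorted.sort(Comparator.comparingLong((Tagged t) -> t.ts));
        List<String> out = new ArrayList<>();
        List<Tagged> session = new ArrayList<>();
        for (Tagged t : sorted) {
            // Assumption: a new session starts when t lies beyond (previous timestamp + gap)
            if (!session.isEmpty() && t.ts > session.get(session.size() - 1).ts + gap) {
                emit(session, out);
                session.clear();
            }
            session.add(t);
        }
        emit(session, out);
        return out;
    }

    /** Inner join inside one session: every left element paired with every right element. */
    private static void emit(List<Tagged> session, List<String> out) {
        for (Tagged l : session) {
            if (!l.fromLeft) {
                continue;
            }
            for (Tagged r : session) {
                if (!r.fromLeft) {
                    out.add(l.value + " " + r.value);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<Tagged> key1 = Arrays.asList(
                new Tagged(true, "a", 1_000),
                new Tagged(false, "x", 4_000),   // within the 5 s gap of a
                new Tagged(false, "y", 8_000),   // 7 s after a, but within the gap of x
                new Tagged(true, "b", 20_000));  // starts a new session with no right element
        System.out.println(join(key1, 5_000)); // prints [a x, a y]
    }
}
```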
    Here is an example of a Tumbling Window Join:

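    import javax.annotation.Nullable;

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

    public class TumblingWindowJoinDemo {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Left stream: lines such as "1 hello" typed into `nc -lk 9002`
            DataStream<Tuple2<String, String>> input1 = env.socketTextStream("192.168.217.110", 9002)
                    .map(new MapFunction<String, Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> map(String s) throws Exception {
                            String[] parts = s.split(" ");
                            return Tuple2.of(parts[0], parts[1]);
                        }
                    })
                    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, String>>() {
                        private final long maxOutOfOrderness = 2000;
                        private long currentTime;

                        @Nullable
                        @Override
                        public Watermark getCurrentWatermark() {
                            return new Watermark(currentTime - maxOutOfOrderness);
                        }

                        @Override
                        public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) {
                            // Socket records carry no timestamp, so previousElementTimestamp is Long.MIN_VALUE
                            // and currentTime stays 0: every element is stamped 0 and lands in window [0, 5000)
                            currentTime = Math.max(previousElementTimestamp, currentTime);
                            return currentTime;
                        }
                    });

            // Right stream: lines such as "1 hello" typed into `nc -lk 9001`
            DataStream<Tuple2<String, String>> input2 = env.socketTextStream("192.168.217.110", 9001)
                    .map(new MapFunction<String, Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> map(String s) throws Exception {
                            String[] parts = s.split(" ");
                            return Tuple2.of(parts[0], parts[1]);
                        }
                    })
                    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, String>>() {
                        private final long maxOutOfOrderness = 5000;
                        private long currentTime;

                        @Nullable
                        @Override
                        public Watermark getCurrentWatermark() {
                            // Derived from event time, like input1, rather than from the wall clock
                            return new Watermark(currentTime - maxOutOfOrderness);
                        }

                        @Override
                        public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) {
                            currentTime = Math.max(previousElementTimestamp, currentTime);
                            return currentTime;
                        }
                    });

            input1.join(input2)
                    // Join key of the left stream: the first field
                    .where(new KeySelector<Tuple2<String, String>, String>() {
                        @Override
                        public String getKey(Tuple2<String, String> t) throws Exception {
                            return t.f0;
                        }
                    })
                    // Join key of the right stream: the first field
                    .equalTo(new KeySelector<Tuple2<String, String>, String>() {
                        @Override
                        public String getKey(Tuple2<String, String> t) throws Exception {
                            return t.f0;
                        }
                    })
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    // The watermark never passes the window end, so results appear only because
                    // CountTrigger.of(1) fires the window on every incoming element
                    .trigger(CountTrigger.of(1))
                    .apply(new JoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                        @Override
                        public String join(Tuple2<String, String> left, Tuple2<String, String> right) throws Exception {
                            // Called only for pairs whose keys already match, so no key check is needed
                            return left.f1 + " " + right.f1;
                        }
                    })
                    .print();

            env.execute();
        }
    }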

    Open two terminals and use nc to listen on the two ports (port 9002 feeds input1, port 9001 feeds input2):

    [shinelon@hadoop-senior Desktop]$ nc -lk 9001
    1 hello
    2 world
    3 shinelon
    5 lllll
    4 sssss

    [shinelon@hadoop-senior Desktop]$ nc -lk 9002
    1 hello
    2 limig
    3 nihao
    4 oooo

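    The output is as follows. Key 5 has no partner on port 9002 and therefore produces nothing; the `n>` prefix printed by print() is the index of the parallel subtask that emitted the record: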

    4> hello hello
    2> limig world
    3> nihao shinelon
    1> oooo sssss
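Each key appears only once per stream in this input, so every pair is printed exactly once. With `CountTrigger.of(1)`, however, the window is fired (not purged) on every element, and each firing evaluates the join over everything the window holds so far, so a later element with an already-joined key would re-emit the earlier pairs. The plain-Java model below (hypothetical, one key in one window, not Flink code) illustrates this; arrivals are written as `L:value` or `R:value`.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of one key's window under a fire-on-every-element trigger (hypothetical). */
public class CountTriggerSketch {

    /** Each arrival is "L:value" or "R:value"; after every arrival the whole window is joined. */
    public static List<String> run(String... arrivals) {
        List<String> left = new ArrayList<>();
        List<String> right = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (String arrival : arrivals) {
            String[] parts = arrival.split(":", 2);
            if (parts[0].equals("L")) {
                left.add(parts[1]);
            } else {
                right.add(parts[1]);
            }
            // FIRE without PURGE: the window keeps its contents, so all pairs are emitted again
            for (String l : left) {
                for (String r : right) {
                    out.add(l + " " + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("L:hello", "R:hello"));         // prints [hello hello]
        System.out.println(run("L:hello", "R:hello", "L:hi")); // prints [hello hello, hello hello, hi hello]
    }
}
```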

    Interval Join
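    Interval Join joins elements of two streams that have the same key and whose timestamps lie within a time range relative to each other: for each element of one stream, it matches the elements of the other stream that fall into an interval around that element's timestamp. It is typically used to combine related records that occur within a certain time span into one wide record (a wide table). The join condition can be written as: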

    key1 == key2 && e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

    In code, an interval join typically looks like this:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    ...

    DataStream<Integer> orangeStream = ...
    DataStream<Integer> greenStream = ...

    // Both streams need event-time timestamps and watermarks: interval joins support only event time
    orangeStream
        .keyBy(<KeySelector>)
        .intervalJoin(greenStream.keyBy(<KeySelector>))
        // Match when orange.ts - 2 ms <= green.ts <= orange.ts + 1 ms (bounds are inclusive by default)
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process(new ProcessJoinFunction<Integer, Integer, String>() {

            @Override
            public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
                out.collect(left + "," + right);
            }
        });
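The condition `key1 == key2 && e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound` can be checked by brute force over two small lists. The plain-Java sketch below (hypothetical `Event` class, not Flink code) uses the bounds from the example above, -2 ms and +1 ms, with both bounds inclusive as in Flink's default. It ignores watermarks and the buffering and cleanup of state that a streaming implementation needs for unbounded input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Brute-force model of interval join semantics (hypothetical, not the Flink API). */
public class IntervalJoinSketch {

    /** Hypothetical element: key and event timestamp in milliseconds. */
    public static final class Event {
        final String key;
        final long ts;

        public Event(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    /** key1 == key2 && left.ts + lower <= right.ts <= left.ts + upper (both bounds inclusive). */
    static boolean matches(Event left, Event right, long lower, long upper) {
        return left.key.equals(right.key)
                && left.ts + lower <= right.ts
                && right.ts <= left.ts + upper;
    }

    /** Emits "key:leftTs,rightTs" for every matching pair. */
    public static List<String> join(List<Event> left, List<Event> right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (matches(l, r, lower, upper)) {
                    out.add(l.key + ":" + l.ts + "," + r.ts);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orange = Arrays.asList(new Event("a", 0));
        List<Event> green = Arrays.asList(
                new Event("a", -2), new Event("a", -1), new Event("a", 0),
                new Event("a", 1), new Event("a", 2), new Event("b", 0));
        // Same bounds as between(Time.milliseconds(-2), Time.milliseconds(1))
        System.out.println(join(orange, green, -2, 1)); // prints [a:0,-2, a:0,-1, a:0,0, a:0,1]
    }
}
```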

    2. Operating on DataSet

    Example:

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class JoinDemo {

        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, Integer>> data1 = env.fromElements(
                    Tuple2.of("class1", 100),
                    Tuple2.of("class1", 400),
                    Tuple2.of("class2", 200),
                    Tuple2.of("class2", 400)
            );

            DataSet<Tuple2<String, Integer>> data2 = env.fromElements(
                    Tuple2.of("class1", 300),
                    Tuple2.of("class1", 600),
                    Tuple2.of("class2", 200),
                    Tuple2.of("class3", 200)
            );

            // Inner equi-join on field 0 (the class name) of both data sets
            data1.join(data2)
                    .where(0).equalTo(0)
                    .with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                        @Override
                        public String join(Tuple2<String, Integer> left,
                                           Tuple2<String, Integer> right) throws Exception {
                            return left.f0 + " : " + left.f1 + " " + right.f1;
                        }
                    })
                    // In the DataSet API, print() triggers execution, so env.execute() is not needed
                    .print();
        }
    }

    Output (the order of the lines may vary between runs):

    class1 : 100 300
    class1 : 400 300
    class1 : 100 600
    class1 : 400 600
    class2 : 200 200
    class2 : 400 200
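The six rows are the per-key cross product: class1 has 2 × 2 = 4 combinations, class2 has 2 × 1 = 2, and class3 has no partner in `data1`, so it disappears from an inner join. A minimal plain-Java hash-join sketch over the same data (the `Row` class is hypothetical; Flink picks its own join strategy) produces the same six rows, possibly in a different order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java hash join over the DataSet example's data (hypothetical, not Flink). */
public class HashJoinSketch {

    /** Hypothetical stand-in for Tuple2<String, Integer>. */
    public static final class Row {
        final String key;
        final int value;

        public Row(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    public static List<String> innerJoin(List<Row> left, List<Row> right) {
        // Build phase: index the right side by key
        Map<String, List<Integer>> index = new HashMap<>();
        for (Row r : right) {
            index.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r.value);
        }
        // Probe phase: every left row meets every right row with the same key
        List<String> out = new ArrayList<>();
        for (Row l : left) {
            for (int rv : index.getOrDefault(l.key, Collections.<Integer>emptyList())) {
                out.add(l.key + " : " + l.value + " " + rv);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> data1 = Arrays.asList(new Row("class1", 100), new Row("class1", 400),
                new Row("class2", 200), new Row("class2", 400));
        List<Row> data2 = Arrays.asList(new Row("class1", 300), new Row("class1", 600),
                new Row("class2", 200), new Row("class3", 200));
        innerJoin(data1, data2).forEach(System.out::println);
    }
}
```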

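    Besides the inner join shown here, the DataSet API offers further join variants, such as outer joins and flat joins (FlatJoinFunction); see the official documentation for details: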

    https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#join
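As a rough illustration of outer-join semantics: a left outer join keeps every left element, and one without a partner is paired with `null`, which Flink passes to the `JoinFunction` for the missing side. A plain-Java sketch of that rule (hypothetical `Row` class, not the Flink API), using `data2` from the example above as the left side so that `class3` is kept:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a left outer equi-join (hypothetical, not the Flink API). */
public class LeftOuterJoinSketch {

    /** Hypothetical stand-in for Tuple2<String, Integer>. */
    public static final class Row {
        final String key;
        final int value;

        public Row(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    public static List<String> leftOuterJoin(List<Row> left, List<Row> right) {
        Map<String, List<Integer>> index = new HashMap<>();
        for (Row r : right) {
            index.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r.value);
        }
        List<String> out = new ArrayList<>();
        for (Row l : left) {
            List<Integer> matches = index.get(l.key);
            if (matches == null) {
                // No partner: keep the left row and use null for the missing right side
                out.add(l.key + " : " + l.value + " null");
            } else {
                for (int rv : matches) {
                    out.add(l.key + " : " + l.value + " " + rv);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> data1 = Arrays.asList(new Row("class1", 100), new Row("class1", 400),
                new Row("class2", 200), new Row("class2", 400));
        List<Row> data2 = Arrays.asList(new Row("class1", 300), new Row("class1", 600),
                new Row("class2", 200), new Row("class3", 200));
        // Seven rows; the last one is "class3 : 200 null"
        leftOuterJoin(data2, data1).forEach(System.out::println);
    }
}
```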

  • Original article: https://www.cnblogs.com/qiu-hua/p/13796741.html