zoukankan      html  css  js  c++  java
  • Flink 双流合并Join

    知识点:

    如果同一个窗口内两条流中有多个相同的 id,Flink 的双流窗口 Join 会把它们按 key 两两组合(每一种配对各输出一条结果);其行为相当于内连接(inner join)而不是左连接:某一侧在窗口内没有匹配元素的 key 不会产生任何输出

    参考博客:

    https://blog.csdn.net/dafei1288/article/details/98919202
    https://cloud.tencent.com/developer/article/1596145

    1、主类

    package com.example.demo.flinkJoin;
    
    /**
     * @program: demo
     * @description:
     * @author: yang
     * @create: 2020-12-30 16:57
     */
    
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Date;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demo job that joins two randomly generated order streams on the product name.
     *
     * <p>Each source emits one {@code (productName, quantity)} tuple per second. The two streams
     * are joined on {@code f0} inside 5-second tumbling processing-time windows and every joined
     * pair is printed. Per the Flink window-join documentation the join behaves like an inner
     * join: all pairwise combinations of same-key elements in a window are emitted, and a key
     * that appears on only one side in a window produces no output.
     */
    public class JoinTestString {

        /** Product names the sources pick from at random; becomes {@code f0} of each tuple. */
        private static final String[] TYPE = {"a", "b", "c", "d"};

        /**
         * Builds and runs the join pipeline.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the Flink job fails
         */
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Custom sources: each emits one order {product name, product quantity} per second.
            DataStreamSource<Tuple2<String, String>> orderSource1 =
                    env.addSource(new RandomOrderSource("orderSource1"), "orderSource1");
            DataStreamSource<Tuple2<String, String>> orderSource2 =
                    env.addSource(new RandomOrderSource("orderSource2"), "orderSource2");

            orderSource1.join(orderSource2)
                    .where(new ProductNameSelector())
                    .equalTo(new ProductNameSelector())
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .apply(new QuantityJoinFunction())
                    .print();

            env.execute("Flink JoinTest");
        }

        /**
         * Source that emits one random {@code (productName, quantity)} tuple per second until
         * cancelled, echoing every emitted element to stdout together with the source name.
         */
        private static final class RandomOrderSource implements SourceFunction<Tuple2<String, String>> {

            private static final long serialVersionUID = 1L;

            /** Name printed in front of each emitted element. */
            private final String sourceName;
            private final Random random = new Random();
            /** Cleared by {@link #cancel()}; volatile because cancel() is called from another thread. */
            private volatile boolean isRunning = true;

            RandomOrderSource(String sourceName) {
                this.sourceName = sourceName;
            }

            @Override
            public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(1);
                    // f0: random product name from TYPE, f1: random quantity in [0, 10) as text.
                    Tuple2<String, String> order =
                            Tuple2.of(TYPE[random.nextInt(TYPE.length)], String.valueOf(random.nextInt(10)));
                    System.out.println(new Date() + "," + sourceName + "提交元素:" + order);
                    ctx.collect(order);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }

        /** Join key for both streams: the product name stored in {@code f0}. */
        private static final class ProductNameSelector implements KeySelector<Tuple2<String, String>, String> {

            private static final long serialVersionUID = 1L;

            @Override
            public String getKey(Tuple2<String, String> value) throws Exception {
                return value.f0;
            }
        }

        /**
         * Combines two orders that share a key into {@code ("key:" + name, leftQty + "+" + rightQty)}.
         */
        private static final class QuantityJoinFunction
                implements JoinFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>> {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> join(Tuple2<String, String> first, Tuple2<String, String> second)
                    throws Exception {
                // Pair up the f1 values of the two elements that share the same key.
                return Tuple2.of("key:" + first.f0, first.f1 + "+" + second.f1);
            }
        }
    }
  • 相关阅读:
    tree命令详解
    rm 命令详解
    rename命令详解
    pwd命令详解
    mv命令详解
    mkdir命令详情
    find命令详解
    dockerfile中配置时区
    docker导入导出
    docker上传私有仓库报错
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14228848.html
Copyright © 2011-2022 走看看