知识点:
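如果同一个窗口内两条流中有多个相同的key,Flink的双流窗口Join(join().where().equalTo().window())会把两侧key相同的元素两两组合(笛卡尔积)后输出;它是内连接(inner join)语义,在另一侧没有匹配的元素不会输出,并不是左连接。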
参考博客:
- https://blog.csdn.net/dafei1288/article/details/98919202
- https://cloud.tencent.com/developer/article/1596145
1、主类
package com.example.demo.flinkJoin; /** * @program: demo * @description: * @author: yang * @create: 2020-12-30 16:57 */ import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import java.util.Random; import java.util.concurrent.TimeUnit; public class JoinTestString { private static final Logger LOG = LoggerFactory.getLogger(JoinTestString.class); private static final String[] TYPE = {"a", "b", "c", "d"}; public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //添加自定义数据源,每秒发出一笔订单信息{商品名称,商品数量} DataStreamSource<Tuple2<String, String>> orderSource1 = env.addSource(new SourceFunction<Tuple2<String, String>>() { private volatile boolean isRunning = true; private final Random random = new Random(); @Override public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception { while (isRunning) { TimeUnit.SECONDS.sleep(1); Tuple2<String, String> tuple2 = Tuple2.of(TYPE[random.nextInt(TYPE.length)], String.valueOf(random.nextInt(10))); System.out.println(new Date() + ",orderSource1提交元素:" + tuple2); ctx.collect(tuple2); } } @Override public void cancel() { isRunning = false; } }, "orderSource1"); DataStreamSource<Tuple2<String, String>> orderSource2 = env.addSource(new SourceFunction<Tuple2<String, String>>() { private volatile boolean isRunning = true; private final Random random = new Random(); @Override public void 
run(SourceContext<Tuple2<String, String>> ctx) throws Exception { while (isRunning) { TimeUnit.SECONDS.sleep(1); Tuple2<String, String> tuple2 = Tuple2.of(TYPE[random.nextInt(TYPE.length)], String.valueOf(random.nextInt(10))); System.out.println(new Date() + ",orderSource2提交元素:" + tuple2); ctx.collect(tuple2); } } @Override public void cancel() { isRunning = false; } }, "orderSource2"); orderSource1.join(orderSource2).where(new KeySelector<Tuple2<String, String>, String>() { @Override public String getKey(Tuple2<String, String> value) throws Exception { return value.f0; } }).equalTo(new KeySelector<Tuple2<String, String>, String>() { @Override public String getKey(Tuple2<String, String> value) throws Exception { return value.f0; } }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>>() { @Override public Tuple2<String, String> join(Tuple2<String, String> first, Tuple2<String, String> second) throws Exception { return Tuple2.of("key:"+first.f0, first.f1+"+"+ second.f1);//计算key相同的属性1值 } }).print(); env.execute("Flink JoinTest"); } }