按照这个步骤1分钟内创建完成
idea-----File----new---Project------Maven----Create from archetype----Add Archetype
弹出框:
GroupId填org.apache.flink
ArtifactId填flink-quickstart-java
Version填1.14.0
选中刚刚添加的Archetype,点Next
填写你要创建的这个flink demo的GroupId,ArtifactId,Version,点击Next
创建项目,自动导入依赖的jar,pom中主要的jar
<flink.version>1.14.0</flink.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
done
写一个hello world,flink入门学习是SocketWindowWordCount
排查了好多问题,最终来到了这里,排查的重点错误可以看https://www.cnblogs.com/yb38156/p/15545379.html这篇
public class SocketWindowWordCount { public static void main(String[] args) throws Exception { //入口类,用来设置参数和创建数据源以及提交任务 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); log.info(env.getStreamTimeCharacteristic().name()); //创建一个从本地端口号 9000 的 socket 中读取数据的数据源 //DataStream是流处理的核心API,定义了很多常见的操作如过滤、转换、聚合、窗口、关联等 DataStream text = env.socketTextStream("10.192.78.17", 9000, "\n"); //首先将字符串数据解析成单词和次数(使用元组类型Tuple2<String, Integer>表示),第一个字段是单词,第二个字段是次数,次数初始值都设置成1 DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String in, Collector out) throws Exception { for (String word : in.split("\\s+")) { out.collect(Tuple2.of(word, 1)); } } }); //keyBy(0)表示使用Tuples的第一个字段作为key,5s的滚动窗口,Tuples的第2个字段做累加操作 //timeWindow()已经过时,使用window(),使用的是滚动窗口和eventTime //keyBy(0)过时,使用KeySelector DataStream<Tuple2<String, Integer>> windowCounts = wordCounts.keyBy( (KeySelector<Tuple2<String, Integer>, String>) tuple2 -> tuple2.f0) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // .timeWindow(Time.seconds(5)) .sum(1); windowCounts.print().setParallelism(1); //以上所有的算子的定义和操作只是构建了一个dag,最终需要执行 env.execute("yanbiao Socket Window WordCount"); }
过时的方法已经全部换成了最新的