1 在IDEA中编写Flink程序
Scala版Flink程序编写
本项目使用的Flink版本为最新版本,也就是1.11.0。现在提供maven项目的配置文件。
- 使用Intellij IDEA创建一个Maven新项目
- 勾选
Create from archetype
,然后点击Add Archetype
按钮 GroupId
中输入org.apache.flink
,ArtifactId
中输入flink-quickstart-scala
,Version
中输入1.11.0
,然后点击OK
- 点击向右箭头,出现下拉列表,选中
flink-quickstart-scala:1.11.0
,点击Next
Name
中输入FlinkTutorial
,GroupId
中输入com.atguigu
,ArtifactId
中输入FlinkTutorial
,点击Next
- 最好使用IDEA默认的Maven工具:Bundled(Maven 3),点击
Finish
,等待一会儿,项目就创建好了
编写WordCount.scala
程序
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object StreamingJob { /** Main program method */ def main(args: Array[String]) : Unit = { // get the execution environment StreamExecutionEnvironment env: StreamExecutionEnvironment = StreamExecutionEnvironment .getExecutionEnvironment // get input data by connecting to the socket val text: DataStream[String] = env .socketTextStream("localhost", 9999, ' ') // parse the data, group it, window it, and aggregate the counts val windowCounts = text .flatMap { w => w.split("\s") } .map { w => WordWithCount(w, 1) } .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") // print the results with a single thread, rather than in parallel windowCounts .print() .setParallelism(1) env.execute("Socket Window WordCount") } /** Data type for words with count */ case class WordWithCount(word: String, count: Long) }
打开一个终端(Terminal),运行以下命令
$ nc -lk 9999
接下来使用IDEA
运行就可以了。
Java版Flink程序编写
- 使用Intellij IDEA创建一个Maven新项目
- 勾选
Create from archetype
,然后点击Add Archetype
按钮 GroupId
中输入org.apache.flink
,ArtifactId
中输入flink-quickstart-java
,Version
中输入1.11.0
,然后点击OK
- 点击向右箭头,出现下拉列表,选中
flink-quickstart-java:1.11.0
,点击Next
Name
中输入FlinkTutorial
,GroupId
中输入com.atguigu
,ArtifactId
中输入FlinkTutorial
,点击Next
- 最好使用IDEA默认的Maven工具:Bundled(Maven 3),点击
Finish
,等待一会儿,项目就创建好了
编写WordCount.java
程序
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCountFromSocket { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<String> stream = env.socketTextStream("localhost", 9999); stream.flatMap(new Tokenizer()).keyBy(r -> r.f0).sum(1).print(); env.execute("Flink Streaming Java API Skeleton"); } public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] stringList = value.split("\s"); for (String s : stringList) { // 使用out.collect方法向下游发送数据 out.collect(new Tuple2(s, 1)); } } } }
2 下载Flink运行时环境,提交Jar包的运行方式
下载链接:http://mirror.bit.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz
然后解压
$ tar xvfz flink-1.11.1-bin-scala_2.11.tgz
启动Flink集群
$ cd flink-1.11.1
$ ./bin/start-cluster.sh
可以打开Flink WebUI查看集群状态:http://localhost:8081
在IDEA
中使用maven package
打包。
提交打包好的JAR
包
$ cd flink-1.11.1
$ ./bin/flink run 打包好的JAR包的绝对路径
停止Flink集群
$ ./bin/stop-cluster.sh
查看标准输出日志的位置,在log
文件夹中。
$ cd flink-1.11.1/log