POM 文件。
<!-- Flink dependencies. All Flink artifacts are kept on the same release
     (1.10.2) and the same Scala binary version (2.11) to avoid binary
     incompatibilities between modules. -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.10.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.10.2</version>
    </dependency>
    <!-- Was 1.10.1; aligned with the other Flink artifacts. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.10.2</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Compiles the Scala sources into class files. -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <!-- Runs the plugin's compile goal as part of the Maven compile phase. -->
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Builds a single jar that bundles the project together with its dependencies
             during the package phase. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
源码:
package com.kpwong.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
/**
 * Streaming word count.
 *
 * Reads lines of text from a socket, splits each line on single spaces,
 * drops empty tokens and prints a running count for every word.
 *
 * The socket endpoint may be supplied on the command line as
 * `--host <name> --port <number>`. When an option is omitted the previous
 * hard-coded values (`hadoop202` and `9999`) are used, so launching the job
 * without arguments behaves as before.
 */
object StreamWordCount {

  /** Socket host used when no `--host` argument is given. */
  private val DefaultHost: String = "hadoop202"

  /** Socket port used when no `--port` argument is given. */
  private val DefaultPort: Int = 9999

  /**
   * Builds and runs the word-count pipeline.
   *
   * @param args optional `--host` / `--port` overrides; keys must be prefixed
   *             with `-` or `--` as required by `ParameterTool.fromArgs`
   */
  def main(args: Array[String]): Unit = {
    // Create the stream-processing execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Uncomment to force a fixed parallelism for the whole job:
    // env.setParallelism(8)

    // Resolve the socket endpoint from the command line, falling back to the defaults.
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val host: String = params.get("host", DefaultHost)
    val port: Int = params.getInt("port", DefaultPort)

    // Receive a socket text stream.
    val socketDS: DataStream[String] = env.socketTextStream(host, port)

    val wcDS: DataStream[(String, Int)] = socketDS
      .flatMap(_.split(" "))
      // startNewChain() is kept from the original: the filter begins a new operator chain.
      .filter(_.nonEmpty).startNewChain()
      .map((_, 1))
      // Key by the word itself via a typed selector instead of a positional field index.
      .keyBy(_._1)
      // Sum the count field (tuple position 1) per key.
      .sum(1)

    wcDS.print()

    // Start the job.
    env.execute("Stream word count")
  }
}
运行 netcat(在 hadoop202 上监听 9999 端口,例如 `nc -lk 9999`):
查看结果: