一 批处理
文件内容:
hello world hello scala hello flink
代码:
import org.apache.flink.api.scala._ object WordCount { def main(args: Array[String]): Unit = { //创建一个批处理的执行环境 val env = ExecutionEnvironment.getExecutionEnvironment val inputDataSet = env.readTextFile("D:\project\idea\FlinkTutorial\src\main\resources\hello.txt") val wordCountDataSet = inputDataSet .flatMap(_.split(" ")) .map((_,1)) .groupBy(0)//按下标为0的元素分组 .sum(1)//对下标为1的元素求和 wordCountDataSet.print() } }
二 流处理
import org.apache.flink.streaming.api.scala._ object StreamWordCount { def main(args: Array[String]): Unit = { //创建一个流处理的执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //接收socket数据流 val textDataStream = env.socketTextStream("hadoop102", 7777) val wordCountDataStream = textDataStream .flatMap(_.split("\s")) .map((_, 1)) .keyBy(0) .sum(1) wordCountDataStream.print() //执行任务 env.execute("任务名") } }