相关文章链接
Flink之基础内容(2):DataStream的创建和使用
具体代码如下:
// 获取需要输入的参数(从args中获取,每个任务启动时指定) // ParameterTool是Flink中的工具类 val params: ParameterTool = ParameterTool.fromArgs(args) val host: String = params.get("host") val port: Int = params.getInt("port") // 创建流处理环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 接收socket文本流 val textDstream: DataStream[String] = env.socketTextStream(host, port) // 对接收的文本流进行处理 val result: DataStream[(String, Int)] = textDstream .flatMap(_.split("\s")) .filter(_.nonEmpty) .map((_, 1)) .keyBy(0) .sum(1) result.print().setParallelism(2) // 启动executor,执行任务,在flink的流任务中,需要启动 env.execute("StreamWordCount")