public class StreamWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String inputPath = "D:\IDEAWorkspace\2017.3\bigData\Flink\src\main\resources\hello.txt"; DataStream<String> inputDataStream = env.readTextFile(inputPath); //对数据流进行转换操作 DataStream<Tuple2<String, Integer>> resDataStream = inputDataStream.flatMap(new MyFlatMapfunction()) //流处理,来一条数据处理一条,根据元组的第一位进行分区 .keyBy(0) .sum(1);//根据元组的第二个位置进行求和 resDataStream.print(); env.execute(); } public static class MyFlatMapfunction implements FlatMapFunction<String,Tuple2<String,Integer>>{ private Tuple2<String, Integer> wordTuple = new Tuple2<>(); @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = s.split(" ", -1); for (String word : words) { wordTuple.setFields(word,1); collector.collect(wordTuple); } } } }