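Map
DataStream → DataStream
Takes one element and produces one element. The example below doubles every value of the input stream.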
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);
    DataStream<Integer> source = env.fromCollection(list);

    SingleOutputStreamOperator<Integer> map = source.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return value * 2;
        }
    });

    map.printToErr().setParallelism(1);
    env.execute();
}
```
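FlatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements. The example below splits a sentence into words.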
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<String> list = new ArrayList<>();
    list.add("hello flink spark java");
    DataStream<String> source = env.fromCollection(list);

    SingleOutputStreamOperator<String> flatMapResult = source.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });

    flatMapResult.printToErr().setParallelism(1);
    env.execute();
}
```
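Filter
DataStream → DataStream
Evaluates a boolean function for each element and keeps the elements for which it returns true. The example below keeps only the even numbers.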
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);
    DataStream<Integer> source = env.fromCollection(list);

    SingleOutputStreamOperator<Integer> filterResult = source.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value % 2 == 0;
        }
    });

    filterResult.printToErr().setParallelism(1);
    env.execute();
}
```
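KeyBy
DataStream → KeyedStream
Splits the stream into disjoint groups, so that all records with the same key end up in the same group. Internally this is implemented with hash partitioning. Keys can be specified as follows: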
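```java
dataStream.keyBy("someKey"); // Key by field "someKey"
dataStream.keyBy(0);         // Key by the first element of a Tuple
```

For example, parsing "key,count" strings, keying by the first field and summing the counts in a tumbling window:

```java
dataStream
    .map(s -> {
        String[] split = s.split(",");
        return new Tuple2<>(split[0], Long.parseLong(split[1]));
    })
    // A lambda that returns a generic Tuple2 loses its type information to erasure,
    // so Flink needs an explicit type hint here
    .returns(Types.TUPLE(Types.STRING, Types.LONG))
    .keyBy(0)
    // Apply a 5-second tumbling window; event-time windows require timestamps
    // and watermarks to be assigned upstream
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .sum(1)
    .print();
```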
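The core idea of keyBy's hash partitioning, "same key, same subtask", can be sketched in plain Java. This is a simplified model and not Flink's actual code. Flink first hashes the key into a key group, bounded by the max parallelism, and then maps key groups to subtasks. The sketch below instead takes the key's `hashCode()` modulo the parallelism. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of keyBy's hash partitioning (not Flink's real implementation).
class HashPartitionSketch {

    // Pick a subtask index for a key; floorMod keeps the result non-negative
    // even when hashCode() is negative.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Route every key to a subtask and collect the keys each subtask receives.
    static Map<Integer, List<String>> route(List<String> keys, int parallelism) {
        Map<Integer, List<String>> bySubtask = new LinkedHashMap<>();
        for (String key : keys) {
            bySubtask.computeIfAbsent(subtaskFor(key, parallelism), i -> new ArrayList<>()).add(key);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("shoes", "tie", "shoes", "cup", "tie");
        // Every occurrence of the same key lands in the same subtask's list.
        System.out.println(route(keys, 4));
    }
}
```

Because the target depends only on the key, a single very frequent key always hits one subtask. That is the root of the skew problem discussed next.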
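keyBy groups records by the user-specified key, so all records of one group are sent to the same subtask for processing. With large data volumes and an uneven key distribution, this easily leads to data skew and backpressure, which can make the job fail. A common remedy is to add a random prefix or suffix (a "salt") to the keys so that a hot key is spread over several subtasks.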
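Salting is usually paired with a two-stage aggregation. In the first stage, aggregate on the salted key so that a hot key is spread over several subtasks. In the second stage, strip the salt and combine the partial results, for example within one window. Below is a plain-Java sketch of that idea. The salt format `key#n`, the number of salts, and the method names are hypothetical choices, not part of Flink. In a real job each stage would be its own keyBy followed by an aggregation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch of key salting with a two-stage aggregation (not a Flink API).
class SaltingSketch {

    // Stage 1: append a random salt so one hot key becomes up to `salts` different keys,
    // then count per salted key (in Flink: keyBy(saltedKey) + an aggregation).
    static Map<String, Integer> partialSums(List<String> keys, int salts, Random random) {
        Map<String, Integer> partial = new HashMap<>();
        for (String key : keys) {
            String salted = key + "#" + random.nextInt(salts);
            partial.merge(salted, 1, Integer::sum);
        }
        return partial;
    }

    // Stage 2: remove the salt and combine the partial counts per original key.
    static Map<String, Integer> finalSums(Map<String, Integer> partial) {
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, Integer> e : partial.entrySet()) {
            String original = e.getKey().substring(0, e.getKey().lastIndexOf('#'));
            result.merge(original, e.getValue(), Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> clicks = List.of("hot", "hot", "hot", "hot", "cold", "hot");
        Map<String, Integer> partial = partialSums(clicks, 3, new Random(42));
        System.out.println("partial = " + partial); // "hot" is spread over several salted keys
        System.out.println("final   = " + finalSums(partial)); // counts: hot -> 5, cold -> 1
    }
}
```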
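Reduce
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream: it combines the current element with the last reduced value and emits the new value. The reduce function below creates a stream of partial sums: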
```java
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
```
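Because reduce on a KeyedStream is a rolling operation, it emits an updated result for every incoming element rather than one final value. The plain-Java sketch below models that behaviour. A per-key map stands in for Flink's keyed state, and the class and method names are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of KeyedStream#reduce: one "last reduced value" per key.
class RollingReduceSketch {

    static <K, V> List<V> rollingReduce(List<SimpleEntry<K, V>> input, BinaryOperator<V> reducer) {
        Map<K, V> state = new HashMap<>();   // stands in for Flink's per-key state
        List<V> emitted = new ArrayList<>();
        for (SimpleEntry<K, V> record : input) {
            // The first element of a key is stored as-is; later ones are combined with the previous result.
            V updated = state.merge(record.getKey(), record.getValue(), reducer);
            emitted.add(updated);            // every input element produces an output
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<SimpleEntry<String, Integer>> input = List.of(
                new SimpleEntry<>("a", 1), new SimpleEntry<>("b", 2),
                new SimpleEntry<>("a", 3), new SimpleEntry<>("b", 4));
        // Same logic as the ReduceFunction above: value1 + value2
        System.out.println(rollingReduce(input, Integer::sum)); // [1, 2, 4, 6]
    }
}
```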
Fold
KeyedStream → DataStream
Note: fold is deprecated and was removed in Flink 1.12.
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<Tuple2<String, Integer>> list = new ArrayList<>();
    list.add(new Tuple2<>("a", 10));
    list.add(new Tuple2<>("b", 20));
    list.add(new Tuple2<>("a", 30));
    list.add(new Tuple2<>("b", 50));
    DataStream<Tuple2<String, Integer>> source = env.fromCollection(list);

    KeyedStream<Tuple2<String, Integer>, Tuple> keyByResult = source.keyBy(0);

    // The first argument of fold is the initial value; the second is our folding logic,
    // which is applied per key, starting from that initial value.
    SingleOutputStreamOperator<String> foldResult = keyByResult.fold("ksw", new FoldFunction<Tuple2<String, Integer>, String>() {
        @Override
        public String fold(String current, Tuple2<String, Integer> value) throws Exception {
            return current + "-" + value.f1;
        }
    });

    foldResult.printToErr().setParallelism(1);
    env.execute();
    // Output:
    // ksw-20
    // ksw-20-50
    // ksw-10
    // ksw-10-30
}
```
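The rolling fold can be modelled the same way. Each key starts from the initial value ("ksw" above), and the fold function is applied to every new element of that key. This plain-Java sketch reproduces the per-key output listed above. It is not Flink code, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of the rolling fold: a per-key accumulator seeded with an initial value.
class RollingFoldSketch {

    static final class Event {
        final String key;
        final int value;
        Event(String key, int value) { this.key = key; this.value = value; }
    }

    static <A> Map<String, List<A>> rollingFold(List<Event> input, A initial,
                                                 BiFunction<A, Event, A> folder) {
        Map<String, A> state = new HashMap<>();             // last folded value per key
        Map<String, List<A>> emittedPerKey = new HashMap<>();
        for (Event e : input) {
            A current = state.getOrDefault(e.key, initial); // unseen keys start from the initial value
            A next = folder.apply(current, e);
            state.put(e.key, next);
            emittedPerKey.computeIfAbsent(e.key, k -> new ArrayList<>()).add(next);
        }
        return emittedPerKey;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(new Event("a", 10), new Event("b", 20),
                                    new Event("a", 30), new Event("b", 50));
        Map<String, List<String>> out = rollingFold(input, "ksw", (acc, e) -> acc + "-" + e.value);
        System.out.println(out.get("a")); // [ksw-10, ksw-10-30]
        System.out.println(out.get("b")); // [ksw-20, ksw-20-50]
    }
}
```

Only the order within a key is fixed. Across keys the outputs can interleave, which is why the Flink output above lists key "b" first.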
Aggregations
KeyedStream → DataStream
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
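Performs rolling aggregations on a KeyedStream. "Aggregations" is the umbrella term for aggregation functions; common ones include, but are not limited to, sum, max, and min.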
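The difference between max and maxBy is that max returns the maximum value of the specified field, whereas maxBy returns the element that holds that maximum (min and minBy work the same way).

More precisely, both emit a whole element. max writes the maximum of the user-specified field into that field's position, but the values of the other fields are not guaranteed to come from the same record. In the example below they keep the values of the first element seen for each key. The same applies to min and minBy.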
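Note that Flink manages the state of these aggregation functions for you, and this state is never cleaned up automatically. In production you should therefore avoid rolling aggregations on an unbounded stream, especially when the number of distinct keys keeps growing. Also, each aggregation call aggregates a single field and returns a plain DataStream rather than a KeyedStream, so aggregations cannot be chained: a further aggregation requires another keyBy.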
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
    data.add(new Tuple3<>(0, 1, 1));
    data.add(new Tuple3<>(0, 2, 3));
    data.add(new Tuple3<>(0, 1, 2));
    data.add(new Tuple3<>(0, 1, 5));
    data.add(new Tuple3<>(1, 2, 5));
    data.add(new Tuple3<>(1, 2, 4));
    data.add(new Tuple3<>(1, 2, 13));
    data.add(new Tuple3<>(1, 2, 11));
    DataStreamSource<Tuple3<Integer, Integer, Integer>> source = env.fromCollection(data);

    // Group by the first tuple field, then take the maximum of the third field
    // (the "6>" prefix identifies the printing subtask)
    source.keyBy(0).max(2).printToErr();
    // 6> (0,1,1)
    // 6> (0,1,3)   <- (0,1,3) does not exist in the source data
    // 6> (0,1,3)
    // 6> (0,1,5)
    // 6> (1,2,5)
    // 6> (1,2,5)
    // 6> (1,2,13)
    // 6> (1,2,13)

    source.keyBy(0).maxBy(2).printToErr();
    // 6> (0,1,1)
    // 6> (0,2,3)
    // 6> (0,2,3)
    // 6> (0,1,5)
    // 6> (1,2,5)
    // 6> (1,2,5)
    // 6> (1,2,13)
    // 6> (1,2,13)

    env.execute();
}
```
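To see where the surprising (0,1,3) comes from, the two behaviours can be modelled in plain Java. The sketch rests on an assumption that is consistent with the output above. max keeps the other fields of the first element seen for a key and only overwrites the aggregated field. maxBy replaces the whole element, and on ties it keeps the earlier element. Int arrays stand in for Tuple3, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of rolling max vs. maxBy on 3-field tuples (int[] stands in for Tuple3).
class MaxVsMaxBySketch {

    // keyPos: field used as the key; aggPos: field being maximised; by: true for maxBy.
    static List<String> rolling(List<int[]> input, int keyPos, int aggPos, boolean by) {
        Map<Integer, int[]> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (int[] element : input) {
            int[] current = state.get(element[keyPos]);
            if (current == null) {
                current = element.clone();                  // first element for this key
            } else if (element[aggPos] > current[aggPos]) {
                if (by) {
                    current = element.clone();              // maxBy: take the whole element
                } else {
                    current = current.clone();
                    current[aggPos] = element[aggPos];      // max: overwrite only the aggregated field
                }
            }
            state.put(element[keyPos], current);
            emitted.add(Arrays.toString(current));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<int[]> data = List.of(new int[]{0, 1, 1}, new int[]{0, 2, 3},
                                   new int[]{0, 1, 2}, new int[]{0, 1, 5});
        System.out.println(rolling(data, 0, 2, false)); // [[0, 1, 1], [0, 1, 3], [0, 1, 3], [0, 1, 5]]
        System.out.println(rolling(data, 0, 2, true));  // [[0, 1, 1], [0, 2, 3], [0, 2, 3], [0, 1, 5]]
    }
}
```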
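Window
KeyedStream → WindowedStream
Windows can be defined on a KeyedStream. They group the data of each key by some characteristic, for example the records that arrived within the last 5 seconds.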
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> source = env.socketTextStream("172.xx.x.xx", 9999);

    source.map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            String[] split = value.split(",");
            return new Tuple2<>(split[0], Integer.parseInt(split[1]));
        }
    }).keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .printToErr().setParallelism(1);

    env.execute();
}
```
Sample input typed into the socket, one "key,count" record per line:

```
shoes,1
tie,2
shoes,1
cup,2
shoes,2
cup,3
tie,3
```
shoes,2 tie,1 shoes,2 cup,1 shoes,2 cup,2 tie,1
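A tumbling window assigns each record to exactly one fixed-size, non-overlapping interval. The sketch below models `TumblingProcessingTimeWindows.of(Time.seconds(5))` followed by `sum(1)` in plain Java, with explicit, made-up millisecond timestamps in place of the wall clock. It assumes windows aligned with zero offset (the default), so a record at time t falls into the window that starts at t - t % size. The class and its nested record type are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a keyed tumbling window followed by sum(1); not Flink code.
class TumblingWindowSketch {

    static final class Timed {
        final long timestampMs;
        final String key;
        final int count;
        Timed(long timestampMs, String key, int count) {
            this.timestampMs = timestampMs;
            this.key = key;
            this.count = count;
        }
    }

    // Start of the window a timestamp belongs to (zero offset, non-negative timestamps).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Result: window start -> (key -> summed count). TreeMaps keep the printout ordered.
    static Map<Long, Map<String, Integer>> sumPerWindow(Iterable<Timed> records, long sizeMs) {
        Map<Long, Map<String, Integer>> windows = new TreeMap<>();
        for (Timed r : records) {
            windows.computeIfAbsent(windowStart(r.timestampMs, sizeMs), w -> new TreeMap<>())
                   .merge(r.key, r.count, Integer::sum);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Timed> input = List.of(
                new Timed(1_000, "shoes", 1), new Timed(2_000, "tie", 2), new Timed(3_000, "shoes", 1),
                new Timed(6_000, "cup", 2), new Timed(7_000, "shoes", 2));
        System.out.println(sumPerWindow(input, 5_000)); // {0={shoes=2, tie=2}, 5000={cup=2, shoes=2}}
    }
}
```

In the real job, which window a socket record lands in depends on when it arrives, so the same input can produce different sums from run to run.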
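WindowAll
DataStream → AllWindowedStream
Windows can also be defined on a regular, non-keyed DataStream. They then group all records of the stream by some characteristic. Note that in many cases this is a non-parallel transformation: all records are gathered in a single task for the windowAll operator.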
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> source = env.socketTextStream("", 9999);

    source.map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            String[] split = value.split(",");
            return new Tuple2<>(split[0], Integer.parseInt(split[1]));
        }
    }).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .printToErr().setParallelism(1);

    env.execute();
}
```
Sample input typed into the socket, one "key,count" record per line:

```
shoes,1
tie,2
shoes,1
cup,2
shoes,2
cup,3
tie,3
```
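With windowAll there is no key, so every record of a window is summed together in one task. The plain-Java sketch below models `sum(1)` over one window's worth of records. It assumes that the non-summed field is carried over from the first record of the window, modelled on how the tuple sum aggregation treats the other fields. Under that assumption, if all seven sample records above fell into the same window, the result would be a single (shoes, 14). The names are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of windowAll(...).sum(1) for one window's worth of (key, count) records.
class WindowAllSumSketch {

    static SimpleEntry<String, Integer> sumAll(List<SimpleEntry<String, Integer>> window) {
        if (window.isEmpty()) {
            throw new IllegalArgumentException("an empty window emits nothing");
        }
        String firstKey = window.get(0).getKey();   // field 0 is carried over from the first record
        int total = 0;
        for (SimpleEntry<String, Integer> record : window) {
            total += record.getValue();             // field 1 is summed regardless of the key
        }
        return new SimpleEntry<>(firstKey, total);
    }

    // Parse lines in the "key,count" format, like the map function above.
    static List<SimpleEntry<String, Integer>> parse(String... lines) {
        return Arrays.stream(lines)
                .map(l -> l.split(","))
                .map(p -> new SimpleEntry<>(p[0], Integer.parseInt(p[1])))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SimpleEntry<String, Integer>> window =
                parse("shoes,1", "tie,2", "shoes,1", "cup,2", "shoes,2", "cup,3", "tie,3");
        System.out.println(sumAll(window)); // shoes=14
    }
}
```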
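Union
DataStream* → DataStream
Union of two or more data streams, creating a new stream that contains all the elements of all the streams. The streams must have the same type. If a stream is unioned with itself, each element appears twice in the result.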
```java
dataStream.union(otherStream1, otherStream2, ...);
```
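Split
DataStream → SplitStream
Splits the stream into two or more streams according to some criterion. Note: split/select is deprecated and was removed in Flink 1.12. Newer versions use side outputs instead: an OutputTag, ctx.output(...) inside a ProcessFunction, and getSideOutput(...) to read the result.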
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);
    DataStreamSource<Integer> source = env.fromCollection(list);

    // Tag each element with the name(s) of the output stream(s) it belongs to
    SplitStream<Integer> split = source.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });

    DataStream<Integer> even = split.select("even");
    even.printToErr().setParallelism(1);
    // Output:
    // 2
    // 4
    env.execute();
}
```
Select
SplitStream → DataStream
Select one or more streams from a split stream.
```java
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even", "odd");
```
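Split and select behave like tagging. The OutputSelector returns one or more names for each element, and `select(...)` keeps the elements that carry any of the requested names. The plain-Java sketch below models that routing with the same even/odd selector. It is not Flink code, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model of split(OutputSelector) + select(names...).
class SplitSelectSketch {

    // Same logic as the OutputSelector above.
    static List<String> evenOdd(Integer value) {
        return List.of(value % 2 == 0 ? "even" : "odd");
    }

    // Keep every element whose selector output contains at least one requested name.
    static <T> List<T> select(List<T> input, Function<T, List<String>> selector, String... names) {
        Set<String> wanted = new HashSet<>(Arrays.asList(names));
        List<T> out = new ArrayList<>();
        for (T element : input) {
            for (String tag : selector.apply(element)) {
                if (wanted.contains(tag)) {
                    out.add(element);   // in this sketch an element is emitted at most once per select
                    break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4);
        System.out.println(select(source, SplitSelectSketch::evenOdd, "even"));        // [2, 4]
        System.out.println(select(source, SplitSelectSketch::evenOdd, "odd"));         // [1, 3]
        System.out.println(select(source, SplitSelectSketch::evenOdd, "even", "odd")); // [1, 2, 3, 4]
    }
}
```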
Custom partitioning
DataStream → DataStream
Uses a user-defined Partitioner to choose the target partition for each element.
```java
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
```
1. Custom partitioning of a POJO DataStream
```java
/**
 * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
 * This method takes the key expression to partition on, and a partitioner that accepts the key type.
 *
 * <p>Note: This method works only on single field keys.
 *
 * @param partitioner The partitioner to assign partitions to keys.
 * @param field The expression for the field on which the DataStream is partitioned.
 * @return The partitioned DataStream.
 */
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) {
    Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<>(new String[]{field}, getType());
    return partitionCustom(partitioner, outExpressionKeys);
}
```
2. Custom partitioning of a Tuple DataStream
```java
/**
 * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
 * This method takes the key position to partition on, and a partitioner that accepts the key type.
 *
 * <p>Note: This method works only on single field keys.
 *
 * @param partitioner The partitioner to assign partitions to keys.
 * @param field The field index on which the DataStream is partitioned.
 * @return The partitioned DataStream.
 */
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, int field) {
    Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<>(new int[]{field}, getType());
    return partitionCustom(partitioner, outExpressionKeys);
}
```
Example: custom partitioning of a Tuple DataStream
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);
    DataStreamSource<Integer> source = env.fromCollection(list);

    // Wrap each value in a Tuple1 so it can be partitioned by tuple position 0
    SingleOutputStreamOperator<Tuple1<Integer>> tupleData = source.map(new MapFunction<Integer, Tuple1<Integer>>() {
        @Override
        public Tuple1<Integer> map(Integer value) throws Exception {
            return new Tuple1<>(value);
        }
    });

    // Even keys go to partition 0, odd keys to partition 1
    // (this assumes the downstream parallelism is at least 2)
    DataStream<Tuple1<Integer>> dataStream = tupleData.partitionCustom(new org.apache.flink.api.common.functions.Partitioner<Integer>() {
        @Override
        public int partition(Integer key, int numPartitions) {
            if (key % 2 == 0) {
                return 0;
            } else {
                return 1;
            }
        }
    }, 0);

    dataStream.map(new MapFunction<Tuple1<Integer>, Integer>() {
        @Override
        public Integer map(Tuple1<Integer> value) throws Exception {
            // Print the handling thread to see which partition each record landed in
            System.out.println("Current thread ID: " + Thread.currentThread().getId() + ", value: " + value);
            return value.getField(0);
        }
    }).printToErr();

    env.execute();
}
```
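The routing done by the partitioner in this example can be checked without a cluster. The plain-Java sketch below declares a hypothetical single-method interface, `SimplePartitioner`, shaped like Flink's `Partitioner<K>` (`int partition(K key, int numPartitions)`). It is not the Flink interface itself. The sketch applies the same even/odd logic to the values 1 to 4. It also rejects indices outside [0, numPartitions), which shows why the example needs at least two downstream partitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same shape as Flink's Partitioner<K>; not the Flink interface.
interface SimplePartitioner<K> {
    int partition(K key, int numPartitions);
}

class CustomPartitionSketch {

    // Same logic as the anonymous Partitioner above: even keys -> 0, odd keys -> 1.
    static final SimplePartitioner<Integer> EVEN_ODD = (key, numPartitions) -> key % 2 == 0 ? 0 : 1;

    static <K> List<List<K>> route(List<K> keys, SimplePartitioner<K> partitioner, int numPartitions) {
        List<List<K>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (K key : keys) {
            int target = partitioner.partition(key, numPartitions);
            if (target < 0 || target >= numPartitions) {
                throw new IllegalStateException(
                        "partition " + target + " out of range for " + numPartitions + " partitions");
            }
            partitions.get(target).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(route(List.of(1, 2, 3, 4), EVEN_ODD, 2)); // [[2, 4], [1, 3]]
    }
}
```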
Random partitioning
DataStream → DataStream
Partitions elements randomly according to a uniform distribution.
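In the DataStream API this is `dataStream.shuffle()`. The plain-Java sketch below models the idea: each element independently picks a partition uniformly at random. The class is hypothetical, and a seeded `Random` is used so that runs are repeatable. The per-partition counts come out roughly, but not exactly, equal.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical model of random (shuffle) partitioning: each element picks a partition uniformly at random.
class ShufflePartitionSketch {

    static int[] countsPerPartition(int elements, int numPartitions, Random random) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < elements; i++) {
            counts[random.nextInt(numPartitions)]++;   // uniform over [0, numPartitions)
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = countsPerPartition(100_000, 4, new Random(7));
        // Each partition receives roughly 25,000 elements, but the split is not exact.
        System.out.println(Arrays.toString(counts));
    }
}
```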
Rebalancing (Round-robin partitioning)
DataStream → DataStream
Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.
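In the DataStream API this is `dataStream.rebalance()`. The plain-Java sketch below models round-robin assignment, where partition loads differ by at most one element. It is a hypothetical class, not Flink's partitioner. Flink picks a random starting channel, whereas this sketch takes the start as a parameter so the result is deterministic.

```java
import java.util.Arrays;

// Hypothetical model of round-robin (rebalance) partitioning.
class RoundRobinSketch {

    private final int numPartitions;
    private int next;   // partition that receives the next element

    RoundRobinSketch(int numPartitions, int start) {
        this.numPartitions = numPartitions;
        this.next = start % numPartitions;
    }

    int selectChannel() {
        int channel = next;
        next = (next + 1) % numPartitions;
        return channel;
    }

    static int[] countsPerPartition(int elements, int numPartitions) {
        RoundRobinSketch rr = new RoundRobinSketch(numPartitions, 0);
        int[] counts = new int[numPartitions];
        for (int i = 0; i < elements; i++) {
            counts[rr.selectChannel()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 10 elements over 3 partitions: loads differ by at most one element
        System.out.println(Arrays.toString(countsPerPartition(10, 3))); // [4, 3, 3]
    }
}
```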
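Rescaling
DataStream → DataStream
Partitions elements round-robin to a subset of the downstream operator instances. This is useful when, for example, each parallel source instance should fan out to a few mappers without the full redistribution that rebalance() would incur. Depending on configuration such as the number of slots per TaskManager, it may need only local data transfers instead of network transfers.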
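In the DataStream API this is `dataStream.rescale()`. The plain-Java sketch below is a simplified, hypothetical model of which downstream instances each upstream instance talks to. It is not Flink's exact assignment formula. With upstream parallelism 2 and downstream parallelism 4, each upstream instance serves two downstream instances. With 6 upstream and 2 downstream, three upstream instances share each downstream one. Within its target block, an upstream instance would then distribute elements round-robin.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of rescale(): which downstream subtasks each upstream subtask sends to.
class RescaleSketch {

    static List<Integer> targetsOf(int upstreamIndex, int upstream, int downstream) {
        List<Integer> targets = new ArrayList<>();
        if (downstream >= upstream) {
            // Fan-out: each upstream subtask owns a contiguous block of downstream subtasks
            int from = upstreamIndex * downstream / upstream;
            int to = (upstreamIndex + 1) * downstream / upstream;
            for (int d = from; d < to; d++) {
                targets.add(d);
            }
        } else {
            // Fan-in: several upstream subtasks share one downstream subtask
            targets.add(upstreamIndex * downstream / upstream);
        }
        return targets;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            System.out.println("upstream " + i + " -> " + targetsOf(i, 2, 4)); // 0 -> [0, 1], 1 -> [2, 3]
        }
        for (int i = 0; i < 6; i++) {
            System.out.println("upstream " + i + " -> " + targetsOf(i, 6, 2)); // 0..2 -> [0], 3..5 -> [1]
        }
    }
}
```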
Broadcasting
DataStream → DataStream
Broadcasts elements to every partition.
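In the DataStream API this is `dataStream.broadcast()`. The plain-Java sketch below (a hypothetical class) shows the consequence: every partition receives a full copy of the stream, so the transferred data grows with the downstream parallelism. This is why broadcasting is usually reserved for small streams such as rules or configuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of broadcast partitioning: every element is copied to every downstream partition.
class BroadcastSketch {

    static <T> List<List<T>> broadcast(List<T> elements, int numPartitions) {
        List<List<T>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>(elements)); // each partition sees the full stream
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(broadcast(List.of("rule-1", "rule-2"), 3));
        // [[rule-1, rule-2], [rule-1, rule-2], [rule-1, rule-2]]
    }
}
```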