  • Flink DataStream API

    Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/#operators

    Map

    DataStream → DataStream

    Applies a mapping function to every element of the source stream, for example multiplying each number by 2.

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            List<Integer> list = new ArrayList<>();
            list.add(1);
            list.add(2);
            list.add(3);
            list.add(4);
    ​
            DataStream<Integer> source = env.fromCollection(list);
            SingleOutputStreamOperator<Integer> map = source.map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) throws Exception {
                    return value * 2;
                }
            });
    ​
            map.printToErr().setParallelism(1);
            env.execute();
        }

    FlatMap

    DataStream → DataStream

    Takes one element and produces zero, one, or more elements. For example, a flatMap function that splits sentences into words:

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            List<String> list = new ArrayList<>();
            list.add("hello flink spark java");
    ​
            DataStream<String> source = env.fromCollection(list);
            SingleOutputStreamOperator<String> flatMapResult = source.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    for (String word : value.split(" ")) {
                        out.collect(word);
                    }
                }
            });
    ​
            flatMapResult.printToErr().setParallelism(1);
            env.execute();
        }

    Filter

    DataStream → DataStream

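    Filters the stream, keeping only the elements for which the function returns true. The example keeps the even numbers.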

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            List<Integer> list = new ArrayList<>();
            list.add(1);
            list.add(2);
            list.add(3);
            list.add(4);
    ​
            DataStream<Integer> source = env.fromCollection(list);
            SingleOutputStreamOperator<Integer> filterResult = source.filter(new FilterFunction<Integer>() {
                @Override
                public boolean filter(Integer value) throws Exception {
                    return value % 2 == 0;
                }
            });
    ​
            filterResult.printToErr().setParallelism(1);
            env.execute();
        }

    KeyBy

    DataStream → KeyedStream

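    Partitions the stream into groups so that all records with the same key end up in the same group. Internally this is implemented with hash partitioning.

    The ways to specify a key are described in the official documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#specifying-keys), for example: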

    dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.keyBy(0) // Key by the first element of a Tuple

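    Note that a type cannot be used as a key if:

    1. it is a POJO type (a class we define ourselves) that does not override hashCode() and so relies on Object.hashCode()

    2. it is an array of any type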
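    The restriction on keys follows from hash partitioning: two records can only be grouped together if their keys produce the same hash code. The plain-Java sketch below illustrates this with a deliberately simplified routing rule (hash code modulo parallelism). The class name, the method targetSubtask, and the routing formula are assumptions made for illustration; Flink's actual key-to-subtask assignment is more involved.

```java
import java.util.Arrays;

public class KeyHashSketch {
    // Simplified stand-in for hash partitioning: equal hash codes map to the same subtask.
    // This is an illustrative assumption, not Flink's real assignment logic.
    static int targetSubtask(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // Strings hash by content, so two equal keys always reach the same subtask.
        String k1 = "shoes";
        String k2 = new String("shoes");
        System.out.println(targetSubtask(k1, 4) == targetSubtask(k2, 4)); // true

        // Arrays inherit Object.hashCode(), which ignores the array content.
        int[] a = {1, 2};
        int[] b = {1, 2};
        System.out.println(Arrays.equals(a, b)); // true: same content
        // The identity hash codes of a and b are unrelated to their content,
        // so the two "equal" keys would generally not be routed together.
        System.out.println(a.hashCode() == b.hashCode());
    }
}
```

    A POJO that does not override hashCode() behaves like the arrays here, which is why neither can serve as a key.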

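            // Example: key by tuple field 0 and sum field 1 per 5-second tumbling window
            dataStream.map(s -> {
                String[] split = s.split(",");
                return new Tuple2<>(split[0], Long.parseLong(split[1]));
            })
                    // A lambda loses its generic return type to erasure, so declare it explicitly
                    .returns(Types.TUPLE(Types.STRING, Types.LONG))
                    .keyBy(0)
                    // Tumbling window; event time requires timestamps and watermarks assigned upstream
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .sum(1)
                    .print();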

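    This operator groups records by the user-specified key, and all records of one group are sent to the same subtask. With large data volumes and an unevenly distributed key, this easily leads to data skew and backpressure, which can make the job fail. A common remedy is to add a random prefix or suffix to the key, aggregate on the salted key first, and then aggregate again on the original key.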
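    The salting idea can be sketched in plain Java without any Flink dependency. The sketch counts records in two stages: first per salted key, so that one hot key is spread over several groups, then per original key. The class name, the '#' separator, and the bucket count are assumptions chosen for illustration; in a real job each stage would be its own keyBy plus aggregation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

public class SaltedKeySketch {
    // Stage 1 key: append a random suffix in [0, buckets) to spread a hot key over several groups.
    static String salt(String key, int buckets, Random rnd) {
        return key + "#" + rnd.nextInt(buckets);
    }

    // Stage 2 key: strip the suffix so that partial results can be merged per original key.
    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }

    // Counts records per key in two stages: per salted key first, then per original key.
    static Map<String, Long> twoStageCount(List<String> keys, int buckets, long seed) {
        Random rnd = new Random(seed);
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            partial.merge(salt(key, buckets, rnd), 1L, Long::sum);
        }
        Map<String, Long> total = new TreeMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            total.merge(unsalt(e.getKey()), e.getValue(), Long::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("hot", "hot", "hot", "hot", "cold");
        System.out.println(twoStageCount(keys, 4, 42L)); // {cold=1, hot=4}
    }
}
```

    The final counts do not depend on which suffix each record received, which is what makes the second stage safe. This only works for aggregations that can be merged from partial results, such as sum, count, min, and max.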

    Reduce

    KeyedStream → DataStream

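    A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

    keyedStream.reduce(new ReduceFunction<Integer>() {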
        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    });
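    To make the rolling behavior of the sum-style reduce concrete, here is a plain-Java sketch that keeps one running value per key and emits a result for every input element. The class and method names are hypothetical and the sketch only imitates the semantics; it does not use Flink.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingReduceSketch {
    // Emits one running sum per input element, tracked separately for each key.
    static List<String> rollingSum(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> e : input) {
            // The first element of a key is stored as is; later ones are combined with the stored value.
            int reduced = state.merge(e.getKey(), e.getValue(), Integer::sum);
            emitted.add(e.getKey() + "=" + reduced);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = Arrays.asList(
                new SimpleEntry<>("a", 10),
                new SimpleEntry<>("b", 20),
                new SimpleEntry<>("a", 30),
                new SimpleEntry<>("b", 50));
        System.out.println(rollingSum(input)); // [a=10, b=20, a=40, b=70]
    }
}
```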

    Fold

    KeyedStream → DataStream

    A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

    A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

    The following example shows the behavior:

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            List<Tuple2<String,Integer>> list = new ArrayList<>();
            list.add(new Tuple2<>("a",10));
            list.add(new Tuple2<>("b",20));
            list.add(new Tuple2<>("a",30));
            list.add(new Tuple2<>("b",50));
            DataStream<Tuple2<String, Integer>> source = env.fromCollection(list);
            KeyedStream<Tuple2<String, Integer>, Tuple> keyByResult = source.keyBy(0);
            // The first argument of fold is the initial value and the second is our folding logic, which is applied to each element on top of that initial value
            SingleOutputStreamOperator<String> foldResult = keyByResult.fold("ksw", new FoldFunction<Tuple2<String, Integer>, String>() {
                @Override
                public String fold(String current, Tuple2<String, Integer> value) throws Exception {
                    return current + "-" + value.f1;
                }
            });
    ​
            foldResult.printToErr().setParallelism(1);
            env.execute();
            // Output:
                //ksw-20
                //ksw-20-50
                //ksw-10
                //ksw-10-30
        }

    Aggregations

    KeyedStream → DataStream

    Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

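    Aggregations is the collective name for these rolling aggregate functions on a KeyedStream, which include sum, max, and min, among others.

    Both max and maxBy emit a whole element. max only tracks the maximum of the specified field and writes it into that position, so the other fields of the emitted element are not guaranteed to belong to the record that holds the maximum. maxBy emits the actual element that holds the maximum. The same applies to min and minBy.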

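    Flink keeps the state for these aggregation functions internally, and that state is never cleaned up, so in production you should avoid running Aggregations on an unbounded stream as far as possible. Also, an aggregation can only be applied once per keyBy: it returns a plain DataStream rather than a KeyedStream, so applying another aggregation requires keying the stream again.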

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ​
            List<Tuple3<Integer,Integer,Integer>> data = new ArrayList<>();
            data.add(new Tuple3<>(0,1,1));
            data.add(new Tuple3<>(0,2,3));
            data.add(new Tuple3<>(0,1,2));
            data.add(new Tuple3<>(0,1,5));
    ​
            data.add(new Tuple3<>(1,2,5));
            data.add(new Tuple3<>(1,2,4));
            data.add(new Tuple3<>(1,2,13));
            data.add(new Tuple3<>(1,2,11));
    ​
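            DataStreamSource<Tuple3<Integer,Integer,Integer>> source = env.fromCollection(data);
            // Group by the first tuple field, then track the maximum of the third field
            source.keyBy(0).max(2).printToErr();
            //6> (0,1,1)
            //6> (0,1,3) note: the source contains no (0,1,3) record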
            //6> (0,1,3)
            //6> (0,1,5)
            //6> (1,2,5)
            //6> (1,2,5)
            //6> (1,2,13)
            //6> (1,2,13)
            source.keyBy(0).maxBy(2).printToErr();
            //6> (0,1,1)
            //6> (0,2,3)
            //6> (0,2,3)
            //6> (0,1,5)
            //6> (1,2,5)
            //6> (1,2,5)
            //6> (1,2,13)
            //6> (1,2,13)
            env.execute();
        }
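    The difference between the two outputs can be reproduced in plain Java. The sketch below imitates the semantics only: the max-like variant keeps the first element seen for a key and overwrites just the compared field, while the maxBy-like variant keeps the whole element with the largest field. The class and method names are hypothetical, and the tie-breaking rule (first element wins) is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MaxVsMaxBySketch {
    // max-like: keep the first element of the key, update only the compared field.
    static List<String> rollingMax(List<int[]> input, int keyPos, int field) {
        Map<Integer, int[]> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int[] t : input) {
            int[] cur = state.get(t[keyPos]);
            if (cur == null) {
                cur = t.clone();
                state.put(t[keyPos], cur);
            } else {
                cur[field] = Math.max(cur[field], t[field]);
            }
            out.add(Arrays.toString(cur));
        }
        return out;
    }

    // maxBy-like: keep the whole element whose compared field is largest (first one wins on ties).
    static List<String> rollingMaxBy(List<int[]> input, int keyPos, int field) {
        Map<Integer, int[]> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int[] t : input) {
            int[] cur = state.get(t[keyPos]);
            if (cur == null || t[field] > cur[field]) {
                cur = t;
                state.put(t[keyPos], cur);
            }
            out.add(Arrays.toString(cur));
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> data = Arrays.asList(
                new int[]{0, 1, 1}, new int[]{0, 2, 3}, new int[]{0, 1, 2}, new int[]{0, 1, 5});
        System.out.println(rollingMax(data, 0, 2));   // [[0, 1, 1], [0, 1, 3], [0, 1, 3], [0, 1, 5]]
        System.out.println(rollingMaxBy(data, 0, 2)); // [[0, 1, 1], [0, 2, 3], [0, 2, 3], [0, 1, 5]]
    }
}
```

    The second emitted element differs: the max-like variant yields (0,1,3), a record that never existed in the input, while the maxBy-like variant yields the real record (0,2,3).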

    Window

    KeyedStream → WindowedStream

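    Windows can be defined on a KeyedStream. A window groups the records of each key by some characteristic, for example the data that arrived within the last 5 seconds.

    Example: read product sales records from a socket and use a 5-second tumbling window to compute how many items of each product were sold in the previous 5 seconds.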

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("172.xx.x.xx", 9999);
            source.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new Tuple2<>(split[0],Integer.parseInt(split[1]));
                }
            }).keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1)
                    .printToErr().setParallelism(1);
    ​
            env.execute();
        }

    Test data (each name,count pair is sent as its own line; two batches):

    shoes,1 tie,2 shoes,1 cup,2 shoes,2 cup,3 tie,3

    shoes,2 tie,1 shoes,2 cup,1 shoes,2 cup,2 tie,1
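    How a tumbling window assigns records can be sketched in plain Java: every timestamp belongs to exactly one window, whose start is the timestamp rounded down to a multiple of the window size. The sketch sums counts per window and per product. The record format "timestampMs,name,count" and the explicit timestamps are assumptions added for illustration; the Flink job above uses processing time, so its records carry no timestamp field.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {
    // Start of the tumbling window that contains the given timestamp (no offset).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Sums the count of each product per window; each record is "timestampMs,name,count".
    static Map<Long, Map<String, Integer>> sumPerWindow(String[] records, long sizeMs) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (String record : records) {
            String[] parts = record.split(",");
            long start = windowStart(Long.parseLong(parts[0]), sizeMs);
            result.computeIfAbsent(start, s -> new TreeMap<>())
                  .merge(parts[1], Integer.parseInt(parts[2]), Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] records = {"1000,shoes,1", "2000,tie,2", "4999,shoes,1", "5000,shoes,2"};
        // Windows are [0, 5000) and [5000, 10000).
        System.out.println(sumPerWindow(records, 5000L)); // {0={shoes=2, tie=2}, 5000={shoes=2}}
    }
}
```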

    WindowAll

    DataStream → AllWindowedStream

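    Groups all records into windows by some time characteristic (for example, the data that arrived within the last 5 seconds). WindowAll is not processed in parallel: a single task handles all records.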

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("172.30.4.33", 9999);
            source.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new Tuple2<>(split[0],Integer.parseInt(split[1]));
                }
            }).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1)
                    .printToErr().setParallelism(1);
    ​
            env.execute();
        }

    Input (each name,count pair sent as its own line):

    shoes,1 tie,2 shoes,1 cup,2 shoes,2 cup,3 tie,3

    Output (when all records fall into the same window; sum keeps the name of the first record):

    (shoes,14)

    Union

    DataStream* → DataStream

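    Union of two or more data streams, creating a new stream that contains all the elements from all the streams. If you union a stream with itself, each element appears twice in the resulting stream.

    dataStream.union(otherStream1, otherStream2, ...);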

    Split

    DataStream → SplitStream

    Splits the stream into two or more streams according to some criterion.

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            List<Integer> list = new ArrayList<>();
            list.add(1);
            list.add(2);
            list.add(3);
            list.add(4);
            DataStreamSource<Integer> source = env.fromCollection(list);
            SplitStream<Integer> split = source.split(new OutputSelector<Integer>() {
                @Override
                public Iterable<String> select(Integer value) {
                    List<String> output = new ArrayList<>();
                    if (value % 2 == 0) {
                        output.add("even");
                    } else {
                        output.add("odd");
                    }
                    return output;
                }
            });
    ​
            DataStream<Integer> even = split.select("even");
            even.printToErr().setParallelism(1);
            // Output:
            //2 4
            env.execute();
        }

    Select

    SplitStream → DataStream

    Select one or more streams from a split stream.

    DataStream<Integer> even = split.select("even");
    DataStream<Integer> odd = split.select("odd");
    DataStream<Integer> all = split.select("even","odd");

    Data partitioning

    Flink lets us control how data is partitioned and provides the following strategies:

    Custom partitioning

    DataStream → DataStream

    Uses a user-defined Partitioner to select the target task for each element.

    dataStream.partitionCustom(partitioner, "someKey");
    dataStream.partitionCustom(partitioner, 0);

    partitionCustom() has two overloads; choose whichever fits your data type:

    1. Custom partitioning of a POJO DataStream

        /**
         * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
         * This method takes the key expression to partition on, and a partitioner that accepts the key type.
         *
         * <p>Note: This method works only on single field keys.
         *
         * @param partitioner The partitioner to assign partitions to keys.
         * @param field The expression for the field on which the DataStream is partitioned.
         * @return The partitioned DataStream.
         */
        public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) {
            Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<>(new String[]{field}, getType());
            return partitionCustom(partitioner, outExpressionKeys);
        }

    2. Custom partitioning of a Tuple DataStream

        /**
         * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
         * This method takes the key position to partition on, and a partitioner that accepts the key type.
         *
         * <p>Note: This method works only on single field keys.
         *
         * @param partitioner The partitioner to assign partitions to keys.
         * @param field The field index on which the DataStream is partitioned.
         * @return The partitioned DataStream.
         */
        public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, int field) {
            Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<>(new int[]{field}, getType());
            return partitionCustom(partitioner, outExpressionKeys);
        }

    Example: custom partitioning of a Tuple DataStream

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            List<Integer> list = new ArrayList<>();
            list.add(1);
            list.add(2);
            list.add(3);
            list.add(4);
            DataStreamSource<Integer> source = env.fromCollection(list);
            SingleOutputStreamOperator<Tuple1<Integer>> tupleData = source.map(new MapFunction<Integer, Tuple1<Integer>>() {
                @Override
                public Tuple1<Integer> map(Integer value) throws Exception {
                    return new Tuple1<>(value);
                }
            });
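            // Send even keys to partition 0 and odd keys to partition 1 (needs a downstream parallelism of at least 2)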
            DataStream<Tuple1<Integer>> dataStream = tupleData.partitionCustom(new org.apache.flink.api.common.functions.Partitioner<Integer>() {
                @Override
                public int partition(Integer key, int numPartitions) {
                    if (key % 2 == 0) {
                        return 0;
                    } else {
                        return 1;
                    }
                }
            }, 0);
    ​
            dataStream.map(new MapFunction<Tuple1<Integer>, Integer>() {
                @Override
                public Integer map(Tuple1<Integer> value) throws Exception {
                    System.out.println("Current thread ID: " + Thread.currentThread().getId() + ", value: " + value);
                    return value.getField(0);
                }
            }).printToErr();
    ​
            env.execute();
        }

    Random partitioning

    DataStream → DataStream

    Partitions elements randomly according to a uniform distribution.

    dataStream.shuffle();

    Rebalancing (Round-robin partitioning)

    DataStream → DataStream

    Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

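    RebalancePartitioner first picks a random downstream operator instance and then emits records round-robin, starting from that instance. This spreads the load evenly across the downstream instances, which is why it is commonly used on skewed input streams.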

    dataStream.rebalance();
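    The round-robin assignment can be sketched in a few lines of plain Java. The class name and the assign method are hypothetical, and the start index is passed in explicitly instead of being chosen at random so that the result is reproducible; whether the very first record goes to the start instance or to its successor is an implementation detail not covered here.

```java
import java.util.Arrays;

public class RoundRobinSketch {
    // Returns the target channel of each of `count` records, cycling through all channels from `start`.
    static int[] assign(int count, int numChannels, int start) {
        int[] targets = new int[count];
        int next = start;
        for (int i = 0; i < count; i++) {
            targets[i] = next;
            next = (next + 1) % numChannels;
        }
        return targets;
    }

    public static void main(String[] args) {
        // 7 records, 3 downstream channels, starting at channel 1.
        System.out.println(Arrays.toString(assign(7, 3, 1))); // [1, 2, 0, 1, 2, 0, 1]
    }
}
```

    No channel ever receives more than one record above any other, regardless of how the keys of the records are distributed.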

    Rescaling

    DataStream → DataStream

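    Partitions elements round-robin to a subset of the downstream operator instances. For example, if the upstream operator has parallelism 2 and the downstream operator has parallelism 6, one upstream instance distributes its elements to three downstream instances and the other upstream instance distributes to the remaining three.

    dataStream.rescale();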
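    With rescaling, each upstream instance sends round-robin to only a subset of the downstream instances rather than to all of them. The plain-Java sketch below computes such a subset under the simplifying assumption that the downstream parallelism is a multiple of the upstream parallelism. The class name, the method, and the contiguous block layout are assumptions for illustration; the exact pairing in a running job is decided by Flink.

```java
import java.util.ArrayList;
import java.util.List;

public class RescaleSketch {
    // Downstream instances served by one upstream instance, assuming an even split into contiguous blocks.
    static List<Integer> downstreamTargets(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        if (downstreamParallelism % upstreamParallelism != 0) {
            throw new IllegalArgumentException("sketch assumes downstream is a multiple of upstream parallelism");
        }
        int span = downstreamParallelism / upstreamParallelism;
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < span; i++) {
            targets.add(upstreamIndex * span + i);
        }
        return targets;
    }

    public static void main(String[] args) {
        // Upstream parallelism 2, downstream parallelism 6.
        System.out.println(downstreamTargets(0, 2, 6)); // [0, 1, 2]
        System.out.println(downstreamTargets(1, 2, 6)); // [3, 4, 5]
    }
}
```

    Compared with rebalance, fewer connections are needed between the two operators, at the price of load balancing that is only local to each subset.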

    Broadcasting

    DataStream → DataStream

    Broadcasts elements to every partition.

    dataStream.broadcast();

     

     

  • Original post: https://www.cnblogs.com/zz-ksw/p/12966601.html