zoukankan      html  css  js  c++  java
  • flink学习笔记-split & select(拆分流)

    说明:本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程:

     Flink大数据项目实战:http://t.cn/EJtKhaz

    split

    1.DataStream SplitStream

    2.按照指定标准将指定的DataStream拆分成多个流用SplitStream来表示

    select

    1.SplitStream DataStream

    2.split搭配使用,从SplitStream中选择一个或多个流

    案例:

    public class TestSplitAndSelect {

        public static void main(String[] args) throws Exception {

            final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Long> input=env.generateSequence(0,10);

            SplitStream<Long> splitStream = input.split(new OutputSelector<Long>() {

                @Override

                public Iterable<String> select(Long value) {

     List<String> output = new ArrayList<String>();

                    if (value % 2 == 0) {

                        output.add("even");

                    }

                    else {

                        output.add("odd");

                    }

                    return output;

                }

            });

            //splitStream.print();

            DataStream<Long> even = splitStream.select("even");

            DataStream<Long> odd = splitStream.select("odd");

            DataStream<Long> all = splitStream.select("even","odd");

            //even.print();

            odd.print();

            //all.print();

            env.execute();

        }

    }

    1.12 project

    含义:从Tuple中选择属性的子集

    限制:

    1.仅限event数据类型为TupleDataStream

    2.仅限Java API

    使用场景:

    ETL时删减计算过程中不需要的字段

    案例:

    public class TestProject {

        public static void main(String[] args) throws Exception {

            final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

     DataStreamSource<Tuple4<String,String,String,Integer>> input=env.fromElements(TRANSCRIPT);

            DataStream<Tuple2<String, Integer>> out = input.project(1,3);

            out.print();

            env.execute();

        }

        public static final Tuple4[] TRANSCRIPT = new Tuple4[] {

                Tuple4.of("class1","张三","语文",100),

                Tuple4.of("class1","李四","语文",78),

                Tuple4.of("class1","王五","语文",99),

                Tuple4.of("class2","赵六","语文",81),

                Tuple4.of("class2","钱七","语文",59),

                Tuple4.of("class2","马二","语文",97)

        };

    }

    1.13 assignTimestampsAndWatermarks

    含义:提取记录中的时间戳作为Event time,主要在window操作中发挥作用,不设置默认就是ProcessingTime

    限制:

    只有基于event time构建window时才起作用

    使用场景:

    当你需要使用event time来创建window时,用来指定如何获取event的时间戳

    案例:讲到window时再说

    1.14 window相关Operators

    放在讲解完Event Time之后在细讲

    构建window

    1.window

    2.windowAll

    window上的操作

    1.Window ApplyWindow Reduce

    2.Window Fold

    3.Aggregations on windows(summinmaxminBymaxBy)

    4.Window Join

    5.Window CoGroup

    2. 物理分区

    2.1回顾 Streaming DataFlow

    2.2并行化DataFlow

    2.3算子间数据传递模式

    One-to-one streams

    保持元素的分区和顺序

    Redistributing streams

    1.改变流的分区

    2.重新分区策略取决于使用的算子

    akeyBy() (re-partitions by hashing the key) 

    bbroadcast()

    crebalance() (which re-partitions randomly)

    2.4物理分区

    能够对分区在物理上进行改变的算子如下图所示:

    2.5 rescale

    通过轮询调度将元素从上游的task一个子集发送到下游task的一个子集。

    原理:

    第一个task并行度为2,第二个task并行度为6,第三个task并行度为2。从第一个task到第二个taskSrc的子集Src1 Map的子集Map123对应起来,Src1会以轮询调度的方式分别向Map123发送记录。从第二个task到第三个taskMap的子集123对应Sink的子集1,这三个流的元素只会发送到Sink1。假设我们每个TaskManager有三个Slot,并且我们开了SlotSharingGroup,那么通过rescale,所有的数据传输都在一个TaskManager内,不需要通过网络。

    2.6任务链和资源组相关操作

    startNewChain()表示从这个操作开始,新启一个新的chain

    someStream.filter(...).map(...).startNewChain().map(...)

    如上一段操作,表示从map()方法开始,新启一个新的chain

    如果禁用任务链可以调用disableChaining()方法。

    如果想单独设置一个SharingGroup,可以调用slotSharingGroup("name")方法。

  • 相关阅读:
    超微主板不识别M2-解决方案
    Centos7安装zookpeer
    PowerBI主题制作
    [python错误]UnicodeDecodeError: 'gbk' codec can't decode byte...
    使用Python批量合并PDF文件(带书签功能)
    Oracle使用超大SQL脚本文件恢复数据问题记录
    Linux Mint 18.2安装后需要进行的设置
    Excel使用SUMIF函数注意事项
    CSV文件分割与列异常处理的python脚本
    小程序例子
  • 原文地址:https://www.cnblogs.com/dajiangtai/p/10621861.html
Copyright © 2011-2022 走看看