zoukankan      html  css  js  c++  java
  • Flink-API(二)

    Flink API介绍

    1.Stateful Stream Processing 最低级的抽象接口是状态化的数据流接口
    2.DataStream/DataSet API  是 Flink 提供的核心 API ,DataSet 处理
    有界的数据集,DataStream 处理有界或者无界的数据流。
    3.Table API 
    4.SQL

    Dataflows数据流图

    Flink Dataflows是由三部分组成,分别是:source、transformation、sink结束
        1.source数据源会源源不断的产生数据
        2.transformation将产生的数据进行各种业务逻辑的数据处理
        3.由sink输出到外部(console、kafka、redis、DB......)

    下图中这段代码和流程图就是一个Dataflows

    flink上的Dataflow

    当并行度如下设置,Dataflow如下流程

      spark flink
      pipeline 算子链 operator chain
      stage task
      task 线程

    1对一 窄依赖

    1对多 宽依赖 

     1. stage(spark) = task(flink) 

     2. pipeline(spark) = 算子链operator chain(flink)

     3. Task(spark)是shuffle切开的

     4.  Task中的子任务对应的是thread   (subTask0=subTask1, subTask2 = subTask3)

    设置并行度如下

     设置算子并行度方式4种

    1.配置文件设置,系统全局
     parallelism.default: 1
    2.在提交job的时候,通过-p选项来设置
    3.在代码中通过环境变量来设置   代码全局
        env.setParallelism(1)
    4. 直接在算子上设置
        val wordStream = initStream.flatMap(_.split(" ")).setParallelism(2)

     Show plan

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val initStream: DataStream[String] = env.socketTextStream("192.168.75.85", 8888)
        val wordStream = initStream.flatMap(_.split(" ")).setParallelism(3)
    
        val pairStream = wordStream.map((_, 1)).setParallelism(2)
        val keyByStream = pairStream.keyBy(0)
        val restStream = keyByStream.sum(1).setParallelism(3)
        restStream.print()
        env.execute("first flink job")

     

     点击对应的算子,在点击SubTask可以查看当前算子在那个机器上运行

    停掉任务有两种方式

    1.在页面点击Cancel Job
    2.命令:flink canel id

    startNewChain()和disableChaining()提高执行效率

    1.业务逻辑简单,使用startNewChain提高效率
    2.业务逻辑复杂,通过提高并行度+disableChaining提高执行效率
    
    如:
        val wordStream = initStream.flatMap(_.split(" ")).startNewChain()
        val pairStream = wordStream.map((_, 1))
    如:
        val wordStream = initStream.flatMap(_.split(" ")).setParallelism(100).disableChaining()
        val pairStream = wordStream.map((_, 1))
    
    
    
    特例:如果并行度改变,则会发生shuffle,startNewChain和disableChaining不起作用
        val wordStream = initStream.flatMap(_.split(" ")).setParallelism(2).startNewChain()
        val pairStream = wordStream.map((_, 1)).setParallelism(3)

    因: task slot内存隔离,CPU不隔离
    
    要提高效率,则: TaskManager上有(subtask0, subtask2),这样就可以内存不隔离,提高效率
     既: 可以提高本地化级别
    
    问:TaskManager上是否可以放(subtask0,1, 2, 3)
    答: 不可以,也不行
    
    flink分发Task规律
    1. 不同Task的subtask要分发到同一个task slot中(降低数据传输、提高执行效率)
    2. 相同Task的subtask要分发到不同的task slot中(充分利用集群资源)



    so:
    Task slot 个数 决定了 job未来执行的并行度

    并行度设置为2

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val initStream: DataStream[String] = env.socketTextStream("192.168.75.85", 8888)
        val wordStream = initStream.flatMap(_.split(" ")).setParallelism(2)
        val pairStream = wordStream.map((_, 1)).setParallelism(2)
        val keyByStream = pairStream.keyBy(0)
        val restStream = keyByStream.sum(1).setParallelism(2)
        restStream.print() //这里没有设置默认为1
        env.execute("first flink job")
    
    结果:
    socket:ke03:41726
    flatMap: ke03:41726  ke03:41726
    aggregation:  ke03:41726  ke03:41726
    Sink:  ke03:41726

    并行度设置为4

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val initStream: DataStream[String] = env.socketTextStream("192.168.75.85", 8888)
        val wordStream = initStream.flatMap(_.split(" ")).setParallelism(4)
        val pairStream = wordStream.map((_, 1)).setParallelism(4)
        val keyByStream = pairStream.keyBy(0)
        val restStream = keyByStream.sum(1).setParallelism(4)
        restStream.print() //这里没有设置默认为1
        env.execute("first flink job")
    
    结果:
    Socket :    ke03:41726
    Flat Map: ke03:38944 ke03:38944 ke04:41726 ke04:41726
    aggregation: ke03:38944 ke03:38944 ke04:41726 ke04:41726
    Sink: ke03:41726

    设置并行度为50

    因为每台机器只有2个槽,3台机器6个槽 task slot,则启动失败
    
    1.启动设置并行度-p 或者页面设置
    2.代码中restStream.print() 没有这只并行度,在启动的时候设置会给这里加并行度为50,因为没有这么多槽位,则status过一会会失败

    以上代码导包和打包maven_pom

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.9.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.9.2</version>
            </dependency>
    
    <build>
            <plugins>
                <!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- maven 打jar包需要插件 -->
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>2.4</version>
                    <configuration>
                        <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
                        <!--<appendAssemblyId>false</appendAssemblyId>-->
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>assembly</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>

    从HDFS上读取文件

    //在算子转换的时候,会将数据转换成Flink内置的数据类型,所以需要将隐式转换导入进来,才能自动进行类型转换
    import org.apache.flink.streaming.api.scala._
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // readTextFile 只读一次
        val textStream = env.readTextFile("hdfs://ke01:8020/xiaoke002/two")
        textStream.print()
        env.execute()
    
    结果:
    6> 1,小明
    8> 2,小红
    11> 3,小柯
    2> 4,wang
    
    readTextFile底层代码:调用readFile,传入了 FileProcessingMode.PROCESS_ONCE(只允许一次)

    so: 需要持续读取数据需要自己写readFile()方法, 使用
    FileProcessingMode.PROCESS_CONTINUOUSLY持续读入
     
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val filePath = "hdfs://ke01:8020/xiaoke002/two"
    val textInputFormat = new TextInputFormat(new Path(filePath))
    // 每隔10s中读取 hdfs上新增文件内容
    val textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10)
    textStream.print()
    env.execute()

    结果:持续运行,每次有新数据进来,都会全部读一遍

    12> 1,小明
    5> 3,小柯
    8> 4,wang
    2> 2,小红

    hdfs dfs -appendToFile test /xiaoke002/two 

    3> 3,小柯
    6> aa bb
    7> cc dd
    5> 4,wang
    1> 2,小红
    12> 1,小明
    9> e



     
  • 相关阅读:
    [zz]Ubuntu源签名错误/Ubuntu 更新源签名错误 –BADSIG 40976EAF437D05B5
    [zz]GNU C __attribute__ 机制简介
    [zz]为小米创建虚拟机路由器
    liburcu 库
    多代理集群调度:可伸缩性和灵活性
    automake的使用速查
    automake之简单的例子
    ajax原生
    Cookie 和Session 的原理
    路径问题
  • 原文地址:https://www.cnblogs.com/bigdata-familyMeals/p/14865101.html
Copyright © 2011-2022 走看看