Introduction to the Flink APIs
1. Stateful Stream Processing: the lowest-level abstraction, a stateful stream-processing interface.
2. DataStream / DataSet API: the core APIs provided by Flink. The DataSet API processes bounded data sets, while the DataStream API processes bounded or unbounded data streams (a minimal DataSet sketch follows this list).
3. Table API
4. SQL
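As a minimal sketch of the batch-oriented DataSet API, here is a word count over a small in-memory collection; the sample element values are made up for illustration:

import org.apache.flink.api.scala._

val benv = ExecutionEnvironment.getExecutionEnvironment
// Bounded input: a tiny in-memory data set (hypothetical sample values)
val lines = benv.fromElements("flink spark", "flink kafka")
lines.flatMap(_.split(" "))
  .map((_, 1))
  .groupBy(0)   // group by the word (tuple field 0)
  .sum(1)       // sum the counts (tuple field 1)
  .print()      // print() triggers execution in the DataSet API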
Dataflow Graphs
A Flink dataflow consists of three parts: source, transformation, and sink.
1. The source continuously produces data.
2. Transformations apply the business logic to that data.
3. The sink writes the results to an external system (console, Kafka, Redis, a database, ...). A minimal sketch of this structure follows this list.
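A small runnable sketch of the source -> transformation -> sink structure, using an in-memory source with made-up sample values and the console print sink:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// source: produces the data (here a tiny in-memory stream with hypothetical values)
val source = env.fromElements("flink spark", "flink kafka")
// transformation: the business logic
val counts = source.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
// sink: write the results to the console
counts.print()
env.execute("dataflow sketch")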
The code and flowchart in the figure below together make up one dataflow.

The dataflow as it appears in Flink

With the parallelism set as below, the dataflow runs as follows:
| Spark | Flink |
| --- | --- |
| pipeline | operator chain |
| stage | task |
| task | thread (subtask) |

One-to-one: narrow dependency
One-to-many: wide dependency
1. stage (Spark) = task (Flink)
2. pipeline (Spark) = operator chain (Flink)
3. A task (Spark) is cut at shuffle boundaries.
4. The subtasks inside a task correspond to threads (subTask0 = subTask1, subTask2 = subTask3). A sketch of the two connection patterns follows this list.
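A small sketch (using the same socket source as elsewhere in this section) of the two connection patterns: flatMap -> map forwards records one-to-one and can be chained into a single task, while keyBy redistributes records by key and starts a new task:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("192.168.75.85", 8888)
// one-to-one (narrow): flatMap and map can be chained into the same task
val words = text.flatMap(_.split(" "))
val pairs = words.map((_, 1))
// one-to-many (wide): keyBy redistributes records by key, so a new task starts here
val counts = pairs.keyBy(0).sum(1)
counts.print()
env.execute("connection pattern sketch")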
Set the parallelism as follows

Four ways to set operator parallelism
1. In the configuration file, system-wide: parallelism.default: 1
2. When submitting the job, via the -p option.
3. In the code, on the execution environment (applies to the whole job): env.setParallelism(1)
4. Directly on an operator: val wordStream = initStream.flatMap(_.split(" ")).setParallelism(2)
A combined sketch of options 3 and 4 follows this list.
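A small sketch (reusing the socket source from this section) showing that an operator-level setting takes precedence over the environment-level setting:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)                  // option 3: default parallelism for every operator in this job
val initStream = env.socketTextStream("192.168.75.85", 8888)
val wordStream = initStream
  .flatMap(_.split(" "))
  .setParallelism(2)                   // option 4: overrides env.setParallelism for this operator only
wordStream.print()
env.execute("parallelism sketch")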
Show plan
val env = StreamExecutionEnvironment.getExecutionEnvironment
val initStream: DataStream[String] = env.socketTextStream("192.168.75.85", 8888)
val wordStream = initStream.flatMap(_.split(" ")).setParallelism(3)
val pairStream = wordStream.map((_, 1)).setParallelism(2)
val keyByStream = pairStream.keyBy(0)
val restStream = keyByStream.sum(1).setParallelism(3)
restStream.print()
env.execute("first flink job")

Click the operator in question, then click SubTasks to see which machine each subtask of that operator is running on.

There are two ways to stop a job:
1. Click Cancel Job in the web UI.
2. From the command line: flink cancel <jobId>
Improving execution efficiency with startNewChain() and disableChaining()
1. If the business logic is simple, use startNewChain to improve efficiency.
2. If the business logic is complex, improve execution efficiency by raising the parallelism together with disableChaining.
For example:
val wordStream = initStream.flatMap(_.split(" ")).startNewChain()
val pairStream = wordStream.map((_, 1))
For example:
val wordStream = initStream.flatMap(_.split(" ")).setParallelism(100).disableChaining()
val pairStream = wordStream.map((_, 1))
Special case: if the parallelism changes between operators, a shuffle (redistribution) happens anyway, so startNewChain and disableChaining have no effect:
val wordStream = initStream.flatMap(_.split(" ")).setParallelism(2).startNewChain()
val pairStream = wordStream.map((_, 1)).setParallelism(3)
A complete runnable version is sketched below.
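A complete runnable sketch (same socket source as above) for observing the effect in the web UI's Show plan view:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val initStream = env.socketTextStream("192.168.75.85", 8888)
val wordStream = initStream.flatMap(_.split(" ")).startNewChain() // begin a new chain at flatMap: it is not chained to the preceding operator
val pairStream = wordStream.map((_, 1))                           // map can still be chained to flatMap (same parallelism, forward connection)
pairStream.print()
env.execute("chaining sketch")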

Because: a task slot isolates memory, but not CPU.
To improve efficiency: place (subtask0, subtask2) on the same TaskManager; they are then not memory-isolated from each other, which improves efficiency.
That is: it raises the data-locality level.
Q: Can one TaskManager hold (subtask0, 1, 2, 3)?
A: No, that is not allowed.
How Flink distributes tasks:
1. Subtasks of different tasks are placed in the same task slot (reduces data transfer, improves execution efficiency).
2. Subtasks of the same task are placed in different task slots (makes full use of cluster resources).
So: the number of task slots determines the maximum parallelism the job can be executed with.
Parallelism set to 2
val env = StreamExecutionEnvironment.getExecutionEnvironment
val initStream: DataStream[String] = env.socketTextStream("192.168.75.85", 8888)
val wordStream = initStream.flatMap(_.split(" ")).setParallelism(2)
val pairStream = wordStream.map((_, 1)).setParallelism(2)
val keyByStream = pairStream.keyBy(0)
val restStream = keyByStream.sum(1).setParallelism(2)
restStream.print() // no parallelism set here, defaults to 1
env.execute("first flink job")
Result:
socket: ke03:41726
flatMap: ke03:41726, ke03:41726
aggregation: ke03:41726, ke03:41726
Sink: ke03:41726
Parallelism set to 4
val env = StreamExecutionEnvironment.getExecutionEnvironment
val initStream: DataStream[String] = env.socketTextStream("192.168.75.85", 8888)
val wordStream = initStream.flatMap(_.split(" ")).setParallelism(4)
val pairStream = wordStream.map((_, 1)).setParallelism(4)
val keyByStream = pairStream.keyBy(0)
val restStream = keyByStream.sum(1).setParallelism(4)
restStream.print() // no parallelism set here, defaults to 1
env.execute("first flink job")
Result:
Socket: ke03:41726
Flat Map: ke03:38944, ke03:38944, ke04:41726, ke04:41726
aggregation: ke03:38944, ke03:38944, ke04:41726, ke04:41726
Sink: ke03:41726
Parallelism set to 50
Each machine has only 2 slots, so the 3 machines provide 6 task slots in total, and the job fails to start.
1. The parallelism of 50 is set at submission time with -p, or in the web UI.
2. restStream.print() has no explicit parallelism in the code, so the value set at submission time (50) is applied to it; since there are not enough slots, the job status fails after a while. One way to avoid this is shown in the sketch below.
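A small sketch (the same pipeline as above, with parallelism 2 set explicitly on every operator) that also gives the print sink an explicit parallelism, so a -p value at submission time no longer applies to it:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val initStream = env.socketTextStream("192.168.75.85", 8888)
val wordStream = initStream.flatMap(_.split(" ")).setParallelism(2)
val pairStream = wordStream.map((_, 1)).setParallelism(2)
val restStream = pairStream.keyBy(0).sum(1).setParallelism(2)
restStream.print().setParallelism(1) // explicit value, so -p 50 at submission no longer applies to the sink
env.execute("pinned sink parallelism")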
Maven pom: dependencies and packaging configuration for the code above
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
<build>
    <plugins>
        <!-- When a Maven project contains both Java and Scala code, configure the maven-scala-plugin so both are compiled and packaged together -->
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- maven-assembly-plugin is needed to build the jar -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4</version>
            <configuration>
                <!-- Setting this to false drops the "-jar-with-dependencies" suffix from MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar -->
                <!--<appendAssemblyId>false</appendAssemblyId>-->
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>assembly</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Reading files from HDFS
// During operator transformations, data is converted into Flink's built-in data types, so the implicit conversions must be imported for the conversion to happen automatically
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// readTextFile reads the file only once
val textStream = env.readTextFile("hdfs://ke01:8020/xiaoke002/two")
textStream.print()
env.execute()
Result:
6> 1,小明
8> 2,小红
11> 3,小柯
2> 4,wang
Under the hood, readTextFile calls readFile and passes FileProcessingMode.PROCESS_ONCE (read once only).
So: to keep reading data continuously, call readFile() yourself and pass FileProcessingMode.PROCESS_CONTINUOUSLY.
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val filePath = "hdfs://ke01:8020/xiaoke002/two"
val textInputFormat = new TextInputFormat(new Path(filePath))
// Re-scan the HDFS path every 10 seconds for newly added content (the interval argument is in milliseconds)
val textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
textStream.print()
env.execute()
Result: the job keeps running, and every time new data arrives the whole file is read again.
12> 1,小明
5> 3,小柯
8> 4,wang
2> 2,小红
hdfs dfs -appendToFile test /xiaoke002/two
3> 3,小柯
6> aa bb
7> cc dd
5> 4,wang
1> 2,小红
12> 1,小明
9> e