一、概述
1.大致流程:主要分五步
获取一个执行环境
加载/创建初始数据
指定数据上的转换
指定计算结果放在哪里
触发程序执行
详细步骤,可以参考:https://www.cnblogs.com/cjsblog/p/12967555.html
1)在sacla中可以通过静态方法获取执行环境:(根据上下文自动推断)
val env = StreamExecutionEnvironment.getExecutionEnvironment()
2)DataSource数据输入
主要包含:内置数据源和第三方数据源Connector,例如Kafka Connector等
从文件中读取数据示例:
DataStream<String> text = env.readTextFile("file:///path/to/file");
从kafka读取示例如下:
https://blog.csdn.net/u013076044/article/details/102651473
二、Transfrom算子
1.map
对数据流中的每一个元素进行转换逻辑操作,最终返回每一个输入元素转换后的结果数据继续封装成一个DataStream对象返回
(例如每个元素拼接一个字符串或者数字*2等再输出),属于比较简单比较基础的:
val result = dataStream.map(e => "map"+e)
2.flatMap
对每个输入元素进行压平、压扁操作,即输入一个元素,根据你的业务逻辑决定返回几个元素,可以返回0个元素或1个元素或者n个元素
val result = dataStream.flatMap(_.split(" "))
// 传入的可以不止是一个元素,可以是一个List进行压扁输出
3.fillter
元素过滤:
val streamFilter = stream.filter{x => x == 1}
4.keyby
分区操作,开窗的前提:
val keyedStream = dataStream.keyBy(0) // val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream.keyBy(_.symbol)
5.聚合算子
可以使用字段顺序,也可以使用字段名
sum()
min()
max()
minBy()
maxBy()
以上必须是KeyBy之后才能聚合
val sumed: DataStream[SensorReading] = maped.keyBy(0).min(2)
6.reduce
一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果
7.多流转换算子
Split和Select
split:将一个流拆分成多个流,再结合select选择出拆分出的流
例如按照温度30来分割成两个流:
connect和coMap,union
connect对应的是上面分流之后的合流,当然,这里仅仅是外面包了一层,实际上还是两个流各管各的
union则是数据类型需要对齐才能union,这点和SQL是类似的
三、flink的UDF函数
可以通过暴露出来的接口进行细粒度编程控制,其实这也是flink的编程方式
富函数Rich Function可以实现上下文的获取控制,不展开
四、DataStream的sink
最常用的就是:
stream.addSink(new Sink)
当然,主流的都是官方有支持的,甚至Print都是一个基础的sink:
主流sink:
redis的sink,essink暂不展开。
与MySQL等的连接,在1.11版本后有更新:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html