zoukankan      html  css  js  c++  java
  • [DB] Flink

    概述

    • 流式计算,本质上是增量计算,需要不断查询过去的状态

    概念

    • Streams(流):分为有界流(固定大小,不随时间增加而增长)和无界流(随时间增加而增长),
    • State(状态):在进行流式计算过程中的信息,用于容错恢复和持久化
    • Time(时间):支持Event time、Ingestion time、Processing time等,用来判断业务状态是否滞后或延迟
    • API:分为SQL/Table API、DataStream API、ProcessFunction三层

    集群

    • JobManager:集群管理者,负责调度任务,协调checkpoints、协调故障恢复,收集Job状态,管理TaskManager
    • TaskManager:实际执行计算的Worker,在其上执行Flink Job 的一组Task,将所在节点的服务器信息如内存、磁盘、任务运行情况等向JobManager汇报
    • Clinent:将任务提交到集群,根据用户参数选择提交模式(yarn per job,stand-alone,yarn-session)

    模型

    • DataStream 的编程模型包括四个部分:Environment、DataSource、Transformation、Sink
    • DataSource(数据源):文件、Collection、Socket、自定义
    • Sink(数据目标):Kafka、Elasticsearch、RabbitMQ、Cassandra、Redis
    • 每个数据流起始于一个或多个Source,并终止于一个或多个Sink

    资源

    • 一个TaskManager就是一个JVM进程,会用独立的线程来执行Task
    • 每个TaskManager为集群提供Slot,每个task slot代表了TaskManager的一个固定大小的资源子集,slot数一般为每个节点的cpu核数
    • 一个Flink程序由多个任务组成(source、transformation和 sink)
    • 一个任务由多个并行的实例(线程)来执行,一个任务的并行实例 (线程) 数目就被称为该任务的并行度

    优点

    • 架构:主从模式
    • 容错:基于两阶段提交,实现了精确的一次处理语义
    • 反压:当消费者速度低于生产者时,需要消费者将信息反馈给生产者,使二者速度匹配,Flink使用分布式阻塞队列实现

    连接器

    • Kafka
    • Redis
    • ElasticSearch

    算子

    • Map:接受一个元素作为输入,根据开发者自定义的逻辑处理后输出
     1 class StreamingDemo {
     2     public static void main(String[] args) throws Exception {
     3 
     4         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     5         //获取数据源
     6         DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1); 
     7         //Map
     8         SingleOutputStreamOperator<Object> mapItems = items.map(new MapFunction<MyStreamingSource.Item, Object>() {
     9             @Override
    10             public Object map(MyStreamingSource.Item item) throws Exception {
    11                 return item.getName();
    12             }
    13         });
    14         //打印结果
    15         mapItems.print().setParallelism(1);
    16         String jobName = "user defined streaming source";
    17         env.execute(jobName);
    18     }
    19 }
    View Code
    • FlatMap:接受一个元素,返回0到多个元素,和Map的区别是,当返回值是列表时,FlatMap会将列表平铺,以单个元素的形式输出
    1 SingleOutputStreamOperator<Object> flatMapItems = items.flatMap(new FlatMapFunction<MyStreamingSource.Item, Object>() {
    2     @Override
    3     public void flatMap(MyStreamingSource.Item item, Collector<Object> collector) throws Exception {
    4         String name = item.getName();
    5         collector.collect(name);
    6     }
    7 });
    View Code
    • Filter:过滤掉不需要的数据,每个元素都会被Filter处理,如果Filter函数返回true则保留,否则丢弃
    1 SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter(new FilterFunction<MyStreamingSource.Item>() {
    2     @Override
    3     public boolean filter(MyStreamingSource.Item item) throws Exception {
    4 
    5         return item.getId() % 2 == 0;
    6     }
    7 });
    View Code
    • KeyBy:根据数据的某种属性分组,然后对不同的组采取不同的处理方式
    • Aggregations:聚合函数,常见的有sum、max、min等,需要指定一个key进行聚合
    • Reduce:按照用户自定义逻辑进行分组聚合

    状态

    • Flink框架的计算是有状态的
    • 状态即中间计算结果,是在流处理过程中需要记住的数据,包括业务数据和元数据
    • 状态存储在JVM中
    • Flink支持不同类型的状态,对状态的持久化提供专门机制和状态管理器
    • 对于任何一个状态数据,可以设置过期时间(TTL)
    • 基本类型:是否按照某个key进行分区
      • Keyed State:每个key都有自己的状态
      • Operator State(Keyed State):每个算子实例共享一个状态

    容错

    • Checkpoint

    窗口

    • 滚动窗口
    • 滑动窗口
    • 会话窗口

    时间

    • 生成时间
    • 接入时间
    • 处理时间

    水位

    • 由于网络延迟等因素,事件数据往往不能即使传递至Flink系统中,导致系统的不稳定或数据乱序
    • 衡量数据处理进度,确保事件数据全部到达Flink系统,即使乱序或迟到,也能像预期一样计算出正确和连续的结果
    • 任何Event进入Flink系统,都会根据当前最大事件时间产生Watermarks时间戳

    广播变量

    • 允许在每台机器上保持一个只读的缓存变量,即一个公共的共享变量
    • 可以把一个dataset数据集广播出去,然后不同的task在节点上都能获取到

    案例

    • 安装flink
      • tar -zxvf flink-1.9.2-bin-scala_2.11.tgz -C ~/training/
    • 修改flink配置文件
      • vim flink-conf.yaml
    • 启动hadoop,zookeeper,flink
      •  bin/start-cluster.sh
    • socket数据源
      • nc -lk 9999
    • 在idea中创建maven工程,开发计数程序

    FlinkStreaming.scala

     1 package com.kaikeba.demo1
     2 
     3 import org.apache.flink.runtime.state.filesystem.FsStateBackend
     4 import org.apache.flink.streaming.api.CheckpointingMode
     5 import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
     6 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
     7 import org.apache.log4j.{Level, Logger}
     8 
     9 //导入隐式转换的包
    10 import org.apache.flink.api.scala._
    11 
    12 /**
    13   * flink接受socket数据,进行单词计数
    14   */
    15 object FlinkStream {
    16   Logger.getLogger("org").setLevel(Level.ERROR)
    17 
    18   def main(args: Array[String]): Unit = {
    19       //todo:1、构建流处理的环境
    20       val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    21 
    22      //todo:2、从socket获取数据
    23        val sourceStream: DataStream[String] = environment.socketTextStream("bigdata111",9999)
    24 
    25     //todo:3、对数据进行处理     hadoop spark
    26     val result: DataStream[(String, Int)] = sourceStream
    27                                                         .flatMap(x => x.split(" ")) //按照空格切分
    28                                                         .map(x => (x, 1))   //每个单词计为1
    29                                                         .keyBy(0)         //按照下标为0的单词进行分组
    30                                                         .sum(1)           //按照下标为1累加相同单词出现的1
    31 
    32 
    33     //todo: 4、对数据进行打印  sink
    34     result.print()
    35 
    36 
    37     //todo: 5、开启任务
    38     environment.execute("FlinkStream")
    39   }
    40 
    41 }
    View Code

    FlinkWordCount.scala

     1 package com.kaikeba.demo1
     2 
     3 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
     4 import org.apache.flink.streaming.api.windowing.time.Time
     5 
     6 /**
     7   * 使用滑动窗口
     8   * 每隔1秒钟统计最近2秒钟的每个单词出现的次数
     9   */
    10 object FlinkStream {
    11 
    12   def main(args: Array[String]): Unit = {
    13       //构建流处理的环境
    14         val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    15 
    16      //从socket获取数据
    17        val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)
    18 
    19      //导入隐式转换的包
    20       import org.apache.flink.api.scala._
    21 
    22      //对数据进行处理
    23      val result: DataStream[(String, Int)] = sourceStream
    24           .flatMap(x => x.split(" ")) //按照空格切分
    25           .map(x => (x, 1))           //每个单词计为1
    26           .keyBy(0)                   //按照下标为0的单词进行分组      
    27           .timeWindow(Time.seconds(2),Time.seconds(1)) //每隔1s处理2s的数据
    28           .sum(1)            //按照下标为1累加相同单词出现的次数
    29 
    30         //对数据进行打印
    31         result.print()
    32 
    33         //开启任务
    34          env.execute("FlinkStream")
    35       }
    36 
    37 }
    View Code
    • 打包jar文件,提交到yarn  
      • ~/training/flink-1.9.2/bin/flink run -m yarn-cluster -yjm 1024 -c com.kaikeba.demo1.FlinkStream original-flask_demo-1.0-SNAPSHOT.jar
    • 查看结果
      • http://bigdata111:8088

    参考

    Spark Streaming 和 Flink 

    https://blog.csdn.net/csdnnews/article/details/81518143

    读写MySQL

    https://blog.csdn.net/hyy1568786/article/details/105886518/

    Flink 和 kafka

    https://blog.csdn.net/SqrsCbrOnly1/article/details/100011933

    State

    https://blog.csdn.net/mhaiy24/article/details/102707958

    Flink 广播

    https://blog.csdn.net/nazeniwaresakini/article/details/107404951

    https://www.jianshu.com/p/520376ae837e

    Flink 状态

    https://blog.csdn.net/mhaiy24/article/details/102707958

    Flink入门到项目

    https://blog.csdn.net/lp284558195/article/details/92798595

    Flink 使用 broadcast 实现维表或配置的实时更新

    https://blog.csdn.net/tzs_1041218129/article/details/105283325

    flink+kafka实现wordcount实时计算+错误解决方案

    https://blog.csdn.net/xiaoyutongxue6/article/details/88861087

    flink流处理访问mysql

    https://blog.csdn.net/u012447842/article/details/89175772

  • 相关阅读:
    【Nginx+Tomcat】高性能负载均衡的Tomcat集群
    【JS-Excel】使用JS导出表格数据、附带解决科学计数法等问题
    【Util】日期工具类总结
    【SpringMVC】url映射传参
    【Linux+Windows】Linux,Windows打包发布到Tomcat并修改映射的ip地址
    【Spring】解决返回json乱码问题
    【API】高德地图API JS实现获取坐标和回显点标记
    ELK-Python(二)
    ELK-Python(一)
    zookeeper集群
  • 原文地址:https://www.cnblogs.com/cxc1357/p/12709836.html
Copyright © 2011-2022 走看看