zoukankan      html  css  js  c++  java
  • [DB] Flink

    概述

    • 流式计算,本质上是增量计算,需要不断查询过去的状态

    概念

    • Streams(流):分为有界流(固定大小,不随时间增加而增长)和无界流(随时间增加而增长),
    • State(状态):在进行流式计算过程中的信息,用于容错恢复和持久化
    • Time(时间):支持Event time、Ingestion time、Processing time等,用来判断业务状态是否滞后或延迟
    • API:分为SQL/Table API、DataStream API、ProcessFunction三层

    集群

    • JobManager:集群管理者,负责调度任务,协调checkpoints、协调故障恢复,收集Job状态,管理TaskManager
    • TaskManager:实际执行计算的Worker,在其上执行Flink Job 的一组Task,将所在节点的服务器信息如内存、磁盘、任务运行情况等向JobManager汇报
    • Clinent:将任务提交到集群,根据用户参数选择提交模式(yarn per job,stand-alone,yarn-session)

    模型

    • DataStream 的编程模型包括四个部分:Environment、DataSource、Transformation、Sink
    • DataSource(数据源):文件、Collection、Socket、自定义
    • Sink(数据目标):Kafka、Elasticsearch、RabbitMQ、Cassandra、Redis
    • 每个数据流起始于一个或多个Source,并终止于一个或多个Sink

    资源

    • 一个TaskManager就是一个JVM进程,会用独立的线程来执行Task
    • 每个TaskManager为集群提供Slot,每个task slot代表了TaskManager的一个固定大小的资源子集,slot数一般为每个节点的cpu核数
    • 一个Flink程序由多个任务组成(source、transformation和 sink)
    • 一个任务由多个并行的实例(线程)来执行,一个任务的并行实例 (线程) 数目就被称为该任务的并行度

    优点

    • 架构:主从模式
    • 容错:基于两阶段提交,实现了精确的一次处理语义
    • 反压:当消费者速度低于生产者时,需要消费者将信息反馈给生产者,使二者速度匹配,Flink使用分布式阻塞队列实现

    连接器

    • Kafka
    • Redis
    • ElasticSearch

    算子

    • Map:接受一个元素作为输入,根据开发者自定义的逻辑处理后输出
     1 class StreamingDemo {
     2     public static void main(String[] args) throws Exception {
     3 
     4         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     5         //获取数据源
     6         DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1); 
     7         //Map
     8         SingleOutputStreamOperator<Object> mapItems = items.map(new MapFunction<MyStreamingSource.Item, Object>() {
     9             @Override
    10             public Object map(MyStreamingSource.Item item) throws Exception {
    11                 return item.getName();
    12             }
    13         });
    14         //打印结果
    15         mapItems.print().setParallelism(1);
    16         String jobName = "user defined streaming source";
    17         env.execute(jobName);
    18     }
    19 }
    View Code
    • FlatMap:接受一个元素,返回0到多个元素,和Map的区别是,当返回值是列表时,FlatMap会将列表平铺,以单个元素的形式输出
    1 SingleOutputStreamOperator<Object> flatMapItems = items.flatMap(new FlatMapFunction<MyStreamingSource.Item, Object>() {
    2     @Override
    3     public void flatMap(MyStreamingSource.Item item, Collector<Object> collector) throws Exception {
    4         String name = item.getName();
    5         collector.collect(name);
    6     }
    7 });
    View Code
    • Filter:过滤掉不需要的数据,每个元素都会被Filter处理,如果Filter函数返回true则保留,否则丢弃
    1 SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter(new FilterFunction<MyStreamingSource.Item>() {
    2     @Override
    3     public boolean filter(MyStreamingSource.Item item) throws Exception {
    4 
    5         return item.getId() % 2 == 0;
    6     }
    7 });
    View Code
    • KeyBy:根据数据的某种属性分组,然后对不同的组采取不同的处理方式
    • Aggregations:聚合函数,常见的有sum、max、min等,需要指定一个key进行聚合
    • Reduce:按照用户自定义逻辑进行分组聚合

    状态

    • Flink框架的计算是有状态的
    • 状态即中间计算结果,是在流处理过程中需要记住的数据,包括业务数据和元数据
    • 状态存储在JVM中
    • Flink支持不同类型的状态,对状态的持久化提供专门机制和状态管理器
    • 对于任何一个状态数据,可以设置过期时间(TTL)
    • 基本类型:是否按照某个key进行分区
      • Keyed State:每个key都有自己的状态
      • Operator State(Keyed State):每个算子实例共享一个状态

    容错

    • Checkpoint

    窗口

    • 滚动窗口
    • 滑动窗口
    • 会话窗口

    时间

    • 生成时间
    • 接入时间
    • 处理时间

    水位

    • 由于网络延迟等因素,事件数据往往不能即使传递至Flink系统中,导致系统的不稳定或数据乱序
    • 衡量数据处理进度,确保事件数据全部到达Flink系统,即使乱序或迟到,也能像预期一样计算出正确和连续的结果
    • 任何Event进入Flink系统,都会根据当前最大事件时间产生Watermarks时间戳

    广播变量

    • 允许在每台机器上保持一个只读的缓存变量,即一个公共的共享变量
    • 可以把一个dataset数据集广播出去,然后不同的task在节点上都能获取到

    案例

    • 安装flink
      • tar -zxvf flink-1.9.2-bin-scala_2.11.tgz -C ~/training/
    • 修改flink配置文件
      • vim flink-conf.yaml
    • 启动hadoop,zookeeper,flink
      •  bin/start-cluster.sh
    • socket数据源
      • nc -lk 9999
    • 在idea中创建maven工程,开发计数程序

    FlinkStreaming.scala

     1 package com.kaikeba.demo1
     2 
     3 import org.apache.flink.runtime.state.filesystem.FsStateBackend
     4 import org.apache.flink.streaming.api.CheckpointingMode
     5 import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
     6 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
     7 import org.apache.log4j.{Level, Logger}
     8 
     9 //导入隐式转换的包
    10 import org.apache.flink.api.scala._
    11 
    12 /**
    13   * flink接受socket数据,进行单词计数
    14   */
    15 object FlinkStream {
    16   Logger.getLogger("org").setLevel(Level.ERROR)
    17 
    18   def main(args: Array[String]): Unit = {
    19       //todo:1、构建流处理的环境
    20       val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    21 
    22      //todo:2、从socket获取数据
    23        val sourceStream: DataStream[String] = environment.socketTextStream("bigdata111",9999)
    24 
    25     //todo:3、对数据进行处理     hadoop spark
    26     val result: DataStream[(String, Int)] = sourceStream
    27                                                         .flatMap(x => x.split(" ")) //按照空格切分
    28                                                         .map(x => (x, 1))   //每个单词计为1
    29                                                         .keyBy(0)         //按照下标为0的单词进行分组
    30                                                         .sum(1)           //按照下标为1累加相同单词出现的1
    31 
    32 
    33     //todo: 4、对数据进行打印  sink
    34     result.print()
    35 
    36 
    37     //todo: 5、开启任务
    38     environment.execute("FlinkStream")
    39   }
    40 
    41 }
    View Code

    FlinkWordCount.scala

     1 package com.kaikeba.demo1
     2 
     3 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
     4 import org.apache.flink.streaming.api.windowing.time.Time
     5 
     6 /**
     7   * 使用滑动窗口
     8   * 每隔1秒钟统计最近2秒钟的每个单词出现的次数
     9   */
    10 object FlinkStream {
    11 
    12   def main(args: Array[String]): Unit = {
    13       //构建流处理的环境
    14         val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    15 
    16      //从socket获取数据
    17        val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)
    18 
    19      //导入隐式转换的包
    20       import org.apache.flink.api.scala._
    21 
    22      //对数据进行处理
    23      val result: DataStream[(String, Int)] = sourceStream
    24           .flatMap(x => x.split(" ")) //按照空格切分
    25           .map(x => (x, 1))           //每个单词计为1
    26           .keyBy(0)                   //按照下标为0的单词进行分组      
    27           .timeWindow(Time.seconds(2),Time.seconds(1)) //每隔1s处理2s的数据
    28           .sum(1)            //按照下标为1累加相同单词出现的次数
    29 
    30         //对数据进行打印
    31         result.print()
    32 
    33         //开启任务
    34          env.execute("FlinkStream")
    35       }
    36 
    37 }
    View Code
    • 打包jar文件,提交到yarn  
      • ~/training/flink-1.9.2/bin/flink run -m yarn-cluster -yjm 1024 -c com.kaikeba.demo1.FlinkStream original-flask_demo-1.0-SNAPSHOT.jar
    • 查看结果
      • http://bigdata111:8088

    参考

    Spark Streaming 和 Flink 

    https://blog.csdn.net/csdnnews/article/details/81518143

    读写MySQL

    https://blog.csdn.net/hyy1568786/article/details/105886518/

    Flink 和 kafka

    https://blog.csdn.net/SqrsCbrOnly1/article/details/100011933

    State

    https://blog.csdn.net/mhaiy24/article/details/102707958

    Flink 广播

    https://blog.csdn.net/nazeniwaresakini/article/details/107404951

    https://www.jianshu.com/p/520376ae837e

    Flink 状态

    https://blog.csdn.net/mhaiy24/article/details/102707958

    Flink入门到项目

    https://blog.csdn.net/lp284558195/article/details/92798595

    Flink 使用 broadcast 实现维表或配置的实时更新

    https://blog.csdn.net/tzs_1041218129/article/details/105283325

    flink+kafka实现wordcount实时计算+错误解决方案

    https://blog.csdn.net/xiaoyutongxue6/article/details/88861087

    flink流处理访问mysql

    https://blog.csdn.net/u012447842/article/details/89175772

  • 相关阅读:
    高精度计算
    高精度除以低精度
    P1258 小车问题
    POJ 2352 stars (树状数组入门经典!!!)
    HDU 3635 Dragon Balls(超级经典的带权并查集!!!新手入门)
    HDU 3938 Portal (离线并查集,此题思路很强!!!,得到所谓的距离很巧妙)
    POJ 1703 Find them, Catch them(确定元素归属集合的并查集)
    HDU Virtual Friends(超级经典的带权并查集)
    HDU 3047 Zjnu Stadium(带权并查集,难想到)
    HDU 3038 How Many Answers Are Wrong(带权并查集,真的很难想到是个并查集!!!)
  • 原文地址:https://www.cnblogs.com/cxc1357/p/12709836.html
Copyright © 2011-2022 走看看