zoukankan      html  css  js  c++  java
  • Flink比Spark香在哪?

    1 Flink介绍

    Flink 是一个面向分布式数据流处理和批量数据处理的开源计算平台。和 Spark 类似,两者都希望提供一个统一功能的计算平台给用户,都在尝试建立一个统一的平台以运行批量,流式,交互式,图处理,机器学习等应用。

    1.1部署模式

    Flink 集群的部署,本身不依赖 Hadoop 集群,如果用到 HDFS 或是 HBase 中的存储数据,就需要选择对应的 Hadoop 版本。

    • Standalone

    • YARN

    • Mesos

    • Cloud

    1.2整合支持

    1. Flink支持消费kafka的数据;

    2. 支持HBase,Cassandra, ElasticSearch

    3. 支持与Alluxio的整合

    4. 支持RabbitMQ

    1.3 API支持

    • 对Streaming数据类应用,提供DataStream API

    • 对批处理类应用,提供DataSet API(支持Java/Scala)

    • 对流处理和批处理,都支持Table API

    • 支持双流join

    1.4 Libraries支持

    • 支持机器学习(FlinkML)

    • 支持图分析(Gelly)

    • 支持关系数据处理(Table)

    • 支持复杂事件处理(CEP)

    1.5 Flink on YARN

    Flink提供两种Yarn的部署方式Yarn Setup:

    Start a long-running Flink cluster on YARN

    • 通过命令yarn-session.sh来实现,本质上是在yarn集群上启动一个flink集群。

    • 由yarn预先给flink集群分配若干个container给flink使用,在yarn的界面上只能看到一个Flink session with X TaskManagers的任务。

    • 只有一个Flink界面,可以从Yarn的ApplicationMaster链接进入。

    • 使用bin/flink run命令发布任务时,本质上是使用Flink自带的调度,与普通的在Flink集群上发布任务并没有不同。不同的任务可能在一个TaskManager中,也即是在一个JVM进程中,无法实现资源隔离。

    Run a Flink job on YARN

    • 通过命令bin/flink run -m yarn-cluster实现,一次只发布一个任务,本质上给每个flink任务启动了一个集群。

    • yarn不事先给flink分配container,而是在任务发布时,启动JobManager(对应Yarn的AM)和TaskManager,如果一个任务指定了n个TaksManager(-yn n),则会启动n+1个Container,其中一个是JobManager。

    • 发布m个应用,则有m个Flink界面,对比方式一,同样发布m个应用,会多出m-1个JobManager的。

    • 发布任务时,实际上是使用了Yarn的调用。不同的任务不可能在一个Container(JVM)中,也即是实现了资源隔离。

    以第一种启动方式为例,其主要启动流程如下:

    首先我们通过下面的命令行启动flink on yarn的集群
    这里将产生总共五个进程:

    • 1个FlinkYarnSessionCli ---> Yarn Client

    • 1个YarnApplicationMasterRunner ---> AM + JobManager

    • 3个YarnTaskManager --> TaskManager

    即一个客户端+4个container,1个container启动AM,3个container启动TaskManager。

    yarn-session.sh支持的参数:

    一个Flink环境在YARN上的启动流程:

    1. FlinkYarnSessionCli 启动的过程中首先会检查Yarn上有没有足够的资源去启动所需要的container,如果有,则上传一些flink的jar和配置文件到HDFS,这里主要是启动AM进程和TaskManager进程的相关依赖jar包和配置文件。

    2. 接着yarn client会首先向RM申请一个container来启动 ApplicationMaster(YarnApplicationMasterRunner进程),然后RM会通知其中一个NM启动这个container,被分配到启动AM的NM会首先去HDFS上下载第一步上传的jar包和配置文件到本地,接着启动AM;在这个过程中会启动JobManager,因为JobManager和AM在同一进程里面,它会把JobManager的地址重新作为一个文件上传到HDFS上去,TaskManager在启动的过程中也会去下载这个文件获取JobManager的地址,然后与其进行通信;AM还负责Flink的web 服务,Flink里面用到的都是随机端口,这样就允许了用户能够启动多个yarn session。

          从这个启动过程中可以看出,在每次启动Flink on YARN之前,需要指定启动多少个TaskManager,每个taskManager分配的资源是固定的,也就是说这个资源量从taskManager出生到死亡,资源情况一直是这么多,不管它所承载的作业需求资源情况,这样在作业需要更多资源的时候,没有更多的资源分配给对应的作业,相反,当一个作业仅需要很少的资源就能够运行的时候,仍然分配的是那些固定的资源,造成资源的浪费。

    用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。

    下面是一个由Flink程序映射为Streaming Dataflow的示意图,如下所示:

    FlinkKafkaConsumer是一个Source Operator,map、keyBy、timeWindow、apply是Transformation Operator,RollingSink是一个Sink Operator。

    1.6 CEP(Complex event processing)

    Flink CEP 是一套极具通用性、易于使用的实时流式事件处理方案。作为 Flink 的原生组件,省去了第三方库与 Flink 配合使用时可能会导致的各种问题。但其功能现阶段看来还比较基础,不能表达复杂的业务场景,同时它不能够做到动态更新。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);          // (Event, timestamp)          DataStream<Event> input = env.fromElements(              Tuple2.of(new Event(1, "start", 1.0), 5L),              Tuple2.of(new Event(2, "middle", 2.0), 1L),              Tuple2.of(new Event(3, "end", 3.0), 3L),              Tuple2.of(new Event(4, "end", 4.0), 10L),              Tuple2.of(new Event(5, "middle", 6.0), 7L),              Tuple2.of(new Event(6, "middle", 5.0), 7L),              // last element for high final watermark              Tuple2.of(new Event(7, "middle", 5.0), 100L)          ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { 

    具体的业务逻辑

    Pattern<Event, ? extends Event> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {              @Override              public boolean filter(Event value) throws Exception {                  return value.getName().equals("start");              }          }).followedByAny("middle").where(new SimpleCondition<Event>() {               @Override              public boolean filter(Event value) throws Exception {                  return value.getName().equals("middle");              }          }).followedByAny("end").where(new SimpleCondition<Event>() {              @Override              public boolean filter(Event value) throws Exception {                  return value.getName().equals("end");              }          });          DataStream<String> result = CEP.pattern(input, pattern, comparator).select(              new PatternSelectFunction<Event, String>() {                  @Override                  public String select(Map<String, List<Event>> pattern) {                      StringBuilder builder = new StringBuilder();                      builder.append(pattern.get("start").get(0).getId()).append(",")                          .append(pattern.get("middle").get(0).getId()).append(",")                          .append(pattern.get("end").get(0).getId());                      return builder.toString();                  }              }          );  

    从例子代码中可以看到,patterns需要用java代码写,需要编译,很冗长很麻烦,没法动态配置;需要可配置,或提供一种DSL;再者,对于一个流同时只能设置一个pattern,比如对于不同的用户实例想配置不同的pattern,就没法支持;需要支持按key设置pattern。

    1.7 Flink目前存在的一些问题

    在实时计算中有这么一个普遍的逻辑:业务逻辑中以一个流式数据源与几个相关的配置表进行join操作,而配置表并不是一成不变的,会定期的进行数据更新,可以看成一个缓慢变化的流。这种join环境存在以下几个尚未解决的问题:

    1.对元数据库的读压力;如果分析程序有1000并发,是否需要读1000次;

    2.读维表数据不能拖慢主数据流的throughput,每秒千万条数据量;

    3.动态维表更新问题和一致性问题;元数据是不断变化的,如何把更新同步到各个并发上;

    4.冷启动问题,如何保证主数据流流过的时候,维表数据已经ready,否则会出现数据无法处理;

    5.超大维表数据会导致流量抖动和频繁gc,比如几十万条的实例数据,可能上百兆。

    在Flink社区,对该问题也进行了关注

    https://issues.apache.org/jira/browse/FLINK-6131

    https://issues.apache.org/jira/browse/FLINK-2320

    https://issues.apache.org/jira/browse/FLINK-3514

    当然在生产环境上也有相应的解决方案:

    使用redis来做cache,只用一个job,负责从元数据库同步数据到redis,这样就解决1,3

    然后所有的并发都从redis直接查询需要的元数据,这样就解决4;对于2,在并发上做local cache,只有第一次需要真正查询redis,后续定期异步更新就好,不会影响到主数据流;对于5,因为现在不需要一下全量的读取维表数据到内存,用到的时候才去读,分摊了负载,也可以得到缓解。

    这个方案也有一定的弊端,增加了架构的外部依赖,要额外保障外部redis和同步job的稳定性。

    2 Flink vs Spark

    2.1 框架

    Spark把streaming看成是更快的批处理,而Flink把批处理看成streaming的special case。这里面的思路决定了各自的方向,其中两者的差异点有如下这些:
    实时 vs 近实时的角度:Flink提供了基于每个事件的流式处理机制,所以可以被认为是一个真正的流式计;而Spark,不是基于事件的粒度,而是用小批量来模拟流式,也就是多个事件的集合。所以Spark被认为是近实时的处理系统。
      Spark streaming 是更快的批处理,而Flink Batch是有限数据的流式计算。

    2.1.1 流式计算和批处理API

    Spark对于流式计算和批处理,都是基于RDD的抽象。这样很方便将两种计算方式合并表示。而Flink将流式计算和批处理分别抽象出来DataStream和DataSet两种API,这一点上Flink相对于spark来说是一个糟糕的设计。

    2.2 社区活跃度对比

    Spark 2.3 继续向更快、更易用、更智能的目标迈进,引入了低延迟的持续处理能力和流到流的连接,让 Structured Streaming 达到了一个里程碑式的高度。

    3 提交一个Flink作业

    启动flink服务

    ./bin/yarn-session.sh -n 4 -jm 2048 -tm 2048

    在yarn监控界面上可以看到该作业的执行状态

    并验证Wordcount例子

    ./bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 2048 ./examples/batch/WordCount.jar

    在client端可以看到log:

    --end--

  • 相关阅读:
    Debian8搭建LEMP环境
    ProjectManager Beta 7 项目管理器发布
    我的Linux软件集
    修改/home内子目录的名字
    Nginx配置特定二级域名
    Debian8 安装wordpress博客
    LinuxMint18使用单独分区作为Home挂载点
    LinuxMint18配置Grub2默认启动操作系统
    《失恋33天》从绝境中走出来的故事
    爱的世界很拥挤,写在读《爱,就这么简单》之后
  • 原文地址:https://www.cnblogs.com/purple5252/p/13858387.html
Copyright © 2011-2022 走看看