zoukankan      html  css  js  c++  java
  • Flink学习笔记:Flink Runtime

     

    本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程:

    Flink大数据项目实战:http://t.cn/EJtKhaz

    1. Flink运行时架构

    1.1Flink架构

    Flink 运行时架构主要包含几个部分:Client、JobManager(master节点)和TaskManger(slave节点)。

     

    Client:Flink 作业在哪台机器上面提交,那么当前机器称之为Client。用户开发的Program 代码,它会构建出DataFlow graph,然后通过Client提交给JobManager。

    JobManager:是主(master)节点,相当于YARN里面的REsourceManager,生成环境中一般可以做HA 高可用。JobManager会将任务进行拆分,调度到TaskManager上面执行。

    TaskManager:是从节点(slave),TaskManager才是真正实现task的部分。

    Client提交作业到JobManager,就需要跟JobManager进行通信,它使用Akka框架或者库进行通信,另外Client与JobManager进行数据交互,使用的是Netty框架。Akka通信基于Actor System,Client可以向JobManager发送指令,比如Submit job或者Cancel /update job。JobManager也可以反馈信息给Client,比如status updates,Statistics和results 

    Client提交给JobManager的是一个Job,然后JobManager将Job拆分成task,提交给TaskManager(worker)。JobManager与TaskManager也是基于Akka进行通信,JobManager发送指令,比如Deploy/Stop/Cancel Tasks或者触发Checkpoint,反过来TaskManager也会跟JobManager通信返回Task Status,Heartbeat(心跳),Statistics等。另外TaskManager之间的数据通过网络进行传输,比如Data Stream做一些算子的操作,数据往往需要在TaskManager之间做数据传输。

    1.2. TaskManger Slot

    TaskManager是进程,他下面运行的task(整个Flink应用是Job,Job可以拆分成很多个task)是线程,每个task/subtask(线程)下可运行一个或者多个operator,即OperatorChain。Task是class,抽象的,subtask是Object(类比学习),具体的。

    一个TaskManager通过Slot(任务槽)来控制它上面可以接受多少个task,比如一个TaskManager划分了3个Task Slot(仅限内存托管,目前CPU未做隔离),它只能接受3个task。Slot均分TaskManager所托管的内存,比如一个TaskManager有6G内存,那么每个Slot分配2G。

    同一个TaskManager中的task共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。一个TaskManager有N个槽位只能接受N个Task吗?不是,后面会讲共享槽位。

    1.3. OperatorChain && Task

    为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。以wordcount为例,解析不同视图下的数据流,如下图所示。

     

    数据流(逻辑视图)

    创建Source(并行度设置为1)读取数据源,数据经过FlatMap(并行度设置为2)做转换操作,然后数据经过Key Agg(并行度设置为2)做聚合操作,最后数据经过Sink(并行度设置为2)将数据输出。

    数据流(并行化视图)

    并行度为1的Source读取数据源,然后FlatMap并行度为2读取数据源进行转化操作,然后数据经过Shuffle交给并行度为2的Key Agg进行聚合操作,然后并行度为2的Sink将数据输出,未优化前的task总和为7。

    数据流(优化后视图)

    并行度为1的Source读取数据源,然后FlatMap并行度为2读取数据源进行转化操作,然后数据经过Shuffle交给Key Agg进行聚合操作,此时Key Agg和Sink操作合并为一个task(注意:将KeyAgg和Sink两个operator进行了合并,因为这两个合并后并不会改变整体的拓扑结构),它们一起的并行度为2,数据经过Key Agg和Sink之后将数据输出,优化后的task总和为5.

    1.4. OperatorChain的优点和组成条件

    OperatorChain的优点

    1.减少线程切换

    2.减少序列化与反序列化

    3.减少数据在缓冲区的交换

    4.减少延迟并且提高吞吐能力

    OperatorChain 组成条件

    1.没有禁用Chain

    2.上下游算子并行度一致 。

    3.下游算子的入度为1(也就是说下游节点没有来自其他节点的输入)。

    4.上下游算子在同一个slot group(后面紧跟着就会讲如何通过slot group先分配到同一个solt,然后才能chain) 。

    5.下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)。

    6.上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)。

    7.上下游算子之间没有数据shuffle (数据分区方式是 forward)。

    1.5. 编程改变OperatorChain行为

    Operator chain的行为可以通过编程API中进行指定,可以通过在DataStream的operator后面(如someStream.map(..))调用startNewChain()来指示从该operator开始一个新的chain(与前面截断,不会被chain到前面)。可以调用disableChaining()来指示该operator不参与chaining(不会与前后的operator chain一起)。可以通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining。可以设置Slot group,例如someStream.filter(...).slotSharingGroup(“name”)。可以通过调整并行度,来调整Operator chain。

    2. Slot分配与共享

    2.1共享Slot

     

    默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。

    允许slot共享有以下两点好处:

    1.Flink集群需要的任务槽与作业中使用的最高并行度正好相同(前提,保持默认SlotSharingGroup)。也就是说我们不需要再去计算一个程序总共会起多少个task了。

    2.更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将task的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks。

    2.2共享Slot实例

     

    将 WordCount 的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),将得到如上图所示的slot分布图。

    首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到密集型操作 keyAggregation/sink 被平均地分配到各个 TaskManager。

    2.3 SlotSharingGroup(soft)

    SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。

    保证同一个group的并行度相同的sub-tasks 共享同一个slots。算子的默认group为default(即默认一个job下的subtask都可以共享一个slot)

    为了防止不合理的共享,用户也能通过API来强制指定operator的共享组,比如:someStream.filter(...).slotSharingGroup("group1");就强制指定了filter的slot共享组为group1。怎么确定一个未做SlotSharingGroup设置算子的SlotSharingGroup什么呢(根据上游算子的group 和自身是否设置group共同确定)。适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。

     

    2.4 CoLocationGroup(强制)

    CoLocationGroup可以保证所有的并行度相同的sub-tasks运行在同一个slot,主要用于迭代流(训练机器学习模型)。

    3. Slot & parallelism的关系

    3.1 Slots && parallelism

     

    如上图所示,有两个TaskManager,每个TaskManager有3个槽位。假设source操作并行度为3,map操作的并行度为4,sink的并行度为4,所需的task slots数与job中task的最高并行度一致,最高并行度为4,那么使用的Slot也为4。

    3.2如何计算Slot

    如何计算一个应用需要多少slot?

     

    如果不设置SlotSharingGroup,那么需要的Slot数为应用的最大并行度数。如果设置了SlotSharingGroup,那么需要的Slot数为所有SlotSharingGroup中的最大并行度之和。比如已经强制指定了map的slot共享组为test,那么map和map下游的组为test,map的上游source的组为默认的default,此时default组中最大并行度为10,test组中最大并行度为20,那么需要的Slot=10+20=30。

    4.Flink部署模式

    4.1 Local 本地部署

    Flink 可以运行在 Linux、Mac OS X 和 Windows 上。本地模式的安装唯一需要的只是 Java 1.7.x或更高版本,本地运行会启动Single JVM,主要用于测试调试代码。

    4.2 Standalone Cluster集群部署

    软件需求

    1.安装Java1.8或者更高版本

    2.集群各个节点需要ssh免密登录

    Flink Standalone 运行流程前面已经讲过,这里就不在赘叙。

    4.3Flink ON YARN

     

    Flink ON YARN工作流程如下所示:

    首先提交job给YARN,就需要有一个Flink YARN Client。

    第一步:Client将Flink 应用jar包和配置文件上传到HDFS。

    第二步:Client向REsourceManager注册resources和请求APPMaster  Container

    第三步:REsourceManager就会给某一个Worker节点分配一个Container来启动APPMaster,JobManager会在APPMaster中启动。

    第四步:APPMaster为Flink的TaskManagers分配容器并启动TaskManager,TaskManager内部会划分很多个Slot,它会自动从HDFS下载jar文件和修改后的配置,然后运行相应的task。TaskManager也会与APPMaster中的JobManager进行交互,维持心跳等。

    5.Flink Standalone集群部署

    安装Flink之前需要提前安装好JDK,这里我们安装的是JDK1.8版本。

     

    5.1下载

    可以到官网:https://archive.apache.org/dist/flink/ 将Flink1.6.2版本下载到本地。

     

    5.2解压

    将下载的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上传至主节点

     

    使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz命令解压flink安装包

     

    方便后期flink多版本的使用,可以创建flink软连接

    ln -s flink-1.6.2 flink

     

    5.3配置环境变量

    vi ~/.bashrc

    export FLINK_HOME=/home/hadoop/app/flink

    export PATH=$FLINK_HOME/bin:$PATH

    使配置文件生效

    source ~/.bashrc

    查看flink版本

    flink -v

    5.4修改配置文件

    1.修改flink-conf.yaml配置文件

    vi flink-conf.yaml

    #JobManager地址

    jobmanager.rpc.address: cdh01

    #槽位配置为3

    taskmanager.numberOfTaskSlots: 3

    #设置并行度为3

    parallelism.default: 3

    2.修改masters配置

    vi masters

    cdh01:8081

    3.修改slaves配置

    vi slaves

    cdh01

    cdh02

    cdh03

    5.5主节点安装目录同步到从节点

    通过deploy.sh脚本将flink安装目录同步到其他节点。

    deploy.sh flink-1.6.2 /home/hadoop/app/ slave

    在从节点分别创建flink软连接

    ln -s flink-1.6.2 flink

    5.6启动服务

    进入flink bin目录执行启动集群脚本start-cluster.sh

    bin/start-cluster.sh

     

    通过web查看flink集群,查看相关集群信息。

    http://cdh01:8081

    5.7测试运行

    查看官网案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/

    1.启动nc服务

    nc -l 9000

    2.提交flink作业

    bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

    3.输入测试数据

     

    4.查看运行结果

    在TaskManager界面查看Flink运行结果

     

  • 相关阅读:
    关于WP7的Loaded事件[转]
    皮皮书屋的变态验证码
    近期学习内容for mobile
    一个js问题引发的同时吐槽
    powerdesigner 概念模型转物理模型时的丢表问题
    偶的处女文近期学习计划
    web布局实现圆角,兼容所有的浏览器
    最近面试asp.net碰到的一些题
    网站推广心得
    兼容ie6的png格式图片的背景透明问题
  • 原文地址:https://www.cnblogs.com/dajiangtai/p/10730689.html
Copyright © 2011-2022 走看看