zoukankan      html  css  js  c++  java
  • Flink源码解析(三)——从RM与TM的心跳交互分析Flink心跳机制

    0. 说明

    基于Flink 1.12

    1. 背景知识

    1.1 Actor模型

    Flink底层RPC是通过AKKA实现的,AKKA是基于Actor模型实现的框架。下面,将大致介绍一下actor模型。
    在Actor模型中,一切事物都是actor,一个actor是一个基本的计算单元,每个actor是完全隔离的,不会共享内存,也就不会有共享数据带来的并发问题;它们是自己维护自身的状态,该状态不会被其他actor直接修改。
    整体模型大致是:多个actor同时运行,每个actor接收消息,并根据消息做出相应的反应。消息本身是通过异步的形式发送给actor的,消息会被存储在一个叫做“邮箱(mailbox)”的地方,actor会顺序的处理收到的信息,避免锁的使用。从描述可以了解到actor模型中,消息的发送者和已发送消息解耦,是以并发的形式处理数据的。

    1.2 RPC

    RPC作用是让远程调用像本地调用,封装调用的细节。
    Flink定义了各个组件的Gateway,通过回调的方式隐藏实现细节,将业务本身和通信解绑了,方便RPC调用。目前,Flink的RPC请求的底层通信是通过AKKA的实现的。

    1.2.1 RPC相关的接口

    • RPCGateway
      所有Rpc组件的网关,定义了各组件的Rpc接口,提供了获取地址和主机名的功能;
    • RPCEndpoint
      RPCEndpoint是Flink RPC调用的基类,所有具有分布式调用能力的组件都需要继承该接口。
    • RpcService
      RPC服务提供者,提供开始、停止服务等功能,以及提供远程功能;
      三者的关系如下:
      public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsyn{
      //启动sever和获取RPC Gateway
      /** RPC service to be used to start the RPC server and to obtain rpc gateways. */
      private final RpcService rpcService;
      //RpcServer用于启动和连接到RpcEndpoint, 连接到rpc服务器将返回一个RpcGateway,为RpcService提供RPC服务/连接远程Server
      /** Interface to access the underlying rpc server. */
      protected final RpcServer rpcServer;
      }
      

    2. Flink心跳机制

    2.1 核心接口

    2.1.1 HeartbeatTarget

    是可以发送心跳和请求心跳相应组件接口,是对具备心跳能力对象的一种抽象。
    HeartbeatTarget的函数具备以下两种动作:

    • receiveHeartbeat 向某个节点发送心跳响应,其参数heartbeatOrigin就是该节点;
    • requestHeartbeat
      要求某个节点发送心跳信息,其参数requestOrigin就是心跳信息上报的节点。

    2.1.2 HeartbeatMonitor

    HeartbeatMonitor管理HeartbeatTarget的心跳状态。当在指定时间内未收到心跳信息时,monitor将会通知对应的HeartbeatListener,收到心跳信息后会重置其定时器。其工厂接口如下:

         HeartbeatMonitor<O> createHeartbeatMonitor(
                    ResourceID resourceID,
                    HeartbeatTarget<O> heartbeatTarget,
                    ScheduledExecutor mainThreadExecutor,
                    HeartbeatListener<?, O> heartbeatListener,  //用于处理心跳信息
                    long heartbeatTimeoutIntervalMs);
    

    2.1.3 HeartbeatListener

    HeartbeatListener是和HeartbeatManager交互的接口,Flink的业务的处理逻辑需要继承该接口以处理心跳结果,其三个回调函数如下:

    • notifyHeartbeatTimeout :通知心跳超时;
    • reportPayload:处理节点发来的Payload载荷;
    • retrievePayLoad:获取对某节点发下一次心跳请求的Payload载荷

    2.1.4 HeartbeatManager

    心跳的管理者,用于开始/停止对HeartbeatTarget的心跳监控,以及会处理某个节点的心跳超时。
    HeartbeatManager继承了HeartbeatTarget,其具有了HeartbeatTarget的函数功能以外,该接口还有以下四种函数:

    • monitorTarget
      开始监控HeartbeatTarget,HeartbeatTarget的心跳超时后,将会通知HeartbeatListener;
    • unmonitorTarget 停止监控某节点;
    • stop 停止HeartbeatManager;
    • getLastHeartbeatFrom 返回特定节点最近一次心跳信息;

    核心接口交互的大致过程:HeartbeatManager将HeartbeatTarget放入到监控列表中,当心跳超时时,HeartbeatMonitor回通知HeartbeatListener处理,通过对HeartbeatListener的实现,完成相关处理心跳超时的逻辑。

    2.2. 核心接口的实现

    下面通过分析1.3.1中核心接口的实现类,来具体分析心跳处理的过程。

    2.2.1 HearbeatManagerImpl

    该manager维护了一个heartbeat 的监控对象(HeartbeatMonitor)和资源ID信息,当收到新的心跳信息是,monitor对象将会被更新;心跳超时时,将会通知HeartbeatListenter对象。

    public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
      //心跳间隔
      /** Heartbeat timeout interval in milli seconds. */
      private final long heartbeatTimeoutIntervalMs;
      //心跳
      /** Heartbeat listener with which the heartbeat manager has been associated. */
      private final HeartbeatListener<I, O> heartbeatListener;
      //使用一个map存放资源-心跳的monitor信息,其monitorTarget方法就是将对应信息放入该map中
      /** Map containing the heartbeat monitors associated with the respective resource ID. */
      private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;
    
      /** Running state of the heartbeat manager. */
      protected volatile boolean stopped;
    

    HearbeatManagerImpl实现的主要函数有:

    • monitorTarget将一个节点加入监控列表中
      该方法会根据ResourceID和HeartbeatTarget生成一个HeartbeatMonitor对象,然后将resourceID和该对象组成KV的形式放入heartbeatTargets中。
    • requestHeartbeat
      心跳请求方调用requestHeartbeat要求上报一个心跳信息,该请求会通过RPC异步调用到心跳的上报方(HearbeatManagerImpl的创建者)的requestHeartbeat,以要求上报方向requestOrigin节点发起一个心跳响应。具体过程如下:
      • requestHeartbeat会记录下这个请求时间点,然后取消超时,重新创建一个ScheduleFuture去判断requestOrigin的心跳是否超时。后续若是超时了,则将heartbeatMonitor的state置为timeout状态,若是特定时间内requestOrigin响应了,则ScheduleFuture取消,monitor的状态依旧为RUNNING。
      • 调用heartbeatListener#reportPayload处理心跳信息,其具体过程依据具体的实现。
      • 最后调用receiveHearbeat函数,响应一个心跳给请求方。

    2.2.2 HeartbeatManagerSenderImpl

    继承于HearbeatManagerImpl,HeartbeatManagerSenderImpl向其监控的heartbeatTarget对象请求心跳的响应,属于主动触发心跳请求。实现了Runnable接口,在其run方法中,会遍历heartbeatMonitor,通过requestHeartbeat()方法向节点获取心跳信息。

      public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
        @Override
        public void run() {
            if (!stopped) {
                log.debug("Trigger heartbeat request.");
                for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
                    requestHeartbeat(heartbeatMonitor);
                }
                // 周期性调度,事件周期可配
                getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
            }
        }
        // 主动发起心跳请求
        private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
            O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
            final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
            // 调用Target的 requestHeartbeat函数
            heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
        }
    
    }
    

    2.2.3 HeartbeatMonitorImpl

    HeartbeatMonitor管理心跳目标,它在初始化会启动一个ScheduledExecutor。

    • timeout超时会通知heartbeatListener执行响应的超时逻辑;
    • 在规定时间内收到心跳信息,会重置ScheduledExecutor,重新开始;
    public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {
    
        /** Resource ID of the monitored heartbeat target. */
        private final ResourceID resourceID; //监控的资源ID
        /** Associated heartbeat target. */
        private final HeartbeatTarget<O> heartbeatTarget;  //心跳目标
    
        private final ScheduledExecutor scheduledExecutor;
        /** Listener which is notified about heartbeat timeouts. */
        private final HeartbeatListener<?, ?> heartbeatListener;
    
        HeartbeatMonitorImpl(
                ResourceID resourceID,
                HeartbeatTarget<O> heartbeatTarget,
                ScheduledExecutor scheduledExecutor,
                HeartbeatListener<?, O> heartbeatListener,
                long heartbeatTimeoutIntervalMs) {
    
            this.resourceID = Preconditions.checkNotNull(resourceID);
            this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget);
            this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
            this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);
    
            Preconditions.checkArgument(
                    heartbeatTimeoutIntervalMs > 0L,
                    "The heartbeat timeout interval has to be larger than 0.");
            this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
    
            lastHeartbeat = 0L;
            //初始化的时候,就启动一个定时任务
            resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
        }
        
        @Override
        public void run() {
            // The heartbeat has timed out if we're in state running
            if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
              //通知heartbeatListener处理
                heartbeatListener.notifyHeartbeatTimeout(resourceID);
            }
        }
    
        void resetHeartbeatTimeout(long heartbeatTimeout) {
            if (state.get() == State.RUNNING) {
                cancelTimeout();
                //重新开启新的定时任务
                futureTimeout =
                        scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
    
                // Double check for concurrent accesses (e.g. a firing of the scheduled future)
                if (state.get() != State.RUNNING) {
                    cancelTimeout();
                }
            }
        }
    
      }
    
    

    2.2.4 HeartbeatServices

    HeartbeatServices为所有需要心跳服务的创建heartbeat receivers and heartbeat senders。

      public class HeartbeatServices {
          /**
         * 创建 heartbeat receivers
         * Creates a heartbeat manager which does not actively send heartbeats.
         */
          public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
                ResourceID resourceId,
                HeartbeatListener<I, O> heartbeatListener,
                ScheduledExecutor mainThreadExecutor,
                Logger log) {
    
            return new HeartbeatManagerImpl<>(
                    heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
        }
         /**
         * 创建 heartbeat sender
         * Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
         */
            public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
                ResourceID resourceId,
                HeartbeatListener<I, O> heartbeatListener,
                ScheduledExecutor mainThreadExecutor,
                Logger log) {
    
            return new HeartbeatManagerSenderImpl<>(
                    heartbeatInterval,
                    heartbeatTimeout,
                    resourceId,
                    heartbeatListener,
                    mainThreadExecutor,
                    log);
        }
            // 从配置文件配置心跳间隔时间和心跳超时时间
        //两者的关系 0 < 心跳间隔时间 < 心跳超时时间
            public static HeartbeatServices fromConfiguration(Configuration configuration) {
            long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
    
            long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
    
            return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
        }
      }
    

    3. RM和TM的交互

    3.1. 总述

    在一个Flink集群中只有一个ResourceManager(RM),和一个或多个TaskManager(TM)。两者的交互过程为:TM启动时会向RM注册,注册成功之后,RM会主动要求TM上报心跳信息。通过RM和TM的心跳信息,双方知道对方是否存活。
    在2.2.4小节总,我们知道HeartbeatManagerSenderImpl属于Sender,HeartbeatManagerImpl属于Receiver。sender要对心跳目标上报心跳信息,receiver收到信息请求后返回一个response。

    3.2. 初始化过程

    3.2.1 ResourceManager

    • RM启动
      public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
            extends FencedRpcEndpoint<ResourceManagerId>
            implements ResourceManagerGateway, LeaderContender {
              // RM启动时运行的方法
        @Override
        public final void onStart() throws Exception {
            try {
              // 启动RMServices
                startResourceManagerServices();
            } catch (Throwable t) {
                final ResourceManagerException exception =
                        new ResourceManagerException(
                                String.format("Could not start the ResourceManager %s", getAddress()),
                                t);
                onFatalError(exception);
                throw exception;
            }
        }  
    }
    
    • leaderElectionService#start方法
      leaderElectionService#start方法有多个实现,其中,主要是DefaultLeaderElectionService和StandaloneLeaderElectionService,前者是依赖外部组价的,这里我们以standalone模式分析。
      在standalone模式下,Flink集群中的leader是通过配置文件配置,所以在调用启动leader选举方法时,会直接将leadership赋给指定的节点,在赋予leadership角色过程会初始化心跳服务,大致的流程如下:
      StandaloneLeaderElectionService#start
      |
      ResourceManager#grantLeadership
      |
      ResourceManager#tryAcceptLeadership
      |
      ResourceManager#startServicesOnLeadership  //其具体实现如下
      
      private void startServicesOnLeadership() {
        //启动心跳服务
          startHeartbeatServices();
          //slotManager是RM中管理slot的组件,其具体过程后续博客分析
          slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
          //周期性判断是否存在未满足的slot请求
          onLeadership();
      }
      
      启动心跳服务,就是创建分别创建了taskManagerHeartbeatManager和jobManagerHeartbeatManager用于RM和TM、RM和JM的心跳服务
          private void startHeartbeatServices() {
            taskManagerHeartbeatManager =
                    heartbeatServices.createHeartbeatManagerSender(
                            resourceId,
                            new TaskManagerHeartbeatListener(),
                            getMainThreadExecutor(),
                            log);
    
            jobManagerHeartbeatManager =
                    heartbeatServices.createHeartbeatManagerSender(
                            resourceId,
                            new JobManagerHeartbeatListener(),
                            getMainThreadExecutor(),
                            log);
        }
    

    结合2.2.2小节,RM在心跳服务在和TM与JM的心跳过程中,充当的是请求心跳请求的发起方,即RM是主动去拉取心跳信息的。

    3.2.2 TaskExecutor

    TaskExecutor在创建时,就初始化了心跳组件。

      public TaskExecutor(
                RpcService rpcService,
                TaskManagerConfiguration taskManagerConfiguration,
                HighAvailabilityServices haServices,
                TaskManagerServices taskExecutorServices,
                ExternalResourceInfoProvider externalResourceInfoProvider,
                HeartbeatServices heartbeatServices,
                TaskManagerMetricGroup taskManagerMetricGroup,
                @Nullable String metricQueryServiceAddress,
                BlobCacheService blobCacheService,
                FatalErrorHandler fatalErrorHandler,
                TaskExecutorPartitionTracker partitionTracker,
                BackPressureSampleService backPressureSampleService) {
            //创建HeartbeatManagerImpl,对JM的心跳进行相应
            this.jobManagerHeartbeatManager =
                    createJobManagerHeartbeatManager(heartbeatServices, resourceId);
            // 创建HeartbeatManagerImpl,对RM的心跳进行相应
            this.resourceManagerHeartbeatManager =
                    createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
        }
    

    3.3 TM向RM注册过程

    3.3.1 流程图

    • TaskExecutor的启动过程如下:
      TaskExecutor#onStart
      |
      TaskExecutor#startTaskExecutorServices
      |
      StandaloneLeaderRetrievalService#start  //以standalone模式分析
      |
      |//在standalone模式下,已知晓JobManager的地址,会直接去链接RM
      TaskExecutor.ResourceManagerLeaderListener#notifyLeaderAddress 
      |
      TaskExecutor#notifyOfNewResourceManagerLeader
      |
      TaskExecutor#reconnectToResourceManager
      | 
      |//在该方法中会主动调用TaskExecutorToResourceManagerConnection类的start方法去链接RM
      TaskExecutor#connectToResourceManager  
      |
      | //在该函数的createNewRegistration方法中的回调函数,处理注册成功后的逻辑
      RegisteredRpcConnection#start
      |
      |//z在该方法中会先链接RM,然后连接成功后发起注册请求
      RetryingRegistration#startRegistration
      |
      RetryingRegistration#register
      |
      TaskExecutorToResourceManagerConnection#invokeRegistration
    
    

    到此,TM向RM发起了注册,通过AKKA RPC,请求来到了RM中。

    • ResourceManager处理逻辑
      ResourceManager#registerTaskExecutor
      |
      |// 该方法的返回值是RegistrationResponse,在该方法中会将调用taskManagerHeartbeatManager.monitorTarget,监控节点的心跳信息
      ResourceManager#registerTaskExecutorInternal
      |
      return new TaskExecutorRegistrationSuccess(
                        registration.getInstanceID(), resourceId, clusterInformation)
    
    • 注册成功后TaskExecutor的逻辑
      //注册成功后将会走start方法中createNewRegistration创建registration时的回调函数
      RegisteredRpcConnection#start
      |
      TaskExecutorToResourceManagerConnection#onRegistrationSuccess
      |
      TaskExecutor#onRegistrationSuccess
      |
      | //和RM建立联系,开始监控RM
      TaskExecutor#establishResourceManagerConnection
      |
      resourceManagerHeartbeatManager#monitorTarget
    

    3.3.2 具体分析

    下面主要分析心跳交互过程。

    • TaskExecutor的启动过程
      • 启动TaskExecutor服务第一步会与RM取得联系,并在此注册一个ResourceManagerLeaderListener用来监控RM leader的变化;
      • 获取RM leader的地址信息后,会调用listener的notifyLeaderAddress方法,该方法会异步的调用notifyOfNewResourceManagerLeader方法;
      • 在notifyOfNewResourceManagerLeader方法中,会根据了leader的地址调用reconnectToResourceManager方法去链接RM;
      • 通过执行resourceManagerConnection#start()去和RM建立链接,该方法是调用其父类RegisteredRpcConnection的start()方法,在该方法中,会定义链接成功的处理逻辑,具体代码如下:
        public void start() {
          checkState(!closed, "The RPC connection is already closed");
          checkState(
                  !isConnected() && pendingRegistration == null,
                  "The RPC connection is already started");
          //会在创建newRegistration时,定义链接成功后处理逻辑
          final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();
    
          if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
              // 启动注册
              newRegistration.startRegistration();
          } else {
              // concurrent start operation
              newRegistration.cancel();
          }
      }
    
    • RetryingRegistration#startRegistration方法中,我们可以看到在TM向RM注册之前,会尝试链接RM,当链接成功之后才会TM才会向RM注册;

    在此过程中有好多回调,需要慢慢的体会。

    • ResourceManager处理逻辑
      通过RPC调用,请求来到了RM中,RM的处理具体如下:
      • 调用ResourceManager#registerTaskExecutor方法,在方法RM会首先去链接TM,若是成功了,会注册一个新的TM;
      • 调用来到ResourceManager#registerTaskExecutorInternal中,在该方法中,若是该TM的ID已经存在则删除,重新以KV的形式将注册信息放入到RM的TM注册列表中;
      • 通过taskManagerHeartbeatManager#monitorTarget监控TM,并返回一个注册成功信息;
    • TM的处理过程
      注册成功之后,调用回到TaskExecutor。
      • 回调TaskExecutorToResourceManagerConnection#onRegistrationSuccess方法;
      • 通过异步的形式,和RM建立链接并监控RM;
      • 通过resourceManagerHeartbeatManager.monitorTarget 把RM注册到TM中;

    3.4 TM和RM的心跳过程

    3.4.1 RM请求心跳的过程

    由RM的初始化的分析,我们了解到,RM会主动要求TM上报心跳,其过程如下:

      // 在该该方法中会创建一个HeartbeatManagerSenderImpl
      ResourceManager#startHeartbeatServices
      |
      | //这里会一步步调用构造函数中,在该构造函数中,会将心跳检查加入周期性任务列表中
      HeartbeatManagerImpl
      |
      | //在任务启动时,会调用HeartbeatManagerSenderImpl的run方法,在该方法中会循环遍历HeartbeatMonitor,通过requestHeartbeat要求target上报心跳信息  
      HeartbeatManagerSenderImpl#run
      |
      |  //该调用会跑到ResourceManager#TaskManagerHeartbeatListener中,这里返回为null是因为RM不是任何组件的receiver,即不会有组件向RM请求心跳信息,并要求其返回心跳。
      getHeartbeatListener().retrievePayload
      |
      |  //这里会调用TM向RM注册时指定的requestHeartbeat
      heartbeatTarget.requestHeartbeat
      |
      |   //ResouceManager#registerTaskExecutorInternal
      taskExecutorGateway.heartbeatFromResourceManager
    

    3.4.2 TM处理心跳请求

    通过RPC调用,请求来到了TM中,其过程如下:

      TaskExecutor#heartbeatFromResourceManager
      |
      HeartbeatManagerImpl#requestHeartbeat
      |
      HeartbeatMonitorImpl#reportHeartbeat
      |
      | //在该方法中,判断若是running则会取消之前的Timeout定时任务ScheduledFuture,重新开始检查是否timeout超时的定时任务。
      HeartbeatMonitorImpl#resetHeartbeatTimeout
      |
      |  //因为从RM发来的请求中heartbeatPayload为null,则TM直接走返回心跳反应的流程
      HeartbeatMonitorImpl#reportHeartbea->heartbeatTarget.receiveHeartbeat
      |
      | //这里生成TM的心跳信息,包括slot信息
      TaskExecutor.ResourceManagerHeartbeatListener#retrievePayload
      |
      |  //通过在TM向RM注册过程中定义的receiveHeartbeat方法来实现调用RM中方法
      TaskExecutor#establishResourceManagerConnection-> resourceManagerGateway.heartbeatFromTaskManager
    
    

    3.4.3 RM处理周期性心跳信息过程

    RM收到TM的心跳信息,主要做了两件事:重置RM的Monitor线程;解析TM上报信息

      ResourceManager#heartbeatFromTaskManager
      |
      HeartbeatManagerImpl#reportHeartbeat
      |
      | //和TM一样,重置了monitor线程
      HeartbeatManagerImpl#reportHeartbeat->reportHeartbeat
      |
      | //在该方法中处理上报的slot信息,
      ResourceManager.TaskManagerHeartbeatListener#reportPayload
    

    周期性心跳的具体分析过程见上述流程中的注释。

    4. 参考文章

  • 相关阅读:
    雷林鹏分享:EJB回调
    雷林鹏分享:EJB注解
    雷林鹏分享:EJB持久性
    雷林鹏分享:EJB有状态会话Bean
    雷林鹏分享:EJB无状态Bean
    雷林鹏分享:EJB创建应用
    雷林鹏分享:EJB概述
    雷林鹏分享:EJB教程
    Java集合框架
    Java集合源码剖析——ArrayList源码剖析
  • 原文地址:https://www.cnblogs.com/love-yh/p/15037809.html
Copyright © 2011-2022 走看看