zoukankan      html  css  js  c++  java
  • Flink架构分析之RPC详解

    主要抽象

    Flink RPC 框架主要抽象了RpcService,RpcEndpoint,RpcGateway,RpcServer这几个接口,具体实现可以采用多种方式,比如:akka,netty

    RpcService

    我理解为RPC框架的引擎,可以用来启动、停止、连接一个RpcEndpoint,以及执行某些异步任务或者周期性调度任务。

    主要方法:

    • connect:连接到一个RpcEndpoint,返回一个RpcGateway,然后调用者可以使用此gateway进行远程方法调用。
    • startServer:启动一个RpcEndpoint,返回一个RpcServer
    • fenceRpcServer:获取新的RpcServer,可用于重新选主后,更新fencingToken。
    • stopServer: 停止某个RpcEndpoint
    • scheduleRunnable:延迟调度执行某任务。
    • execute:异步执行某任务。

    RpcEndpoint

    所有提供远程调用的组件都会继承此抽象类并实现组件自身提供的业务方法,并且保证同一个RpcEndpoint上的所有远程调用都在同一个线程中执行。

    RpcGateway

    用于远程调用RpcEndpoint的某些方法。可以理解为客服端代理。

    RpcServer

    RpcServicestartServer返回的对象,相当于RpcEndpoint自身的代理对象(self gateway)。通过RpcEndpointgetSelfGateway方法获取其自身的gateway对象然后调用该endpoint的方法。

    akka实现

    AkkaRpcService

    基于akka实现的RpcService,其startServer可以创建一个Akka actor用于接收来自RpcGateway的远程调用消息。

    AkkaRpcActor

    akka actor 继承UntypedActor能处理RpcInvocation,RunAsync,CallAsync,ControlMessages,RemoteHandshakeMessage等业务消息。其包含了具体的RpcEndpoint,收到RpcInvocation后会调用具体实现类RpcEndpoint的相应方法。

    	private Method lookupRpcMethod(final String methodName, final Class<?>[] parameterTypes) throws NoSuchMethodException {
    		return rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
    	}
    
    	rpcMethod = lookupRpcMethod(methodName, parameterTypes);
    	rpcMethod.setAccessible(true);
    	rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
    

    AkkaInvocationHandler

    实现了java动态代理InvocationHandler接口,以及RpcServerRpcGatewayAkkaRpcServiceconnect,startServer方法都会使用该类创建返回给调用者进行远程调用的代理对象。该类会引用akka的ActorRef并把远程过程调用包装为RpcInvocation消息发送给对应的AkkaRpcActor

    核心逻辑

    注意RpcEndpoint启动时需要执行的业务逻辑应该重写其onStart回调方法:

    	protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    		this.rpcService = checkNotNull(rpcService, "rpcService");
    		this.endpointId = checkNotNull(endpointId, "endpointId");
    		//构造函数中已经创建好akka actor
    		this.rpcServer = rpcService.startServer(this);
    
    		this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
    	}
    	// start方法调用rpcServer的start方法
    	public final void start() {
    		rpcServer.start();
    	}
    	// 用户实现的启动时执行的回调方法
    	public void onStart() throws Exception {}
    

    在akka是实现中rpcServer发消息给actor通知其改变状态开始处理远程调用

    	@Override
    	public void start() {
    		rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
    	}
    

    实例讲解之Dispatcher

    DispatcherGateway

    对外提供了一些列管理job的接口,比如提交jobsubmitJob,查询job列表listJobs等。

    Dispatcher

    重载了onStart方法,启动了JobGraphStore监听器,master选举服务,注册了一个统计正在运行的job数量的metric

    然后就是实现具体的业务逻辑,比如submitJob:

  • 相关阅读:
    帝国cms在任意位置调用指定id的栏目名称和链接
    Sublime Text 2中前端必备的常用插件
    sublime text 2代码片段(Snippet)功能的使用
    写好PPT的四大要点
    解码郭台铭语录,50句你应该知道的“郭台铭语录”
    java.sql.SQLSyntaxErrorException: Table 'demo.hibernate_sequence' doesn't exist
    Caused by: java.lang.IllegalArgumentException: Not a managed type: class com.example.demo.domain.DeptInfo
    com.mysql.jdbc.Driver 和 com.mysql.cj.jdbc.Driver的区别
    Spring Boot引入Lombok
    Spring Boot(二)jpa操作数据库
  • 原文地址:https://www.cnblogs.com/andyhe/p/10579604.html
Copyright © 2011-2022 走看看