主要抽象
Flink RPC 框架主要抽象了RpcService
,RpcEndpoint
,RpcGateway
,RpcServer
这几个接口,具体实现可以采用多种方式,比如:akka
,netty
RpcService
我理解为RPC框架的引擎,可以用来启动、停止、连接一个RpcEndpoint
,以及执行某些异步任务或者周期性调度任务。
主要方法:
connect
:连接到一个RpcEndpoint
,返回一个RpcGateway
,然后调用者可以使用此gateway进行远程方法调用。startServer
:启动一个RpcEndpoint
,返回一个RpcServer
。fenceRpcServer
:获取新的RpcServer
,可用于重新选主后,更新fencingToken。stopServer
: 停止某个RpcEndpoint
。scheduleRunnable
:延迟调度执行某任务。execute
:异步执行某任务。
RpcEndpoint
所有提供远程调用的组件都会继承此抽象类并实现组件自身提供的业务方法,并且保证同一个RpcEndpoint
上的所有远程调用都在同一个线程中执行。
RpcGateway
用于远程调用RpcEndpoint
的某些方法。可以理解为客服端代理。
RpcServer
RpcService
的startServer
返回的对象,相当于RpcEndpoint
自身的代理对象(self gateway)。通过RpcEndpoint
的getSelfGateway
方法获取其自身的gateway对象然后调用该endpoint的方法。
akka实现
AkkaRpcService
基于akka实现的RpcService
,其startServer
可以创建一个Akka actor用于接收来自RpcGateway
的远程调用消息。
AkkaRpcActor
akka actor 继承UntypedActor
能处理RpcInvocation
,RunAsync
,CallAsync
,ControlMessages
,RemoteHandshakeMessage
等业务消息。其包含了具体的RpcEndpoint
,收到RpcInvocation
后会调用具体实现类RpcEndpoint
的相应方法。
private Method lookupRpcMethod(final String methodName, final Class<?>[] parameterTypes) throws NoSuchMethodException {
return rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
}
rpcMethod = lookupRpcMethod(methodName, parameterTypes);
rpcMethod.setAccessible(true);
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
AkkaInvocationHandler
实现了java动态代理InvocationHandler
接口,以及RpcServer
和RpcGateway
。AkkaRpcService
的connect
,startServer
方法都会使用该类创建返回给调用者进行远程调用的代理对象。该类会引用akka的ActorRef
并把远程过程调用包装为RpcInvocation
消息发送给对应的AkkaRpcActor
。
核心逻辑
注意RpcEndpoint
启动时需要执行的业务逻辑应该重写其onStart
回调方法:
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
//构造函数中已经创建好akka actor
this.rpcServer = rpcService.startServer(this);
this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
// start方法调用rpcServer的start方法
public final void start() {
rpcServer.start();
}
// 用户实现的启动时执行的回调方法
public void onStart() throws Exception {}
在akka是实现中rpcServer发消息给actor通知其改变状态开始处理远程调用
@Override
public void start() {
rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}
实例讲解之Dispatcher
DispatcherGateway
对外提供了一些列管理job的接口,比如提交jobsubmitJob
,查询job列表listJobs
等。
Dispatcher
重载了onStart
方法,启动了JobGraphStore监听器,master选举服务,注册了一个统计正在运行的job数量的metric
然后就是实现具体的业务逻辑,比如submitJob
: