与IPC相关的代码在org.apache.hadoop.ipc包下。共七个文件,其中4个辅助类:
RemoteException
Status
VersionedProtocol
ConnectionHeader
主要实现类3个:
Client
Server
RPC
客户端Client:
如上图:
与IPC连接相关的
- Client.Connection
- Client.ConnectionId
- ConnectionHeader
与远程调用Call相关的
- Client.Call
- Client.ParallelCall
- Client.ParallelResults
服务器端Server:
与IPC连接相关的
- Server.Connection
- ConnectionHeader
与远程调用Call相关的
- Server.Call
- Server.Responder
- Server.Listener
- Server.Handler
RPC
RPC是在Server及Client的基础上实现了Hadoop IPC。
与客户端相关的功能:
- RPC.ClientCache
- RPC.Invoker(继承java.lang.reflect.InvocationHandler)
- RPC.Invocation
与服务端相关的功能:
- RPC.Server
Connection
客户端与服务器端对连接的抽象不一样,所以有Server.Connection和Client.Connection。Hadoop远程调用采用TCP协议通信。
1)客户端Client.ConnectionId
连接复用:当多个IPC客户端的ConnectionId相同时,他们共享一个IPC连接。连接复用可以减少Hadoop Server、Client的资源占用,同时节省IPC连接时间。
2)ConnectionHeader
Server与Client间TCP连接建立后交换的第一条信息,包含ConnectionId.ticket(UserGroupInformation)用户信息和IPC接口信息,检验是否实现了IPC接口,以及该用户是否有权使用接口。
Call
建立连接后,即可以进行远程过程调用服务,即对IPC接口方法的调用,源码抽象为Call。
远程调用Client.Call对象和Server.Call对象,是一个IPC调用产生的,存在于IPC客户端(存根)和IPC服务端(骨架)中的实体。
Client.Call对象通过IPC连接到服务器后,自然会构成相应的Server.Call对象。
Client.Call何时产生以及如何产生?
如上图所示流程:
- 用户发起远程接口调用
- 动态代理,RPC.Invoker调用句柄捕获远程调用
- 根据invoke的输入参数method、args生成RPC.Invocation对象
- 并调用Client.call,call会将上一步的Invocation对象序列化并通过IPC连接发送到服务器。Client.call会等待服务端返回的结果。
- 服务器端Listener监听Client发来的连接请求和数据请求,并调用Server端的连接对象。
- 连接对象接收远程调用请求帧,反序列化,并将请求放于阻塞队列中,由Handler处理。
- Handler调用对应的IPC接口实现类,完成过程调用,将结果序列化。
- 如果此时连接的应答队列为空,返回给客户端。
- 否则,客户端比较忙,应答队列不为空,Handler将结果放入响应队列,由Responser通过IPC发送给客户端。
Client.Connection对象,需要通过setupIOstreams方法和服务器建立连接,该方法首先通过Java套接字与server建立Socket连接,如果失败,则进行一定次数的重试,如下代码,是setupIOstreams调用的setupConnection:
private synchronized void setupConnection() throws IOException { short ioFailures = 0; short timeoutFailures = 0; while (true) { try { this.socket = socketFactory.createSocket(); this.socket.setTcpNoDelay(tcpNoDelay); //禁用tcp的Nagle算法,关闭socket底层缓冲 // 配置项 ${ipc.client.tcpnodelay} /* * Bind the socket to the host specified in the principal name of the * client, to ensure Server matching address of the client connection * to host name in principal passed. */ if (UserGroupInformation.isSecurityEnabled()) { KerberosInfo krbInfo = remoteId.getProtocol().getAnnotation(KerberosInfo.class); if (krbInfo != null && krbInfo.clientPrincipal() != null) { String host = SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName()); // If host name is a valid local address then bind socket to it InetAddress localAddr = NetUtils.getLocalInetAddress(host); if (localAddr != null) { this.socket.bind(new InetSocketAddress(localAddr, 0)); } } } // connection time out is 20s NetUtils.connect(this.socket, server, 20000); if (rpcTimeout > 0) { pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval } this.socket.setSoTimeout(pingInterval); return; } catch (SocketTimeoutException toe) { /* Check for an address change and update the local reference. * Reset the failure counter if the address was changed */ if (updateAddress()) { timeoutFailures = ioFailures = 0; } /* The max number of retries is 45, * which amounts to 20s*45 = 15 minutes retries. */ handleConnectionFailure(timeoutFailures++, 45, toe); } catch (IOException ie) { if (updateAddress()) { timeoutFailures = ioFailures = 0; } handleConnectionFailure(ioFailures++, maxRetries, ie); } } }
IPC连接
- 连接建立
- 连接上的数据读写
- 连接维护
- 连接关闭
在接下来的几篇内分别介绍以上内容。