Remote Procedure Call 远程方法调用。不需要了解网络细节,某一程序即可使用该协议请求来自网络内另一台及其程序的服务。它是一个 Client/Server 的结构,提供服务的一方称为Server,消费服务的一方称为Client。
Hadoop 底层的交互都是通过 rpc 进行的。例 如:datanode 和 namenode、tasktracker 和 jobtracker、secondary namenode 和 namenode 之间的通信都是通过 rpc 实现的。
TODO: 此文未写明了。明显需要画 4张图, rpc 原理图,Hadoop rpc 时序图, 客户端 流程图,服端流程图。最好帖几个包图+ 类图(组件图)。待完善。
要实现远程过程调用,需要有3要素: 1、server 必须发布服务 2、在 client 和 server 两端都需要有模块来处理协议和连接 3、server 发布的服务,需要将接口给到 client
Hadoop RPC
- 序列化层。 Client 与 Server 端通讯传递的信息采用实现自 Writable 类型
- 函数调用层。 Hadoop RPC 通过动态代理和 java 反射实现函数调用
- 网络传输层。Hadoop RPC 采用 TCP/IP socket 机制
- 服务器框架层。Hadoop RPC 采用 java NIO 事件驱动模型提高 RPC Server 吞吐量
TODO 缺个 RPC 图
Hadoop RPC 源代码主要在org.apache.hadoop.ipc包下。org.apache.hadoop.ipc.RPC 内部包含5个内部类。
- Invocation :用于封装方法名和参数,作为数据传输层,相当于VO(Value Object)。
- ClientCache :用于存储client对象,用 socket factory 作为 hash key,存储结构为 hashMap 。
- Invoker :是动态代理中的调用实现类,继承了 java.lang.reflect.InvocationHandler。
- Server :是ipc.Server的实现类。
- VersionMismatch : 协议版本。
从客户端开始进行通讯源代码分析
org.apache.hadoop.ipc.Client 有5个内部类
- Call: A call waiting for a value.
- Connection: Thread that reads responses and notifies callers. Each connection owns a socket connected to a remote address. Calls are multiplexed through this socket: responses may be delivered out of order.
- ConnectionId: This class holds the address and the user ticket. The client connections to servers are uniquely identified by
- ParallelCall: Call implementation used for parallel calls.
- ParallelResults: Result collector for parallel calls.
客户端和服务端建立连接的大致执行过程为:
-
在 Object org.apache.hadoop.ipc.RPC.Invoker.invoke(Object proxy, Method method, Object[] args) 方法中调用
client.call(new Invocation(method, args), remoteId); -
上述的 new Invocation(method, args) 是 org.apache.hadoop.ipc.RPC 的内部类,它包含被调用的方法名称及其参数。此处主要是设置方法和参数。 client 为 org.apache.hadoop.ipc.Client 的实例对象。
-
org.apache.hadoop.ipc.Client.call() 方法的具体源代码。在call()方法中 getConnection()内部获取一个 org.apache.hadoop.ipc.Client.Connection 对象并启动 io 流 setupIOstreams()。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748Writable org.apache.hadoop.ipc.Client.call(Writable param, ConnectionId remoteId) throwsInterruptedException, IOException {
Call call =
new
Call(param);
//A call waiting for a value.
// Get a connection from the pool, or create a new one and add it to the
// pool. Connections to a given ConnectionId are reused.
Connection connection = getConnection(remoteId, call);
// 主要在 org.apache.hadoop.net 包下。
connection.sendParam(call);
//客户端发送数据过程
boolean
interrupted =
false
;
synchronized
(call) {
while
(!call.done) {
try
{
call.wait();
// wait for the result
}
catch
(InterruptedException ie) {
// save the fact that we were interrupted
interrupted =
true
;
}
}
… …
}
}
// Get a connection from the pool, or create a new one and add it to the
// pool. Connections to a given ConnectionId are reused.
private
Connection getConnection(ConnectionId remoteId,
Call call)
throws
IOException, InterruptedException {
if
(!running.get()) {
// the client is stopped
throw
new
IOException(
"The client is stopped"
);
}
Connection connection;
// we could avoid this allocation for each RPC by having a
// connectionsId object and with set() method. We need to manage the
// refs for keys in HashMap properly. For now its ok.
do
{
synchronized
(connections) {
connection = connections.get(remoteId);
if
(connection ==
null
) {
connection =
new
Connection(remoteId);
connections.put(remoteId, connection);
}
}
}
while
(!connection.addCall(call));
//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
connection.setupIOstreams();
// 向服务段发送一个 header 并等待结果
return
connection;
}
-
setupIOstreams() 方法。
123456789101112131415void
org.apache.hadoop.ipc.Client.Connection.setupIOstreams()
throws
InterruptedException {
// Connect to the server and set up the I/O streams. It then sends
// a header to the server and starts
// the connection thread that waits for responses.
while
(
true
) {
setupConnection();
// 建立连接
InputStream inStream = NetUtils.getInputStream(socket);
// 输入
OutputStream outStream = NetUtils.getOutputStream(socket);
// 输出
writeRpcHeader(outStream);
}
… …
// update last activity time
touch();
// start the receiver thread after the socket connection has been set up start();
}
-
启动org.apache.hadoop.ipc.Client.Connection 客户端获取服务器端放回数据过程
1234void
org.apache.hadoop.ipc.Client.Connection.run()
while
(waitForWork()) {
//wait here for work - read or close connection
receiveResponse();
}
ipc.Server源码分析
ipc.Server 有6个内部类:
- Call :用于存储客户端发来的请求
- Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
- ExceptionsHandler: 异常管理
- Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
- Connection :连接类,真正的客户端请求读取逻辑在这个类中。
- Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。
大致过程为:
-
Namenode的初始化时,RPC的server对象是通过ipc.RPC类的getServer()方法获得的。
123456789101112131415void
org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(Configuration conf) throwsIOException
// create rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if
(dnSocketAddr !=
null
) {
int
serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
this
.serviceRpcServer = RPC.getServer(
this
, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false
, conf, namesystem.getDelegationTokenSecretManager());
this
.serviceRPCAddress =
this
.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
… …
this
.server.start();
//start RPC server
-
启动 server
1234567891011void
org.apache.hadoop.ipc.Server.start()
// Starts the service. Must be called before any calls will be handled.
public
synchronized
void
start() {
responder.start();
listener.start();
handlers =
new
Handler[handlerCount];
for
(
int
i =
0
; i < handlerCount; i++) {
handlers[i] =
new
Handler(i);
handlers[i].start();
//处理call
}
}
-
Server处理请求, server 同样使用非阻塞 nio 以提高吞吐量
1234567org.apache.hadoop.ipc.Server.Listener.Listener(Server)
throws
IOException
public
Listener()
throws
IOException {
address =
new
InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(
false
);
… … }
-
真正建立连接
1void
org.apache.hadoop.ipc.Server.Listener.doAccept(SelectionKey key)
throws
IOException,OutOfMemoryError
Reader 读数据接收请求
1234567void
org.apache.hadoop.ipc.Server.Listener.doRead(SelectionKey key)
throws
InterruptedException
try
{
count = c.readAndProcess();
}
catch
(InterruptedException ieo) {
LOG.info(getName() +
": readAndProcess caught InterruptedException"
, ieo);
throw
ieo;
}
12345678910111213int
org.apache.hadoop.ipc.Server.Connection.readAndProcess()
throws
IOException,InterruptedException
if
(!rpcHeaderRead) {
//Every connection is expected to send the header.
if
(rpcHeaderBuffer ==
null
) {
rpcHeaderBuffer = ByteBuffer.allocate(
2
);
}
count = channelRead(channel, rpcHeaderBuffer);
if
(count <
0
|| rpcHeaderBuffer.remaining() >
0
) {
return
count;
}
int
version = rpcHeaderBuffer.get(
0
);
… …
processOneRpc(data.array());
// 数据处理
-
下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。
123456789101112131415void
org.apache.hadoop.ipc.Server.Connection.processOneRpc(
byte
[] buf)
throws
IOException,InterruptedException
private
void
processOneRpc(
byte
[] buf)
throws
IOException,
InterruptedException {
if
(headerRead) {
processData(buf);
}
else
{
processHeader(buf);
headerRead =
true
;
if
(!authorizeConnection()) {
throw
new
AccessControlException(
"Connection from "
+
this
+
" for protocol "
+ header.getProtocol()
+
" is unauthorized for user "
+ user);
}
}
}
-
处理call
1234567891011121314void
org.apache.hadoop.ipc.Server.Handler.run()
while
(running) {
try
{
final
Call call = callQueue.take();
// pop the queue; maybe blocked here
… …
CurCall.set(call);
try
{
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
if
(call.connection.user ==
null
) {
value = call(call.connection.protocol, call.param,
call.timestamp);
}
else
{
… …}
-
返回请求
下面贴出Server.Responder类中的doRespond()方法源码:
1
2
3
4
5
6
7
8
9
10
11
12
|
void
org.apache.hadoop.ipc.Server.Responder.doRespond(Call call) throws
IOException // //
Enqueue a response from the application. // void
doRespond(Call call) throws
IOException { synchronized
(call.connection.responseQueue) { call.connection.responseQueue.addLast(call); if
(call.connection.responseQueue.size() == 1 )
{ processResponse(call.connection.responseQueue,
true ); } } } |
补充: notify()让因wait()进入阻塞队列里的线程(blocked状态)变为runnable,然后发出notify()动作的线程继续执行完,待其完成后,进行调度时,调用wait()的线程可能会被再次调度而进入running状态。