A couple of days ago I did a small analysis of the client-side flow, and it went fairly smoothly, but the RPC server side is clearly not that simple. In a client-server model the load and the complexity sit on the server, so I can only offer a rough analysis here: there are far too many details for me to untangle them all, but I will walk through the main flow together with the source code. Readers who want to go further can consult the source themselves.
The Server and the Client agree on some of their type definitions: the server also has a Call and a Connection, which is easy to understand, since a call and its connection are two-way concepts. First, look at the definition of the Server class:
public abstract class Server {
  private final boolean authorize;
  private boolean isSecurityEnabled;
  /**
   * The first four bytes of Hadoop RPC connections:
   * the magic bytes 'hrpc'
   */
  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
  // 1 : Introduce ping and server does not throw away RPCs
  // 3 : Introduce the protocol into the RPC connection header
  // 4 : Introduced SASL security layer
  public static final byte CURRENT_VERSION = 4;
  ....
This defines some of the basics: the version number, the magic bytes used for validation, and so on. Next, look at its two key inner classes, the Connection and the Call:
/** A call queued for handling. */
/** The server-side Call queue entry; note it differs from the client-side Call. */
private static class Call {
  // the client's call id, passed over from the client
  private int id;
  // the parameter passed with the call
  private Writable param;
  // the server also keeps the connection back to the client
  private Connection connection;
  // the time received when response is null;
  // the time served when response is not null
  private long timestamp;
  // the response for this call
  private ByteBuffer response;
  ......
The fields differ slightly from the client side; as we will see, the server answers a request by writing into the Call's response. The other inner class is the connection:
/** Reads calls from a connection and queues them for handling. */
public class Connection {
  // whether the initial rpc header has been read
  private boolean rpcHeaderRead = false;
  // whether the connection header that follows the version has been read
  private boolean headerRead = false;
  private SocketChannel channel;
  // byte buffers used for reading the request data
  private ByteBuffer data;
  private ByteBuffer dataLengthBuffer;
  // queue of calls whose responses still have to be written back
  private LinkedList<Call> responseQueue;
  // number of outstanding rpcs on this connection
  private volatile int rpcCount = 0;
  private long lastContact;
  private int dataLength;
  private Socket socket;
  // Cache the remote host & port info so that even if the socket is
  // disconnected, we can say where it used to connect to.
  private String hostAddress;
  private int remotePort;
  private InetAddress addr;
  .....
These fields are self-explanatory. The Server side then adds a few key fields that have no client-side counterpart:
.....
volatile private boolean running = true; // true while server runs
// queued calls waiting to be handled, kept in a blocking queue
private BlockingQueue<Call> callQueue;
// maintain a list of client connections
private List<Connection> connectionList =
    Collections.synchronizedList(new LinkedList<Connection>());
// the server's listening thread
private Listener listener = null;
// the thread that writes responses back
private Responder responder = null;
private int numConnections = 0;
// the group of threads that process requests
private Handler[] handlers = null;
.....
So we have callQueue, the queue of pending requests; connectionList, the list of client connections; and the three kinds of threads: the Listener that accepts, the Handlers that process, and the Responder that replies. With these pieces, the overall block diagram of the Server class emerges. Note that the pending requests sit in a BlockingQueue: put() blocks when the queue is full and take() blocks when it is empty, so the queue itself provides back-pressure between the threads. The author clearly chose this deliberately.
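To make that blocking behavior concrete, here is a minimal runnable sketch of the same producer-consumer shape; all names are hypothetical and this is not Hadoop code. One thread stands in for the Listener filling a bounded queue, another for a Handler draining it.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class CallPipelineSketch {
    public static void main(String[] args) throws InterruptedException {
        // a small bound so the blocking behavior is easy to provoke
        BlockingQueue<String> callQueue = new ArrayBlockingQueue<>(2);

        Thread handler = new Thread(() -> {
            try {
                while (true) {
                    String call = callQueue.take(); // blocks while the queue is empty
                    System.out.println("handled " + call);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        handler.setDaemon(true);
        handler.start();

        for (int i = 0; i < 5; i++) {
            callQueue.put("call-" + i); // blocks while the queue is full
        }
        Thread.sleep(100); // let the handler drain the queue before exiting
    }
}

Shrink the capacity to 1 and the producer visibly stalls; that is exactly the protection the Server gets against a flood of requests.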
Good. The focus of the analysis from here on is what those three kinds of threads actually do. All of them start working once the server's start() method is called:
/** Starts the service. Must be called before any calls will be handled. */
/** The server's startup method */
public synchronized void start() {
  // start the three kinds of threads: the responder, the listener,
  // and the group of handler threads
  responder.start();
  listener.start();
  handlers = new Handler[handlerCount];
  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();
  }
}
The initialization has already happened in the constructor, so start() is very direct: it just starts the threads. In the natural order of things, the first step belongs to the Listener thread, which waits for incoming connections.
public Listener() throws IOException {
  .....
  bind(acceptChannel.socket(), address, backlogLength);
  port = acceptChannel.socket().getLocalPort(); // could be an ephemeral port
  // create a selector
  selector = Selector.open();
  readers = new Reader[readThreads];
  readPool = Executors.newFixedThreadPool(readThreads);
  for (int i = 0; i < readThreads; i++) {
    Selector readSelector = Selector.open();
    Reader reader = new Reader(readSelector);
    readers[i] = reader;
    // each Reader is a Runnable executed on the thread pool
    readPool.execute(reader);
  }
  // Register accepts on the server socket with the selector.
  // Standard Java NIO: register the event of interest on the selector.
  acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
  this.setName("IPC Server listener on " + port);
  this.setDaemon(true);
}
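If the NIO boilerplate in this constructor looks dense, here is a stripped-down, runnable sketch of the same idea, independent of Hadoop (the port and the class name are placeholders of mine): a non-blocking server channel is registered with a selector for OP_ACCEPT, and a select loop accepts whatever arrives.

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class AcceptSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel acceptChannel = ServerSocketChannel.open();
        acceptChannel.configureBlocking(false);
        acceptChannel.socket().bind(new InetSocketAddress(8020), 128); // backlog 128
        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select(); // blocks until at least one channel is ready
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isValid() && key.isAcceptable()) {
                    SocketChannel channel =
                        ((ServerSocketChannel) key.channel()).accept();
                    if (channel != null) {
                        System.out.println("accepted " + channel.getRemoteAddress());
                        channel.close(); // a real server hands this off to a Reader
                    }
                }
            }
            selector.selectedKeys().clear(); // consume the ready set
        }
    }
}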
That, in essence, is what the Listener's constructor sets up: the pool of Reader threads is initialized and accept events are registered. Here is its main loop:
@Override
public void run() {
  LOG.info(getName() + ": starting");
  SERVER.set(Server.this);
  while (running) {
    SelectionKey key = null;
    try {
      selector.select();
      Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            if (key.isAcceptable())
              // the Listener's job is to wait for client connection events
              doAccept(key);
          }
        } catch (IOException e) {
        }
        key = null;
        .....
Before any reading happens, the Listener waits for connection requests, and the flow lands in doAccept():
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
  Connection c = null;
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    Reader reader = getReader();
    try {
      // once the connection is accepted, register an NIO read event with a Reader
      reader.startAdd();
      SelectionKey readKey = reader.registerChannel(channel);
      c = new Connection(readKey, channel, System.currentTimeMillis());
      readKey.attach(c);
      synchronized (connectionList) {
        connectionList.add(numConnections, c);
        numConnections++;
      }
      ....
After the accept, the channel is registered with one of the Readers:

public synchronized SelectionKey registerChannel(SocketChannel channel)
    throws IOException {
  return channel.register(readSelector, SelectionKey.OP_READ);
}
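A subtle point worth pausing on: the reader.startAdd() call in doAccept() exists because channel.register() can block for as long as another thread is sitting inside select() on the same selector. The condensed sketch below shows the hand-off protocol between the Listener and one Reader; startAdd(), registerChannel() and the adding flag all appear in the excerpts above, while finishAdd() is my name for the closing step, which is not shown here.

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class ReaderSketch implements Runnable {
    private final Selector readSelector;
    private volatile boolean adding = false;
    private volatile boolean running = true; // a real Reader flips this on shutdown

    ReaderSketch(Selector readSelector) { this.readSelector = readSelector; }

    // Called by the Listener before registerChannel().
    public void startAdd() {
        adding = true;
        readSelector.wakeup(); // kick the Reader out of select()
    }

    public synchronized SelectionKey registerChannel(SocketChannel channel)
            throws IOException {
        return channel.register(readSelector, SelectionKey.OP_READ);
    }

    // Called by the Listener once registration is done.
    public synchronized void finishAdd() {
        adding = false;
        this.notify();
    }

    @Override
    public synchronized void run() {
        while (running) {
            try {
                readSelector.select();
                while (adding) {
                    this.wait(1000); // park until the Listener finishes registering
                }
                // ... drain the selected keys and call doRead(key) ...
            } catch (InterruptedException | IOException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}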
After that, control passes to the Reader's main loop:
public void run() {
  LOG.info("Starting SocketReader");
  synchronized (this) {
    while (running) {
      SelectionKey key = null;
      try {
        readSelector.select();
        while (adding) {
          this.wait(1000);
        }
        Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
        while (iter.hasNext()) {
          key = iter.next();
          iter.remove();
          if (key.isValid()) {
            if (key.isReadable()) {
              // the Reader's job is to wait for read events
              doRead(key);
            }
          }
          key = null;
        }
        .....
Very similar to how connections are accepted; this time the action happens in doRead():
void doRead(SelectionKey key) throws InterruptedException {
  int count = 0;
  Connection c = (Connection) key.attachment();
  if (c == null) {
    return;
  }
  c.setLastContact(System.currentTimeMillis());
  try {
    // once a read event fires for an RPC request, this is called first
    count = c.readAndProcess();
    ....
public int readAndProcess() throws IOException, InterruptedException {
  while (true) {
    /* Read at most one RPC. If the header is not read completely yet
     * then iterate until we read first RPC or until there is no data left.
     */
    int count = -1;
    // first read the 4-byte length/magic buffer
    if (dataLengthBuffer.remaining() > 0) {
      count = channelRead(channel, dataLengthBuffer);
      if (count < 0 || dataLengthBuffer.remaining() > 0)
        return count;
    }
    if (!rpcHeaderRead) {
      // Every connection is expected to send the header.
      if (rpcHeaderBuffer == null) {
        rpcHeaderBuffer = ByteBuffer.allocate(2);
      }
      count = channelRead(channel, rpcHeaderBuffer);
      if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
        return count;
      }
      // the header carries the version and the authentication method
      int version = rpcHeaderBuffer.get(0);
      byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
      authMethod = AuthMethod.read(new DataInputStream(
          new ByteArrayInputStream(method)));
      dataLengthBuffer.flip();
      // validate here; anything that does not match is rejected immediately
      if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
        // Warning is ok since this is not supposed to happen.
        LOG.warn("Incorrect header or version mismatch from " +
                 hostAddress + ":" + remotePort +
                 " got version " + version +
                 " expected version " + CURRENT_VERSION);
        return -1;
      }
      ....
      // continue reading the payload from the channel into data
      count = channelRead(channel, data);
      if (data.remaining() == 0) {
        dataLengthBuffer.clear();
        data.flip();
        if (skipInitialSaslHandshake) {
          data = null;
          skipInitialSaslHandshake = false;
          continue;
        }
        boolean isHeaderRead = headerRead;
        // branch on whether SASL is in use. SASL is a mechanism for extending
        // client/server authentication; we will follow the simple path that
        // does not use it.
        if (useSasl) {
          saslReadAndProcess(data.array());
        } else {
          processOneRpc(data.array());
        }
        .....
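Reading this parsing logic backwards tells us exactly what a client must send: the four magic bytes 'hrpc', one version byte, one auth-method byte, and from then on length-prefixed payloads (first the connection header, then the calls). The sketch below is reconstructed purely from the server code above, not taken from the client source; in particular, the SIMPLE auth-method code of 80 is an assumption for illustration.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RpcFramingSketch {
    static final byte CURRENT_VERSION = 4;
    static final byte AUTH_SIMPLE = 80; // assumed code for "no SASL"

    // the one-time preamble sent when the connection opens
    public static byte[] connectionPreamble() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeBytes("hrpc");         // lands in dataLengthBuffer on the server
        out.writeByte(CURRENT_VERSION); // rpcHeaderBuffer.get(0)
        out.writeByte(AUTH_SIMPLE);     // rpcHeaderBuffer.get(1)
        return bos.toByteArray();
    }

    // every subsequent payload (connection header, then calls) is length-prefixed
    public static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(payload.length);   // fills dataLengthBuffer on the server
        out.write(payload);             // fills 'data' on the server
        return bos.toByteArray();
    }
}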
Back on the simple (non-SASL) path, execution reaches the following method:
private void processOneRpc(byte[] buf) throws IOException,
    InterruptedException {
  if (headerRead) {
    // the header has been fully validated; process the actual request data
    processData(buf);
  } else {
    // keep validating the rest of the header: the protocol and user/group info
    processHeader(buf);
    headerRead = true;
    if (!authorizeConnection()) {
      throw new AccessControlException("Connection from " + this
          + " for protocol " + header.getProtocol()
          + " is unauthorized for user " + user);
    }
  }
}
processData() is the final destination; it took quite a chain of methods to get here.

private void processData(byte[] buf) throws IOException, InterruptedException {
  DataInputStream dis =
      new DataInputStream(new ByteArrayInputStream(buf));
  int id = dis.readInt(); // try to read an id
  if (LOG.isDebugEnabled())
    LOG.debug(" got #" + id);
  // instantiate the parameter type reflectively, based on the configuration
  Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param
  // let the parameter object read its own fields from the stream
  param.readFields(dis);
  // build the server-side Call object from the id, the parameter and this connection
  Call call = new Call(id, param, this);
  // put it on the blocking call queue
  callQueue.put(call); // queue the call; maybe blocked here
  // bump the count of outstanding RPCs on this connection
  incRpcCount(); // Increment the rpc count
}
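A quick aside on Writable, since processData() leans on it: ReflectionUtils.newInstance() creates an empty parameter object, and readFields() lets it deserialize itself from the stream. A minimal hypothetical parameter type (mine, not from the Hadoop source) looks like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class DemoParam implements Writable {
    private String methodName;
    private long arg;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(methodName); // client side: serialize the fields in order
        out.writeLong(arg);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        methodName = in.readUTF(); // server side: read them back in the same order
        arg = in.readLong();
    }
}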
And here the chain ends. The Listener's core job, then, is to turn the parameters of each read request into a Call and put it on the blocking queue. At that point, a related thread gets work to do: the Handler.
/** Handles the queued Call requests */
private class Handler extends Thread {
  public Handler(int instanceNumber) {
    this.setDaemon(true);
    this.setName("IPC Server handler "+ instanceNumber + " on " + port);
  }

  @Override
  public void run() {
    LOG.info(getName() + ": starting");
    SERVER.set(Server.this);
    ByteArrayOutputStream buf =
        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
    // loop forever, handling one call at a time
    while (running) {
      try {
        // take a call off the queue
        final Call call = callQueue.take(); // pop the queue; maybe blocked here
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": has #" + call.id + " from " +
                    call.connection);
        String errorClass = null;
        String error = null;
        Writable value = null;
        // mark this call as the one currently being handled
        CurCall.set(call);
        ....
        CurCall.set(null);
        synchronized (call.connection.responseQueue) {
          // setupResponse() needs to be sync'ed together with
          // responder.doResponse() since setupResponse may use
          // SASL to encrypt response data and SASL enforces
          // its own message ordering.
          // assemble the response for this call
          setupResponse(buf, call,
              (error == null) ? Status.SUCCESS : Status.ERROR,
              value, errorClass, error);
          // Discard the large buf and reset it back to
          // smaller size to freeup heap
          if (buf.size() > maxRespSize) {
            LOG.warn("Large response size " + buf.size() + " for call " +
                     call.toString());
            buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
          }
          // hand the call over to the responder thread, which writes the reply
          responder.doRespond(call);
          ....
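setupResponse() itself is elided in this excerpt. Reconstructed from its call site and from what processData() read on the way in, its job is roughly the following. This is a hedged sketch, not the verbatim Hadoop code, and the numeric status codes are illustrative only.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class ResponseSketch {
    static final int SUCCESS = 0, ERROR = 1; // illustrative status codes

    static ByteBuffer buildResponse(int callId, byte[] serializedValue,
                                    String errorClass, String error)
            throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(callId);              // matches the id read in processData()
        if (error == null) {
            out.writeInt(SUCCESS);
            out.write(serializedValue);    // value.write(out) in the real code
        } else {
            out.writeInt(ERROR);
            out.writeUTF(errorClass);
            out.writeUTF(error);
        }
        return ByteBuffer.wrap(buf.toByteArray()); // becomes call.response
    }
}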
The Handler is fairly direct: it takes a Call off the pending queue and passes it on for the Responder thread to write the reply, essentially acting as a relay (the elided section between the two CurCall.set() lines is where the requested method actually gets invoked). One benefit of the blocking queue shows up here: if callQueue is empty, the Handler simply blocks on callQueue.take() and nothing further runs. Everything after that is thrown over to the Responder thread.
void doRespond(Call call) throws IOException {
  synchronized (call.connection.responseQueue) {
    call.connection.responseQueue.addLast(call);
    if (call.connection.responseQueue.size() == 1) {
      processResponse(call.connection.responseQueue, true);
    }
  }
}
Note the fast path in doRespond(): when the freshly queued call is the only one on the connection's responseQueue, the Handler thread tries to write the response inline instead of waking the Responder. Continue into processResponse():
private boolean processResponse(LinkedList<Call> responseQueue,
                                boolean inHandler) throws IOException {
  boolean error = true;
  boolean done = false;       // there is more data for this channel.
  int numElements = 0;
  Call call = null;
  try {
    synchronized (responseQueue) {
      //
      // If there are no items for this channel, then we are done
      //
      numElements = responseQueue.size();
      if (numElements == 0) {
        error = false;
        return true;          // no more data for this channel.
      }
      //
      // Extract the first call: take one call off the queue to answer it
      //
      call = responseQueue.removeFirst();
      SocketChannel channel = call.connection.channel;
      if (LOG.isDebugEnabled()) {
        LOG.debug(getName() + ": responding to #" + call.id + " from " +
                  call.connection);
      }
      //
      // Send as much data as we can in the non-blocking fashion:
      // write call.response out to the socket channel
      //
      int numBytes = channelWrite(channel, call.response);
      if (numBytes < 0) {
        return true;
      }
      if (!call.response.hasRemaining()) {
        call.connection.decRpcCount();
        if (numElements == 1) { // last call fully processes.
          done = true;          // no more data for this channel.
        } else {
          done = false;         // more calls pending to be sent.
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(getName() + ": responding to #" + call.id + " from " +
                    call.connection + " Wrote " + numBytes + " bytes.");
        }
      } else {
        //
        // If we were unable to write the entire response out, then
        // put this call back at the head of the queue
        //
        call.connection.responseQueue.addFirst(call);
        if (inHandler) {
          // inHandler means the response will be sent back a little later,
          // so update the call's timestamp to the time it was served
          // set the serve time when the response has to be sent later
          call.timestamp = System.currentTimeMillis();
          incPending();
          try {
            // Wakeup the thread blocked on select, only then can the call
            // to channel.register() complete.
            writeSelector.wakeup();
            channel.register(writeSelector, SelectionKey.OP_WRITE, call);
            .....
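The excerpt ends with the channel being registered on writeSelector with the Call attached. The other half of the Responder is its select loop, which picks up those registrations and retries the write once the socket drains. Below is a condensed, compilable sketch of that loop; the real Hadoop types are elided, so the body only demonstrates retrieving the attachment, where the real code calls processResponse(call.connection.responseQueue, false).

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

class ResponderSketch extends Thread {
    private final Selector writeSelector;
    private volatile boolean running = true;

    ResponderSketch(Selector writeSelector) { this.writeSelector = writeSelector; }

    @Override
    public void run() {
        while (running) {
            try {
                writeSelector.select(1000); // wait for a writable channel
                Iterator<SelectionKey> iter =
                        writeSelector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isValid() && key.isWritable()) {
                        // the Call attached by channel.register(...) above;
                        // the real code now retries processResponse(..., false)
                        Object pendingCall = key.attachment();
                        System.out.println("writable, retrying " + pendingCall);
                    }
                }
            } catch (IOException e) {
                // log and keep serving the other channels
            }
        }
    }
}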
The actual write happens in channelWrite():
private int channelWrite(WritableByteChannel channel,
                         ByteBuffer buffer) throws IOException {
  // write the contents of the buffer (call.response) out to the channel
  int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
      channel.write(buffer) : channelIO(null, channel, buffer);
  if (count > 0) {
    rpcMetrics.incrSentBytes(count);
  }
  return count;
}
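The NIO_BUFFER_LIMIT branch deserves a note: writing one very large heap ByteBuffer in a single channel.write() makes the JDK allocate, and cache per thread, an equally large temporary direct buffer, so big responses are pushed through channelIO(), which writes in chunks. Below is a hedged re-creation of that chunking idea; the 8 KB constant matches the version of the source I read, and the real channelIO() also covers reads.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

public class ChunkedWriteSketch {
    private static final int NIO_BUFFER_LIMIT = 8 * 1024; // 8 KB per write

    static int chunkedWrite(WritableByteChannel channel, ByteBuffer buffer)
            throws IOException {
        int originalLimit = buffer.limit();
        int written = 0;
        while (buffer.remaining() > 0) {
            // expose at most NIO_BUFFER_LIMIT bytes per write() call
            int end = Math.min(buffer.position() + NIO_BUFFER_LIMIT, originalLimit);
            buffer.limit(end);
            int n = channel.write(buffer);
            buffer.limit(originalLimit); // restore the limit for the next round
            if (n <= 0) {
                break; // socket buffer full; caller re-registers for OP_WRITE
            }
            written += n;
        }
        return written;
    }
}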
The buffer here is the call.response passed in as the argument. Calls whose responses have not yet been fully written out stay on the Connection's response queue, because one connection may have many replies in flight:

// queue of calls waiting for their responses to be written
private LinkedList<Call> responseQueue;
That is the Responder's share of the work. The three kinds of threads all revolve around the central callQueue, so I drew a collaboration diagram:
And here is a sequence diagram showing how the functions call one another along the way:
Hadoop RPC also has a helper class, RPC, used to obtain server and client instances:
/** Construct a server for a protocol implementation instance listening on a
 *  port and address, with a secret manager. */
/** Obtain a server instance */
public static Server getServer(final Object instance, final String bindAddress, final int port,
                               final int numHandlers,
                               final boolean verbose, Configuration conf,
                               SecretManager<? extends TokenIdentifier> secretManager)
    throws IOException {
  return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
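To close the loop, here is a hedged end-to-end usage sketch of the Hadoop 1.x RPC API: define a protocol interface extending VersionedProtocol, publish an implementation with RPC.getServer() (the simpler overload without a secret manager also exists alongside the one above), and call it through RPC.getProxy(). The protocol name, port and addresses here are all made up for illustration.

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;

// hypothetical protocol interface; the versionID convention is Hadoop's
interface PingProtocol extends VersionedProtocol {
    long versionID = 1L;
    String ping(String msg) throws IOException;
}

public class RpcUsageSketch implements PingProtocol {
    @Override
    public String ping(String msg) { return "pong: " + msg; }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) {
        return versionID;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Server server =
            RPC.getServer(new RpcUsageSketch(), "0.0.0.0", 9000, conf);
        server.start(); // spins up the Listener, the Handlers and the Responder

        PingProtocol proxy = (PingProtocol) RPC.getProxy(
            PingProtocol.class, PingProtocol.versionID,
            new InetSocketAddress("127.0.0.1", 9000), conf);
        System.out.println(proxy.ping("hello")); // goes through the whole pipeline

        RPC.stopProxy(proxy);
        server.stop();
    }
}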
On the client side there is a caching mechanism:
/**
 * Construct & cache an IPC client with the user-provided SocketFactory
 * if no cached client exists.
 * Fetches a client from the cache, creating one if there is none.
 * @param conf Configuration
 * @return an IPC client
 */
private synchronized Client getClient(Configuration conf,
                                      SocketFactory factory) {
  // Construct & cache client. The configuration is only used for timeout,
  // and Clients have connection pools. So we can either (a) lose some
  // connection pooling and leak sockets, or (b) use the same timeout for all
  // configurations. Since the IPC is usually intended globally, not
  // per-job, we choose (a).
  Client client = clients.get(factory);
  if (client == null) {
    client = new Client(ObjectWritable.class, conf, factory);
    clients.put(factory, client);
  } else {
    client.incCount();
  }
  return client;
}
That is the main flow of the Hadoop RPC server side; plenty of details were indeed skipped. The Hadoop RPC structure as a whole is quite complex: on top of Java NIO, it leans heavily on dynamic proxies and reflection.