1. ClientCall的 wait() 和 notify()
public Writablecall(Writable param, ClientConnectionId remoteId)
throwsInterruptedException, IOException {
ClientCall call =new ClientCall(param,this);
ClientConnectionconnection = getConnection(remoteId, call);
connection.sendParam(call); // sendthe parameter
boolean interrupted =false;
synchronized(call) {
while(!call.done) {
try {
call.wait(); // wait for the result
} catch (InterruptedException ie) {
// save the fact that we wereinterrupted
interrupted = true;
}
}
if(interrupted) {
// set the interrupt flag now that we are donewaiting
Thread.currentThread().interrupt();
}
if(call.error != null) {
if (call.error instanceof RemoteException){
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
// use the connection becauseit will reflect an ip change, unlike
// the remoteId
throwwrapException(connection.getRemoteAddress(), call.error);
}
}else {
return call.value;
}
}
}
}
public class ClientCall {
protectedsynchronized void callComplete() {
this.done = true;
notify(); // notify caller
}
}
2. ClientConnection 的 wait()和 notify()
public class ClientConnection extends Thread {
privatesynchronized boolean waitForWork() {
if(calls.isEmpty() && !shouldCloseConnection.get() && client.running.get()) {
long timeout = maxIdleTime-
(System.currentTimeMillis()-lastActivity.get());
if (timeout>0) {
try {
wait(timeout);
} catch (InterruptedExceptione) {}
}
}
if(!calls.isEmpty() && !shouldCloseConnection.get()&& client.running.get()) {
return true;
}else if (shouldCloseConnection.get()) {
return false;
}else if (calls.isEmpty()) { // idle connection closed orstopped
markClosed(null);
return false;
}else { // get stopped but there are still pendingrequests
markClosed((IOException)newIOException().initCause(
newInterruptedException()));
return false;
}
}
publicsynchronized boolean addCall(ClientCall call) {
if(shouldCloseConnection.get())
return false;
calls.put(call.id, call);
notify();
return true;
}
privatesynchronized void markClosed(IOException e) {
if(shouldCloseConnection.compareAndSet(false, true)) {
closeException = e;
notifyAll();
}
}
}
3.ClientParallelResults的wait() 和 notify()
public class Client {
public Writable[]call(Writable[] params, InetSocketAddress[] addresses,
Class<?> protocol, UserGroupInformation ticket, Configurationconf)
throws IOException, InterruptedException {
if (addresses.length ==0) return new Writable[0];
ClientParallelResultsresults = new ClientParallelResults(params.length);
synchronized (results){
for(int i = 0; i < params.length; i++) {
ClientParallelCall call = newClientParallelCall(params[i], results, i,this);
try {
ClientConnectionId remoteId =ClientConnectionId.getConnectionId(addresses[i],
protocol, ticket, 0, conf);
ClientConnection connection =getConnection(remoteId, call);
connection.sendParam(call); // sendeach parameter
} catch (IOException e) {
results.size--; // wait for one fewer result
}
}
while(results.count != results.size) {
try {
results.wait(); // wait for all results
} catch (InterruptedException e) {}
}
return results.values;
}
}
}
public class ClientParallelResults {
publicsynchronized void callComplete(ClientParallelCallcall) {
values[call.index] = call.value; // store the value
count++; // countit
if(count == size) // if all values are in
notify(); // thennotify waiting caller
}
}