zoukankan      html  css  js  c++  java
  • Hadoop源码分析9:IPC流程(4) Client 的 wait() 和 notify()

    1. ClientCall的 wait() 和 notify()

    public class Client {

       public Writablecall(Writable param, ClientConnectionId remoteId) 
                        throwsInterruptedException, IOException {
        ClientCall call =new ClientCall(param,this);
        ClientConnectionconnection = getConnection(remoteId, call);
       connection.sendParam(call);               // sendthe parameter
        boolean interrupted =false;
        synchronized(call) {
          while(!call.done) {
           try {
             call.wait();                      // wait for the result
           } catch (InterruptedException ie) {
             // save the fact that we wereinterrupted
             interrupted = true;
           }
         }
         if(interrupted) {
           // set the interrupt flag now that we are donewaiting
           Thread.currentThread().interrupt();
         }
          if(call.error != null) {
           if (call.error instanceof RemoteException){
            call.error.fillInStackTrace();
             throw call.error;
           } else { // local exception
             // use the connection becauseit will reflect an ip change, unlike
             // the remoteId
             throwwrapException(connection.getRemoteAddress(), call.error);
           }
          }else {
           return call.value;
         }
        }
       }
     }



    public class ClientCall {

         
        protectedsynchronized void callComplete() {
         this.done = true;
         notify();                            // notify caller
        }
    }

    2. ClientConnection 的 wait()和 notify()


    public class ClientConnection extends Thread {

        privatesynchronized boolean waitForWork() {
          if(calls.isEmpty() && !shouldCloseConnection.get() && client.running.get()) {
           long timeout = maxIdleTime-
               (System.currentTimeMillis()-lastActivity.get());
           if (timeout>0) {
             try {
              wait(timeout);
             } catch (InterruptedExceptione) {}
           }
         }
         
          if(!calls.isEmpty() && !shouldCloseConnection.get()&& client.running.get()) {
           return true;
          }else if (shouldCloseConnection.get()) {
           return false;
          }else if (calls.isEmpty()) { // idle connection closed orstopped
           markClosed(null);
           return false;
          }else { // get stopped but there are still pendingrequests 
           markClosed((IOException)newIOException().initCause(
               newInterruptedException()));
           return false;
         }
        }

        publicsynchronized boolean addCall(ClientCall call) {
          if(shouldCloseConnection.get())
           return false;
         calls.put(call.id, call);
         notify();
         return true;
        }

        
        privatesynchronized void markClosed(IOException e) {
          if(shouldCloseConnection.compareAndSet(false, true)) {
           closeException = e;
           notifyAll();
         }
       } 
    }

    3.ClientParallelResults的wait() 和 notify()

    public class Client {

       public Writable[]call(Writable[] params, InetSocketAddress[] addresses,
         Class<?> protocol, UserGroupInformation ticket, Configurationconf)
         throws IOException, InterruptedException {
        if (addresses.length ==0) return new Writable[0];

        ClientParallelResultsresults = new ClientParallelResults(params.length);
        synchronized (results){
          for(int i = 0; i < params.length; i++) {
           ClientParallelCall call = newClientParallelCall(params[i], results, i,this);
           try {
             ClientConnectionId remoteId =ClientConnectionId.getConnectionId(addresses[i],
                protocol, ticket, 0, conf);
             ClientConnection connection =getConnection(remoteId, call);
             connection.sendParam(call);           // sendeach parameter
           } catch (IOException e) {
             results.size--;                     // wait for one fewer result
           }
         }
          while(results.count != results.size) {
           try {
             results.wait();                // wait for all results
           } catch (InterruptedException e) {}
         }
         return results.values;
        }
      }
    }

    public class ClientParallelResults {

       
        publicsynchronized void callComplete(ClientParallelCallcall) {
         values[call.index] = call.value;          // store the value
         count++;                               // countit
          if(count == size)                      // if all values are in
           notify();                            // thennotify waiting caller
        }
    }

  • 相关阅读:
    《机电传动控制》学习笔记08-1
    《机电传动控制》学习笔记-07
    《机电传动控制》学习笔记-06
    《机电传动控制》学习笔记05-2
    《机电传动控制》学习笔记05-1
    《团队项目》日志一
    《实时控制软件》第四周作业
    《实时控制软件》第三周作业
    《实时控制软件》第二周作业
    《机电传动控制》PLC仿真
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276528.html
Copyright © 2011-2022 走看看