zoukankan      html  css  js  c++  java
  • Hadoop源码分析8: IPC流程(3)客户端的clients、connections、calls复用

    1. RPCClientCache 中的 clients

    publicclass RPCClientCache {


       private Map<SocketFactory,Client> clients = new HashMap<SocketFactory,Client>();
       
        synchronizedClient getClient(Configuration conf,
          SocketFactory factory) {
         // Construct & cacheclient.  The configuration is only used fortimeout,
         // and Clients haveconnection pools.  So we can either (a) losesome
         // connection pooling andleak sockets, or (b) use the same timeout for all
         // configurations. Since the IPC is usually intended globally,not
         // per-job, we choose(a).
         Client client= clients.get(factory);
         if (client == null) {
           client =new Client(ObjectWritable.class, conf, factory);
          clients.put(factory,client);
         } else {
          client.incCount();
         }
         return client;
       }

          
       void stopClient(Clientclient) {
         synchronized (this) {
          client.decCount();
           if(client.isZeroReference()) {
            clients.remove(client.getSocketFactory());
           }
         }
         if (client.isZeroReference()){
          client.stop();
         }
       }
    }

    以上方法在下列代码中调用:

    publicclass RPCInvoker implements InvocationHandler{

        privateClientConnectionId remoteId;
        private Clientclient;
        private boolean isClosed= false;

       public RPCInvoker(Class<? extendsVersionedProtocol> protocol,
           InetSocketAddress address, UserGroupInformationticket,
           Configuration conf, SocketFactory factory,
           int rpcTimeout) throws IOException {
         this.remoteId = ClientConnectionId.getConnectionId(address,protocol,
             ticket, rpcTimeout,conf);
         this.client = RPC.CLIENTS.getClient(conf, factory);
        }  

      
        synchronizedvoid close() {
          if(!isClosed) {
           isClosed = true;
           RPC.CLIENTS.stopClient(client);
         }
       } 
    }

    publicclass RPC {

      public staticRPCClientCache CLIENTS=newRPCClientCache();

       //for unit testing only
      staticClient getClient(Configuration conf){
      return CLIENTS.getClient(conf);
     } 

     
      public staticObject[] call(Method method, Object[][]params,
                             InetSocketAddress[]addrs, 
                             UserGroupInformation ticket,Configuration conf)
       throws IOException, InterruptedException {

       RPCInvocation[] invocations = newRPCInvocation[params.length];
       for (int i = 0; i < params.length; i++)
         invocations[i] = newRPCInvocation(method, params[i]);
       Client client= CLIENTS.getClient(conf);
       try {
       Writable[] wrappedValues= 
         client.call(invocations,addrs, method.getDeclaringClass(), ticket, conf);
       
       if (method.getReturnType() == Void.TYPE) {
         return null;
       }

       Object[] values =
        (Object[])Array.newInstance(method.getReturnType(),wrappedValues.length);
       for (int i = 0; i < values.length; i++)
         if (wrappedValues[i] !=null)
           values[i]= ((ObjectWritable)wrappedValues[i]).get();
       
       return values;
       } finally {
        CLIENTS.stopClient(client);
       }
      }

     
      public staticVersionedProtocol getProxy(
         Class<? extendsVersionedProtocol> protocol,
         long clientVersion,InetSocketAddress addr, UserGroupInformation ticket,
         Configuration conf,SocketFactory factory, int rpcTimeout) throws IOException {

       if (UserGroupInformation.isSecurityEnabled()){
        SaslRpcServer.init(conf);
       }
       VersionedProtocol proxy =
          (VersionedProtocol) Proxy.newProxyInstance(
              protocol.getClassLoader(),new Class[] { protocol },
              newRPCInvoker(protocol, addr, ticket, conf, factory,rpcTimeout));
       long serverVersion =proxy.getProtocolVersion(protocol.getName(), 
                                             clientVersion);
       if (serverVersion == clientVersion) {
         return proxy;
       } else {
         throw newRPCVersionMismatch(protocol.getName(),clientVersion, 
                              serverVersion);
       }
      }

    }
    2. Client 的connections

    publicclass Client {
      
      public HashtableClientConnectionIdClientConnection> connections =new Hashtable<ClientConnectionId, ClientConnection>();

      public void stop() { 
     ..............
       // wake up all connections
       synchronized (connections) {
         for (ClientConnection conn: connections.values()) {
          conn.interrupt();
         }
       }
       
       // wait until all connections are closed
       while (!connections.isEmpty()) {
         try {
          Thread.sleep(100);
         } catch (InterruptedExceptione) {
         }
       }
      ....................
      }

        //for unit testing only
     Set<ClientConnectionId> getConnectionIds(){
       synchronized (connections) {
        return connections.keySet();
       }
      }

        
      privateClientConnection getConnection(ClientConnectionIdremoteId,
                                 ClientCall call)
                                 throws IOException,InterruptedException {
       if (!running.get()) {
         // the client isstopped
         throw new IOException("Theclient is stopped");
       }
       ClientConnection connection;
      
       do {
         synchronized(connections) {
           connection= connections.get(remoteId);
           if(connection == null) {
            connection = newClientConnection(remoteId,this);
            connections.put(remoteId,connection);
           }
         }
       } while (!connection.addCall(call));
       
       //we don't invoke the method below inside"synchronized (connections)"
       //block above. The reason for that is if theserver happens to be slow,
       //it will take longer to establish a connectionand that will slow the
       //entire system down.
       connection.setupIOstreams();
       return connection;
      }
    }

    publicclass ClientConnection extends Thread {
      
      
       private synchronizedvoid close() {
         if(!shouldCloseConnection.get()) {
           return;
         }

         // release theresources
         // first thing to do;take theconnection out of the connection list
         synchronized(client.connections) {
           if(client.connections.get(remoteId) == this){
          client.connections.remove(remoteId);
           }
         }
      。。。。。。
      } 
    }


    3. ClientConnection 的calls

    publicclass ClientConnection extends Thread {

       private Hashtable<Integer,ClientCall> calls = new Hashtable<Integer,ClientCall>();
       
       public synchronizedboolean addCall(ClientCall call) {
         if(shouldCloseConnection.get())
           returnfalse;
        calls.put(call.id,call);
         notify();
         return true;
       }
      
          
       private synchronizedboolean waitForWork() {
         if (calls.isEmpty()&& !shouldCloseConnection.get() && client.running.get()) {
           longtimeout = maxIdleTime-
               (System.currentTimeMillis()-lastActivity.get());
           if(timeout>0) {
            try {
              wait(timeout);
            } catch (InterruptedException e) {}
           }
         }
         
         if (!calls.isEmpty()&& !shouldCloseConnection.get() &&client.running.get()) {
           returntrue;
         } else if(shouldCloseConnection.get()) {
           returnfalse;
         } else if(calls.isEmpty()) { // idle connection closed orstopped
          markClosed(null);
           returnfalse;
         } else { // get stopped butthere are still pending requests 
          markClosed((IOException)new IOException().initCause(
              newInterruptedException()));
           returnfalse;
         }
       }

        privatevoid receiveResponse() {
         if(shouldCloseConnection.get()) {
          return;
         }
         touch();
         
         try {
           int id =in.readInt();                 // try to read an id      

           ClientCallcall = calls.get(id);

           int state= in.readInt();     // read callstatus
           if (state== Status.SUCCESS.state) {
            Writable value =ReflectionUtils.newInstance(client.valueClass, client.conf);
            value.readFields(in);              // read value
            call.setValue(value);
            calls.remove(id);
           } else if(state == Status.ERROR.state) {
            call.setException(newRemoteException(WritableUtils.readString(in),
                                           WritableUtils.readString(in)));
            calls.remove(id);
           } else if(state == Status.FATAL.state) {
            // Close the connection
            markClosed(newRemoteException(WritableUtils.readString(in), 
                                     WritableUtils.readString(in)));
           }
         } catch (IOException e){
          markClosed(e);
         }
       } 
      
        
       private synchronizedvoid close() {
         if(!shouldCloseConnection.get()) {
           return;
         }

         // release theresources
         // first thing to do;take theconnection out of the connection list
         synchronized(client.connections) {
           if(client.connections.get(remoteId) == this) {
          client.connections.remove(remoteId);
           }
         }

         // close the streams andtherefore the socket
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
         disposeSasl();

         // clean up all calls
         if (closeException == null){
           if(!calls.isEmpty()) {
           

            // clean up calls anyway
            closeException = new IOException("Unexpectedclosed connection");
            cleanupCalls();
           }
         } else {
           // log theinfo
          

           // cleanupcalls
          cleanupCalls();
         } 
       }
       

        
       privatevoid cleanupCalls() {
         Iterator<Entry<Integer,ClientCall>> itor= calls.entrySet().iterator() ;
         while (itor.hasNext()){
           ClientCallc = itor.next().getValue(); 
          c.setException(closeException); // local exception
          itor.remove();        
         }
       }
      
     
    }


    4.并发执行以下代码

    public class MyClient {
        public static voidmain(String[] args) throws Exception {

          final InetSocketAddress addr =new InetSocketAddress("localhost",
    MyServer.IPC_PORT);
          final Query query = (Query)RPC.getProxy(Query.class, MyServer.IPC_VER,
    addr, new Configuration());

          new Thread() {

              @Override
              public voidrun() {
                 FileStatusfileStatus1 = query.getFileStatus("/tmp/testIPC");
                 System.out.println(fileStatus1);

                  FileStatusfileStatus2 = query.getFileStatus("/tmp/testIPC2");
                  System.out.println(fileStatus2);

               }

           }.start();

           new Thread() {

             @Override
              public voidrun() {
                  CPUStatus cpuStatus1 =query.getCPUStatus("Intel");
                 System.out.println(cpuStatus1);

                  CPUStatus cpuStatus2 =query.getCPUStatus("AMD");
                 System.out.println(cpuStatus2);
              }

           }.start();

           new Thread() {

            @Override
             public voidrun() {

             try {
                   Queryquery2 = (Query) RPC.getProxy(Query.class,
                  MyServer.IPC_VER, addr, newConfiguration());

                   FileStatusfileStatus1 = query2.getFileStatus("/tmp/testIPC");
                  System.out.println(fileStatus1);

                   FileStatusfileStatus2 = query2.getFileStatus("/tmp/testIPC2");
                  System.out.println(fileStatus2);
                  RPC.stopProxy(query2);
               } catch(IOException e) {
                   e.printStackTrace();
               }

           }

         }.start();

          RPC.stopProxy(query);
      } 
    }


    以上三个线程可以共用同一个 Client对象、ClientConnection线程 、ClientConnectionId对象,将ClientCall放在同一个calls中
  • 相关阅读:
    人生无常 淡然处之
    对PHP开发的认知
    专家路线
    很少接触的文学
    懒加载
    Markdown入门 学习
    (转载)iOS开发历程书籍推荐
    ObjectiveC1基础代码——类和对象
    day11基础代码——函数指针
    day6
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276530.html
Copyright © 2011-2022 走看看