zoukankan      html  css  js  c++  java
  • Hadoop源码分析8: IPC流程(3)客户端的clients、connections、calls复用

    1. RPCClientCache 中的 clients

    publicclass RPCClientCache {


       private Map<SocketFactory,Client> clients = new HashMap<SocketFactory,Client>();
       
        synchronizedClient getClient(Configuration conf,
          SocketFactory factory) {
         // Construct & cacheclient.  The configuration is only used fortimeout,
         // and Clients haveconnection pools.  So we can either (a) losesome
         // connection pooling andleak sockets, or (b) use the same timeout for all
         // configurations. Since the IPC is usually intended globally,not
         // per-job, we choose(a).
         Client client= clients.get(factory);
         if (client == null) {
           client =new Client(ObjectWritable.class, conf, factory);
          clients.put(factory,client);
         } else {
          client.incCount();
         }
         return client;
       }

          
       void stopClient(Clientclient) {
         synchronized (this) {
          client.decCount();
           if(client.isZeroReference()) {
            clients.remove(client.getSocketFactory());
           }
         }
         if (client.isZeroReference()){
          client.stop();
         }
       }
    }

    以上方法在下列代码中调用:

    publicclass RPCInvoker implements InvocationHandler{

        privateClientConnectionId remoteId;
        private Clientclient;
        private boolean isClosed= false;

       public RPCInvoker(Class<? extendsVersionedProtocol> protocol,
           InetSocketAddress address, UserGroupInformationticket,
           Configuration conf, SocketFactory factory,
           int rpcTimeout) throws IOException {
         this.remoteId = ClientConnectionId.getConnectionId(address,protocol,
             ticket, rpcTimeout,conf);
         this.client = RPC.CLIENTS.getClient(conf, factory);
        }  

      
        synchronizedvoid close() {
          if(!isClosed) {
           isClosed = true;
           RPC.CLIENTS.stopClient(client);
         }
       } 
    }

    publicclass RPC {

      public staticRPCClientCache CLIENTS=newRPCClientCache();

       //for unit testing only
      staticClient getClient(Configuration conf){
      return CLIENTS.getClient(conf);
     } 

     
      public staticObject[] call(Method method, Object[][]params,
                             InetSocketAddress[]addrs, 
                             UserGroupInformation ticket,Configuration conf)
       throws IOException, InterruptedException {

       RPCInvocation[] invocations = newRPCInvocation[params.length];
       for (int i = 0; i < params.length; i++)
         invocations[i] = newRPCInvocation(method, params[i]);
       Client client= CLIENTS.getClient(conf);
       try {
       Writable[] wrappedValues= 
         client.call(invocations,addrs, method.getDeclaringClass(), ticket, conf);
       
       if (method.getReturnType() == Void.TYPE) {
         return null;
       }

       Object[] values =
        (Object[])Array.newInstance(method.getReturnType(),wrappedValues.length);
       for (int i = 0; i < values.length; i++)
         if (wrappedValues[i] !=null)
           values[i]= ((ObjectWritable)wrappedValues[i]).get();
       
       return values;
       } finally {
        CLIENTS.stopClient(client);
       }
      }

     
      public staticVersionedProtocol getProxy(
         Class<? extendsVersionedProtocol> protocol,
         long clientVersion,InetSocketAddress addr, UserGroupInformation ticket,
         Configuration conf,SocketFactory factory, int rpcTimeout) throws IOException {

       if (UserGroupInformation.isSecurityEnabled()){
        SaslRpcServer.init(conf);
       }
       VersionedProtocol proxy =
          (VersionedProtocol) Proxy.newProxyInstance(
              protocol.getClassLoader(),new Class[] { protocol },
              newRPCInvoker(protocol, addr, ticket, conf, factory,rpcTimeout));
       long serverVersion =proxy.getProtocolVersion(protocol.getName(), 
                                             clientVersion);
       if (serverVersion == clientVersion) {
         return proxy;
       } else {
         throw newRPCVersionMismatch(protocol.getName(),clientVersion, 
                              serverVersion);
       }
      }

    }
    2. Client 的connections

    publicclass Client {
      
      public HashtableClientConnectionIdClientConnection> connections =new Hashtable<ClientConnectionId, ClientConnection>();

      public void stop() { 
     ..............
       // wake up all connections
       synchronized (connections) {
         for (ClientConnection conn: connections.values()) {
          conn.interrupt();
         }
       }
       
       // wait until all connections are closed
       while (!connections.isEmpty()) {
         try {
          Thread.sleep(100);
         } catch (InterruptedExceptione) {
         }
       }
      ....................
      }

        //for unit testing only
     Set<ClientConnectionId> getConnectionIds(){
       synchronized (connections) {
        return connections.keySet();
       }
      }

        
      privateClientConnection getConnection(ClientConnectionIdremoteId,
                                 ClientCall call)
                                 throws IOException,InterruptedException {
       if (!running.get()) {
         // the client isstopped
         throw new IOException("Theclient is stopped");
       }
       ClientConnection connection;
      
       do {
         synchronized(connections) {
           connection= connections.get(remoteId);
           if(connection == null) {
            connection = newClientConnection(remoteId,this);
            connections.put(remoteId,connection);
           }
         }
       } while (!connection.addCall(call));
       
       //we don't invoke the method below inside"synchronized (connections)"
       //block above. The reason for that is if theserver happens to be slow,
       //it will take longer to establish a connectionand that will slow the
       //entire system down.
       connection.setupIOstreams();
       return connection;
      }
    }

    publicclass ClientConnection extends Thread {
      
      
       private synchronizedvoid close() {
         if(!shouldCloseConnection.get()) {
           return;
         }

         // release theresources
         // first thing to do;take theconnection out of the connection list
         synchronized(client.connections) {
           if(client.connections.get(remoteId) == this){
          client.connections.remove(remoteId);
           }
         }
      。。。。。。
      } 
    }


    3. ClientConnection 的calls

    publicclass ClientConnection extends Thread {

       private Hashtable<Integer,ClientCall> calls = new Hashtable<Integer,ClientCall>();
       
       public synchronizedboolean addCall(ClientCall call) {
         if(shouldCloseConnection.get())
           returnfalse;
        calls.put(call.id,call);
         notify();
         return true;
       }
      
          
       private synchronizedboolean waitForWork() {
         if (calls.isEmpty()&& !shouldCloseConnection.get() && client.running.get()) {
           longtimeout = maxIdleTime-
               (System.currentTimeMillis()-lastActivity.get());
           if(timeout>0) {
            try {
              wait(timeout);
            } catch (InterruptedException e) {}
           }
         }
         
         if (!calls.isEmpty()&& !shouldCloseConnection.get() &&client.running.get()) {
           returntrue;
         } else if(shouldCloseConnection.get()) {
           returnfalse;
         } else if(calls.isEmpty()) { // idle connection closed orstopped
          markClosed(null);
           returnfalse;
         } else { // get stopped butthere are still pending requests 
          markClosed((IOException)new IOException().initCause(
              newInterruptedException()));
           returnfalse;
         }
       }

        privatevoid receiveResponse() {
         if(shouldCloseConnection.get()) {
          return;
         }
         touch();
         
         try {
           int id =in.readInt();                 // try to read an id      

           ClientCallcall = calls.get(id);

           int state= in.readInt();     // read callstatus
           if (state== Status.SUCCESS.state) {
            Writable value =ReflectionUtils.newInstance(client.valueClass, client.conf);
            value.readFields(in);              // read value
            call.setValue(value);
            calls.remove(id);
           } else if(state == Status.ERROR.state) {
            call.setException(newRemoteException(WritableUtils.readString(in),
                                           WritableUtils.readString(in)));
            calls.remove(id);
           } else if(state == Status.FATAL.state) {
            // Close the connection
            markClosed(newRemoteException(WritableUtils.readString(in), 
                                     WritableUtils.readString(in)));
           }
         } catch (IOException e){
          markClosed(e);
         }
       } 
      
        
       private synchronizedvoid close() {
         if(!shouldCloseConnection.get()) {
           return;
         }

         // release theresources
         // first thing to do;take theconnection out of the connection list
         synchronized(client.connections) {
           if(client.connections.get(remoteId) == this) {
          client.connections.remove(remoteId);
           }
         }

         // close the streams andtherefore the socket
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
         disposeSasl();

         // clean up all calls
         if (closeException == null){
           if(!calls.isEmpty()) {
           

            // clean up calls anyway
            closeException = new IOException("Unexpectedclosed connection");
            cleanupCalls();
           }
         } else {
           // log theinfo
          

           // cleanupcalls
          cleanupCalls();
         } 
       }
       

        
       privatevoid cleanupCalls() {
         Iterator<Entry<Integer,ClientCall>> itor= calls.entrySet().iterator() ;
         while (itor.hasNext()){
           ClientCallc = itor.next().getValue(); 
          c.setException(closeException); // local exception
          itor.remove();        
         }
       }
      
     
    }


    4.并发执行以下代码

    public class MyClient {
        public static voidmain(String[] args) throws Exception {

          final InetSocketAddress addr =new InetSocketAddress("localhost",
    MyServer.IPC_PORT);
          final Query query = (Query)RPC.getProxy(Query.class, MyServer.IPC_VER,
    addr, new Configuration());

          new Thread() {

              @Override
              public voidrun() {
                 FileStatusfileStatus1 = query.getFileStatus("/tmp/testIPC");
                 System.out.println(fileStatus1);

                  FileStatusfileStatus2 = query.getFileStatus("/tmp/testIPC2");
                  System.out.println(fileStatus2);

               }

           }.start();

           new Thread() {

             @Override
              public voidrun() {
                  CPUStatus cpuStatus1 =query.getCPUStatus("Intel");
                 System.out.println(cpuStatus1);

                  CPUStatus cpuStatus2 =query.getCPUStatus("AMD");
                 System.out.println(cpuStatus2);
              }

           }.start();

           new Thread() {

            @Override
             public voidrun() {

             try {
                   Queryquery2 = (Query) RPC.getProxy(Query.class,
                  MyServer.IPC_VER, addr, newConfiguration());

                   FileStatusfileStatus1 = query2.getFileStatus("/tmp/testIPC");
                  System.out.println(fileStatus1);

                   FileStatusfileStatus2 = query2.getFileStatus("/tmp/testIPC2");
                  System.out.println(fileStatus2);
                  RPC.stopProxy(query2);
               } catch(IOException e) {
                   e.printStackTrace();
               }

           }

         }.start();

          RPC.stopProxy(query);
      } 
    }


    以上三个线程可以共用同一个 Client对象、ClientConnection线程 、ClientConnectionId对象,将ClientCall放在同一个calls中
  • 相关阅读:
    2013.4.15 Particle Swarm Optimization with Skyline Operator for Fast Cloudbased Web Service Composition
    Adaptive service composition in flexible processes
    2013.4.13 DomainSpecific Service Selection for Composite Services
    2013.4.14 Modeling and Algorithms for QoSAware Service Composition in VirtualizationBased Cloud Computing
    2013.5.29 Towards Networkaware Service Composition in the Cloud
    Efficient algorithms for Web services selection with endtoend QoS constraints
    SQL Server中常用的SQL语句
    接口限流自定义注解
    linux服务器生产环境搭建
    MVEL自定义函数重复掉用报错:duplicate function
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276530.html
Copyright © 2011-2022 走看看