zoukankan      html  css  js  c++  java
  • Hadoop RPC源码阅读-客户端

    Hadoop版本Hadoop2.6

    RPC主要分为3个部分:(1)交互协议(2)客户端(3)服务端

    (2)客户端

    先展示RPC客户端实例代码

    public class LoginController {
        public static void main(String[] args) throws IOException {
          //获取RPC LoginServiceInterface协议接口的代理对象
            LoginServiceInterface proxy= RPC.getProxy(LoginServiceInterface.class,1L,new InetSocketAddress("localhost",10000),new Configuration());
            String msg=proxy.login("xiaoming","123123");
            System.out.println(msg);
        }
    }

    (1)进入上述的RPC.getProxy方法,会发现是通过获取RpcEngine接口(默认实现是WritableRpcEngine),利用WritableRpcEngine的getProxy方法获取Proxy代理,如下所示

    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                             InetSocketAddress addr, UserGroupInformation ticket,
                             Configuration conf, SocketFactory factory,
                             int rpcTimeout, RetryPolicy connectionRetryPolicy,
                             AtomicBoolean fallbackToSimpleAuth)
        throws IOException {    
    
        if (connectionRetryPolicy != null) {
          throw new UnsupportedOperationException(
              "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
        }
        T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
            new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
                factory, rpcTimeout, fallbackToSimpleAuth));
        return new ProtocolProxy<T>(protocol, proxy, true);
      }

    (2)上述就是客户端获取代理的过程,但是其中是如何从服务端获取通过动态代理类Invoker实现,并将代理封装成ProtocolProxy类,在本文上述的例子中,该ProtocolProxy类没有干什么,只是通过getProxy()方法将封装的代理返回给客户端

    那么我们接着分析动态代理类Invoker

    Invoker成员有Clinet类,并且全局变量ClientCache对Client进行缓存。

    动态代理类Invoker在代理对象发送请求时会自动执行invoke()方法,如下所示:

    public Object invoke(Object proxy, Method method, Object[] args)
          throws Throwable {
          long startTime = 0;
          if (LOG.isDebugEnabled()) {
            startTime = Time.now();
          }
          TraceScope traceScope = null;
          if (Trace.isTracing()) {
            traceScope = Trace.startSpan(
                method.getDeclaringClass().getCanonicalName() +
                "." + method.getName());
          }
          ObjectWritable value;
          try {
            value = (ObjectWritable)
              client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
                remoteId, fallbackToSimpleAuth);
          } finally {
            if (traceScope != null) traceScope.close();
          }
          if (LOG.isDebugEnabled()) {
            long callTime = Time.now() - startTime;
            LOG.debug("Call: " + method.getName() + " " + callTime);
          }
          return value.get();
        }

    3、上述中动态代理通过client.call方法向服务器发送请求获取返回值。

    我们还看到Invocation类封装了方法和参数,Invocation通过实现Writable实现序列化,方便数据在网络中传输,作为数据传输层,相当于VO。

    因此我们接着进入Clinet类,查看call方法干了什么。

    首先我们先看看Client类的结构,Client类包含了几个内部类:

    Call :用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据
    Connection :用以处理远程连接对象。继承了Thread
    ConnectionId :唯一确定一个连接

    Client类中call()方法如下所示:

    public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
          ConnectionId remoteId, int serviceClass,
          AtomicBoolean fallbackToSimpleAuth) throws IOException {
        final Call call = createCall(rpcKind, rpcRequest);//将传入的数据封装成call对象
        Connection connection = getConnection(remoteId, call, serviceClass,
          fallbackToSimpleAuth);//获得一个连接  
        try {
          connection.sendRpcRequest(call);                 // send the rpc request向服务端发送call对象
        } catch (RejectedExecutionException e) {
          throw new IOException("connection has been closed", e);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          LOG.warn("interrupted waiting to send rpc request to server", e);
          throw new IOException(e);
        }
    
        boolean interrupted = false;
        synchronized (call) {
          while (!call.done) {
            try {
              call.wait();                           // wait for the result
            } catch (InterruptedException ie) {
              // save the fact that we were interrupted
              interrupted = true;
            }
          }
    
          if (interrupted) {
            // set the interrupt flag now that we are done waiting
            Thread.currentThread().interrupt();
          }
    
          if (call.error != null) {
            if (call.error instanceof RemoteException) {
              call.error.fillInStackTrace();
              throw call.error;
            } else { // local exception
              InetSocketAddress address = connection.getRemoteAddress();
              throw NetUtils.wrapException(address.getHostName(),
                      address.getPort(),
                      NetUtils.getHostname(),
                      0,
                      call.error);
            }
          } else {
            return call.getRpcResponse();
          }
        }
      }

     4、从上述可以看到,rpcRequest是将方法和参数封装后的可序列号的对象,当做请求参数发送给服务端。

    在上述方法中主要使用了两个类Call和Connection.

    Call:封装了与服务端请求的状态,包括:

        final int id;               // call id该请求连接ID
        final int retry;           // retry count该请求重试次数
        final Writable rpcRequest;  // the serialized rpc request该请求参数
        Writable rpcResponse;       // null if rpc has error该请求的返回值
        IOException error;          // exception, null if success该请求成功标示
        final RPC.RpcKind rpcKind;      // Rpc EngineKind使用RpcEngine的类型
        boolean done;               // true when call is done该请求完成标示

    Connection则是实现了与服务端建立连接,发送请求,获取数据等功能。

    5、Connection类解析

    Connection类继承线程类Thread.

     从3步可以看到在Clinet的call()方法通过getConnection()方法获取Connection,如下所示:

    可以看出Client使用connections对客户端每一个connection进行缓存,

    并通过setupIOstreams()方法与服务器建立Socket连接,并创建输入输出流connection.in,connection.out,

    并通过start()方法启动该线程也就是运行Connection类的run()方法,等待服务端传回数据。

    因此Connection类主要通过run()方法接受数据,通过sendRpcRequest()向服务端发送请求。

    private Connection getConnection(ConnectionId remoteId,
          Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
          throws IOException {
        if (!running.get()) {
          // the client is stopped
          throw new IOException("The client is stopped");
        }
        Connection connection;
        /* we could avoid this allocation for each RPC by having a  
         * connectionsId object and with set() method. We need to manage the
         * refs for keys in HashMap properly. For now its ok.
         */
        do {
          synchronized (connections) {
            connection = connections.get(remoteId);
            if (connection == null) {
              connection = new Connection(remoteId, serviceClass);
              connections.put(remoteId, connection);
            }
          }
        } while (!connection.addCall(call));
        
        //we don't invoke the method below inside "synchronized (connections)"
        //block above. The reason for that is if the server happens to be slow,
        //it will take longer to establish a connection and that will slow the
        //entire system down.
        connection.setupIOstreams(fallbackToSimpleAuth);
        return connection;
      }

    5.1 Connection 的sendRpcRequest()向服务端发送请求

    public void sendRpcRequest(final Call call)
            throws InterruptedException, IOException {
          if (shouldCloseConnection.get()) {
            return;
          }
    
          // Serialize the call to be sent. This is done from the actual
          // caller thread, rather than the sendParamsExecutor thread,
          
          // so that if the serialization throws an error, it is reported
          // properly. This also parallelizes the serialization.
          //
          // Format of a call on the wire:
          // 0) Length of rest below (1 + 2)
          // 1) RpcRequestHeader  - is serialized Delimited hence contains length
          // 2) RpcRequest
          //
          // Items '1' and '2' are prepared here. 
          final DataOutputBuffer d = new DataOutputBuffer();
          RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
              call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
              clientId);
          header.writeDelimitedTo(d);
          call.rpcRequest.write(d);
    
          synchronized (sendRpcRequestLock) {
            Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
              @Override
              public void run() {
                try {
                  synchronized (Connection.this.out) {
                    if (shouldCloseConnection.get()) {
                      return;
                    }
                    
                    if (LOG.isDebugEnabled())
                      LOG.debug(getName() + " sending #" + call.id);
             
                    byte[] data = d.getData();
                    int totalLength = d.getLength();
                    out.writeInt(totalLength); // Total Length
                    out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
                    out.flush();
                  }
                } catch (IOException e) {
                  // exception at this point would leave the connection in an
                  // unrecoverable state (eg half a call left on the wire).
                  // So, close the connection, killing any outstanding calls
                  markClosed(e);
                } finally {
                  //the buffer is just an in-memory buffer, but it is still polite to
                  // close early
                  IOUtils.closeStream(d);
                }
              }
            });
          
            try {
              senderFuture.get();
            } catch (ExecutionException e) {
              Throwable cause = e.getCause();
              
              // cause should only be a RuntimeException as the Runnable above
              // catches IOException
              if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
              } else {
                throw new RuntimeException("unexpected checked exception", cause);
              }
            }
          }
        }

    5.2 Connection 的run()获取服务端返回的数据

    可以看到通过receiveRpcResponse()方法通过之前建立的输入流in获取服务器传来的数据,并将数据value传给call数据对象call.setRpcResponse(value);,

    在call.setRpcResponse(value)方法中通过callComplete()将call数据对象设置成已完成,并通过notify()唤醒该call对象。

    在Client的call()方法中,检测到call对象已完成后,就将call对象中的响应数据返回给调用者。

    至此,一个完整的RPC远程过程调用的过程就完成了。

    public void run() {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": starting, having connections " 
                + connections.size());
    
          try {
            while (waitForWork()) {//wait here for work - read or close connection循环等待获取服务端数据
              receiveRpcResponse();//获取服务端数据的具体实现
            }
          } catch (Throwable t) {
            // This truly is unexpected, since we catch IOException in receiveResponse
            // -- this is only to be really sure that we don't leave a client hanging
            // forever.
            LOG.warn("Unexpected error reading responses on connection " + this, t);
            markClosed(new IOException("Error reading responses", t));
          }
          
          close();
          
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": stopped, remaining connections "
                + connections.size());
        }
    private void receiveRpcResponse() {
          if (shouldCloseConnection.get()) {
            return;
          }
          touch();
          
          try {
            int totalLen = in.readInt();
            RpcResponseHeaderProto header = 
                RpcResponseHeaderProto.parseDelimitedFrom(in);
            checkResponse(header);
    
            int headerLen = header.getSerializedSize();
            headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
    
            int callId = header.getCallId();
            if (LOG.isDebugEnabled())
              LOG.debug(getName() + " got value #" + callId);
    
            Call call = calls.get(callId);
            RpcStatusProto status = header.getStatus();
            if (status == RpcStatusProto.SUCCESS) {
              Writable value = ReflectionUtils.newInstance(valueClass, conf);
              value.readFields(in);                 // read value
              calls.remove(callId);
              call.setRpcResponse(value);
              
              // verify that length was correct
              // only for ProtobufEngine where len can be verified easily
              if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
                ProtobufRpcEngine.RpcWrapper resWrapper = 
                    (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
                if (totalLen != headerLen + resWrapper.getLength()) { 
                  throw new RpcClientException(
                      "RPC response length mismatch on rpc success");
                }
              }
            } else { // Rpc Request failed
              // Verify that length was correct
              if (totalLen != headerLen) {
                throw new RpcClientException(
                    "RPC response length mismatch on rpc error");
              }
              
              final String exceptionClassName = header.hasExceptionClassName() ?
                    header.getExceptionClassName() : 
                      "ServerDidNotSetExceptionClassName";
              final String errorMsg = header.hasErrorMsg() ? 
                    header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
              final RpcErrorCodeProto erCode = 
                        (header.hasErrorDetail() ? header.getErrorDetail() : null);
              if (erCode == null) {
                 LOG.warn("Detailed error code not set by server on rpc error");
              }
              RemoteException re = 
                  ( (erCode == null) ? 
                      new RemoteException(exceptionClassName, errorMsg) :
                  new RemoteException(exceptionClassName, errorMsg, erCode));
              if (status == RpcStatusProto.ERROR) {
                calls.remove(callId);
                call.setException(re);
              } else if (status == RpcStatusProto.FATAL) {
                // Close the connection
                markClosed(re);
              }
            }
          } catch (IOException e) {
            markClosed(e);
          }
        }
  • 相关阅读:
    svn command line tag
    MDbg.exe(.NET Framework 命令行调试程序)
    Microsoft Web Deployment Tool
    sql server CI
    VS 2010 One Click Deployment Issue “Application Validation did not succeed. Unable to continue”
    mshtml
    大厂程序员站错队被架空,只拿着五折工资!苟活和离职,如何选择?
    揭秘!Windows 为什么会蓝屏?微软程序员竟说是这个原因...
    喂!千万别忘了这个C语言知识!(~0 == -1 问题)
    Linux 比 Windows 更好,谁反对?我有13个赞成理由
  • 原文地址:https://www.cnblogs.com/arbitrary/p/5628737.html
Copyright © 2011-2022 走看看