zoukankan      html  css  js  c++  java
  • Hadoop RPC源码阅读-客户端

    Hadoop版本Hadoop2.6

    RPC主要分为3个部分:(1)交互协议(2)客户端(3)服务端

    (2)客户端

    先展示RPC客户端实例代码

    public class LoginController {
        public static void main(String[] args) throws IOException {
          //获取RPC LoginServiceInterface协议接口的代理对象
            LoginServiceInterface proxy= RPC.getProxy(LoginServiceInterface.class,1L,new InetSocketAddress("localhost",10000),new Configuration());
            String msg=proxy.login("xiaoming","123123");
            System.out.println(msg);
        }
    }

    (1)进入上述的RPC.getProxy方法,会发现是通过获取RpcEngine接口(默认实现是WritableRpcEngine),利用WritableRpcEngine的getProxy方法获取Proxy代理,如下所示

    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                             InetSocketAddress addr, UserGroupInformation ticket,
                             Configuration conf, SocketFactory factory,
                             int rpcTimeout, RetryPolicy connectionRetryPolicy,
                             AtomicBoolean fallbackToSimpleAuth)
        throws IOException {    
    
        if (connectionRetryPolicy != null) {
          throw new UnsupportedOperationException(
              "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
        }
        T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
            new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
                factory, rpcTimeout, fallbackToSimpleAuth));
        return new ProtocolProxy<T>(protocol, proxy, true);
      }

    (2)上述就是客户端获取代理的过程,但是其中是如何从服务端获取通过动态代理类Invoker实现,并将代理封装成ProtocolProxy类,在本文上述的例子中,该ProtocolProxy类没有干什么,只是通过getProxy()方法将封装的代理返回给客户端

    那么我们接着分析动态代理类Invoker

    Invoker成员有Clinet类,并且全局变量ClientCache对Client进行缓存。

    动态代理类Invoker在代理对象发送请求时会自动执行invoke()方法,如下所示:

    public Object invoke(Object proxy, Method method, Object[] args)
          throws Throwable {
          long startTime = 0;
          if (LOG.isDebugEnabled()) {
            startTime = Time.now();
          }
          TraceScope traceScope = null;
          if (Trace.isTracing()) {
            traceScope = Trace.startSpan(
                method.getDeclaringClass().getCanonicalName() +
                "." + method.getName());
          }
          ObjectWritable value;
          try {
            value = (ObjectWritable)
              client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
                remoteId, fallbackToSimpleAuth);
          } finally {
            if (traceScope != null) traceScope.close();
          }
          if (LOG.isDebugEnabled()) {
            long callTime = Time.now() - startTime;
            LOG.debug("Call: " + method.getName() + " " + callTime);
          }
          return value.get();
        }

    3、上述中动态代理通过client.call方法向服务器发送请求获取返回值。

    我们还看到Invocation类封装了方法和参数,Invocation通过实现Writable实现序列化,方便数据在网络中传输,作为数据传输层,相当于VO。

    因此我们接着进入Clinet类,查看call方法干了什么。

    首先我们先看看Client类的结构,Client类包含了几个内部类:

    Call :用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据
    Connection :用以处理远程连接对象。继承了Thread
    ConnectionId :唯一确定一个连接

    Client类中call()方法如下所示:

    public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
          ConnectionId remoteId, int serviceClass,
          AtomicBoolean fallbackToSimpleAuth) throws IOException {
        final Call call = createCall(rpcKind, rpcRequest);//将传入的数据封装成call对象
        Connection connection = getConnection(remoteId, call, serviceClass,
          fallbackToSimpleAuth);//获得一个连接  
        try {
          connection.sendRpcRequest(call);                 // send the rpc request向服务端发送call对象
        } catch (RejectedExecutionException e) {
          throw new IOException("connection has been closed", e);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          LOG.warn("interrupted waiting to send rpc request to server", e);
          throw new IOException(e);
        }
    
        boolean interrupted = false;
        synchronized (call) {
          while (!call.done) {
            try {
              call.wait();                           // wait for the result
            } catch (InterruptedException ie) {
              // save the fact that we were interrupted
              interrupted = true;
            }
          }
    
          if (interrupted) {
            // set the interrupt flag now that we are done waiting
            Thread.currentThread().interrupt();
          }
    
          if (call.error != null) {
            if (call.error instanceof RemoteException) {
              call.error.fillInStackTrace();
              throw call.error;
            } else { // local exception
              InetSocketAddress address = connection.getRemoteAddress();
              throw NetUtils.wrapException(address.getHostName(),
                      address.getPort(),
                      NetUtils.getHostname(),
                      0,
                      call.error);
            }
          } else {
            return call.getRpcResponse();
          }
        }
      }

     4、从上述可以看到,rpcRequest是将方法和参数封装后的可序列号的对象,当做请求参数发送给服务端。

    在上述方法中主要使用了两个类Call和Connection.

    Call:封装了与服务端请求的状态,包括:

        final int id;               // call id该请求连接ID
        final int retry;           // retry count该请求重试次数
        final Writable rpcRequest;  // the serialized rpc request该请求参数
        Writable rpcResponse;       // null if rpc has error该请求的返回值
        IOException error;          // exception, null if success该请求成功标示
        final RPC.RpcKind rpcKind;      // Rpc EngineKind使用RpcEngine的类型
        boolean done;               // true when call is done该请求完成标示

    Connection则是实现了与服务端建立连接,发送请求,获取数据等功能。

    5、Connection类解析

    Connection类继承线程类Thread.

     从3步可以看到在Clinet的call()方法通过getConnection()方法获取Connection,如下所示:

    可以看出Client使用connections对客户端每一个connection进行缓存,

    并通过setupIOstreams()方法与服务器建立Socket连接,并创建输入输出流connection.in,connection.out,

    并通过start()方法启动该线程也就是运行Connection类的run()方法,等待服务端传回数据。

    因此Connection类主要通过run()方法接受数据,通过sendRpcRequest()向服务端发送请求。

    private Connection getConnection(ConnectionId remoteId,
          Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
          throws IOException {
        if (!running.get()) {
          // the client is stopped
          throw new IOException("The client is stopped");
        }
        Connection connection;
        /* we could avoid this allocation for each RPC by having a  
         * connectionsId object and with set() method. We need to manage the
         * refs for keys in HashMap properly. For now its ok.
         */
        do {
          synchronized (connections) {
            connection = connections.get(remoteId);
            if (connection == null) {
              connection = new Connection(remoteId, serviceClass);
              connections.put(remoteId, connection);
            }
          }
        } while (!connection.addCall(call));
        
        //we don't invoke the method below inside "synchronized (connections)"
        //block above. The reason for that is if the server happens to be slow,
        //it will take longer to establish a connection and that will slow the
        //entire system down.
        connection.setupIOstreams(fallbackToSimpleAuth);
        return connection;
      }

    5.1 Connection 的sendRpcRequest()向服务端发送请求

    public void sendRpcRequest(final Call call)
            throws InterruptedException, IOException {
          if (shouldCloseConnection.get()) {
            return;
          }
    
          // Serialize the call to be sent. This is done from the actual
          // caller thread, rather than the sendParamsExecutor thread,
          
          // so that if the serialization throws an error, it is reported
          // properly. This also parallelizes the serialization.
          //
          // Format of a call on the wire:
          // 0) Length of rest below (1 + 2)
          // 1) RpcRequestHeader  - is serialized Delimited hence contains length
          // 2) RpcRequest
          //
          // Items '1' and '2' are prepared here. 
          final DataOutputBuffer d = new DataOutputBuffer();
          RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
              call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
              clientId);
          header.writeDelimitedTo(d);
          call.rpcRequest.write(d);
    
          synchronized (sendRpcRequestLock) {
            Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
              @Override
              public void run() {
                try {
                  synchronized (Connection.this.out) {
                    if (shouldCloseConnection.get()) {
                      return;
                    }
                    
                    if (LOG.isDebugEnabled())
                      LOG.debug(getName() + " sending #" + call.id);
             
                    byte[] data = d.getData();
                    int totalLength = d.getLength();
                    out.writeInt(totalLength); // Total Length
                    out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
                    out.flush();
                  }
                } catch (IOException e) {
                  // exception at this point would leave the connection in an
                  // unrecoverable state (eg half a call left on the wire).
                  // So, close the connection, killing any outstanding calls
                  markClosed(e);
                } finally {
                  //the buffer is just an in-memory buffer, but it is still polite to
                  // close early
                  IOUtils.closeStream(d);
                }
              }
            });
          
            try {
              senderFuture.get();
            } catch (ExecutionException e) {
              Throwable cause = e.getCause();
              
              // cause should only be a RuntimeException as the Runnable above
              // catches IOException
              if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
              } else {
                throw new RuntimeException("unexpected checked exception", cause);
              }
            }
          }
        }

    5.2 Connection 的run()获取服务端返回的数据

    可以看到通过receiveRpcResponse()方法通过之前建立的输入流in获取服务器传来的数据,并将数据value传给call数据对象call.setRpcResponse(value);,

    在call.setRpcResponse(value)方法中通过callComplete()将call数据对象设置成已完成,并通过notify()唤醒该call对象。

    在Client的call()方法中,检测到call对象已完成后,就将call对象中的响应数据返回给调用者。

    至此,一个完整的RPC远程过程调用的过程就完成了。

    public void run() {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": starting, having connections " 
                + connections.size());
    
          try {
            while (waitForWork()) {//wait here for work - read or close connection循环等待获取服务端数据
              receiveRpcResponse();//获取服务端数据的具体实现
            }
          } catch (Throwable t) {
            // This truly is unexpected, since we catch IOException in receiveResponse
            // -- this is only to be really sure that we don't leave a client hanging
            // forever.
            LOG.warn("Unexpected error reading responses on connection " + this, t);
            markClosed(new IOException("Error reading responses", t));
          }
          
          close();
          
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": stopped, remaining connections "
                + connections.size());
        }
    private void receiveRpcResponse() {
          if (shouldCloseConnection.get()) {
            return;
          }
          touch();
          
          try {
            int totalLen = in.readInt();
            RpcResponseHeaderProto header = 
                RpcResponseHeaderProto.parseDelimitedFrom(in);
            checkResponse(header);
    
            int headerLen = header.getSerializedSize();
            headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
    
            int callId = header.getCallId();
            if (LOG.isDebugEnabled())
              LOG.debug(getName() + " got value #" + callId);
    
            Call call = calls.get(callId);
            RpcStatusProto status = header.getStatus();
            if (status == RpcStatusProto.SUCCESS) {
              Writable value = ReflectionUtils.newInstance(valueClass, conf);
              value.readFields(in);                 // read value
              calls.remove(callId);
              call.setRpcResponse(value);
              
              // verify that length was correct
              // only for ProtobufEngine where len can be verified easily
              if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
                ProtobufRpcEngine.RpcWrapper resWrapper = 
                    (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
                if (totalLen != headerLen + resWrapper.getLength()) { 
                  throw new RpcClientException(
                      "RPC response length mismatch on rpc success");
                }
              }
            } else { // Rpc Request failed
              // Verify that length was correct
              if (totalLen != headerLen) {
                throw new RpcClientException(
                    "RPC response length mismatch on rpc error");
              }
              
              final String exceptionClassName = header.hasExceptionClassName() ?
                    header.getExceptionClassName() : 
                      "ServerDidNotSetExceptionClassName";
              final String errorMsg = header.hasErrorMsg() ? 
                    header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
              final RpcErrorCodeProto erCode = 
                        (header.hasErrorDetail() ? header.getErrorDetail() : null);
              if (erCode == null) {
                 LOG.warn("Detailed error code not set by server on rpc error");
              }
              RemoteException re = 
                  ( (erCode == null) ? 
                      new RemoteException(exceptionClassName, errorMsg) :
                  new RemoteException(exceptionClassName, errorMsg, erCode));
              if (status == RpcStatusProto.ERROR) {
                calls.remove(callId);
                call.setException(re);
              } else if (status == RpcStatusProto.FATAL) {
                // Close the connection
                markClosed(re);
              }
            }
          } catch (IOException e) {
            markClosed(e);
          }
        }
  • 相关阅读:
    模块模式——属性
    防止变量被覆盖
    自执行匿名函数语法和普通函数语法对比
    JavaScript更改原型
    JavaScript覆盖原型以及更改原型
    JavaScript原型链
    作用域链和原型链描述javaScript访问变量和属性的顺序
    javascript高级变量提升和执行环境对象
    构建第一个单页应用
    error: expected identifier before numeric constant 问题
  • 原文地址:https://www.cnblogs.com/arbitrary/p/5628737.html
Copyright © 2011-2022 走看看