zoukankan      html  css  js  c++  java
  • 4_分布式通信框架RMI

    简介

    1. 什么是RPC
      • RPC(Remote Procedure Call, 远程过程调用), 一般用来实现部署在不同机器上的系统之间的方法调用. 使程序能像访问本地资源一样, 通过网络传输去访问远端系统资源.
      • 对客户端来说, 传输层用什么协议, 序列化, 反序列化过程都是透明的, 不用管.
    2. Java RMI
      • RMI(Remote Method Invocation, 远程方法调用). 这是一种用于远程过程调用的应用程序编程接口, 纯Java的网络分布式应用系统解决方案.
      • 目前RMI使用Java远程消息交换协议JRMP通信(专为Java对象定制)
      • RMI框架开发的应用系统能部署在任何支持JRE的平台上, 但对于非Java语言开发的应用系统的支持不足, 无法与非Java语言书写的对象进行通信.

    RMI代码实践

    • 创建远程接口, 并且继承java.rmi.Remote接口
      public interface iHelloService extends Remote {
      
          String sayHello(String msg) throws RemoteException;
      }
    • 远程对象必须实现UnicastRemoteObject, 这样才能保证客户端访问获得远程对象时, 该远程对象会把自身的一个拷贝以Socket的形式传输给客户端
      public class HelloServiceImpl extends UnicastRemoteObject implements iHelloService {
      
      
          protected HelloServiceImpl() throws RemoteException {
              super();
          }
      
          @Override
          public String sayHello(String msg) throws RemoteException {
              return "Hello" + msg;
          }
      }
    • 此时服务端本身已存在的远程对象称为"skeleton", 而skeleton是服务端的一个代理, 用于接收客户端的请求后调用远程方法来响应客户端请求.
      public class Server {
      
          public static void main(String[] args) {
              try {
                  iHelloService helloService = new HelloServiceImpl();

      //创建服务器程序:注册 LocateRegistry.createRegistry(
      1099); Naming.rebind("rmi://127.0.0.1/Hello", helloService); System.out.println("服务启动成功"); } catch (RemoteException e) { e.printStackTrace(); } catch (MalformedURLException e) { e.printStackTrace(); } } }
    • 而客户端获得的拷贝称为"stub", 它是客户端的一个代理.
      public class ClientDemo  {
      
          public static void main(String[] args) throws RemoteException, NotBoundException, MalformedURLException {
      //创建客户端程序 iHelloService helloService
      = (iHelloService) Naming.lookup("rmi://127.0.0.1/Hello"); System.out.println(helloService.sayHello("MR. White")); } }
    • 这里说一下RMI的底层实现

    源码解读

    • 类图
      • 远程对象的发布
      • 远程引用层
    •  发布远程对象

      1. 从上面类图可知, 这里会发布两个远程对象, 一个RegistryImpl, 另一个是自己写的RMI实现类对象.
      2. 从HelloServiceImpl构造器看起, 调用了父类UnicastRemoteObject的给构造方法.
        protected UnicastRemoteObject(int port) throws RemoteException
            {
                this.port = port;
                exportObject((Remote) this, port);
            }
      3. 追溯到了私有方法exportObject()
        • 这里做了个判断, 判断服务的实现类是不是UnicastRemoteObject的子类, 如果是, 则直接把传入的UnicastServerRef对象赋值给其ref(RemoteRef)对象.
        • 反之, 则调用UnicastServerRef的exportObject()方法
          public static Remote exportObject(Remote obj, int port)
                  throws RemoteException
              {
                  //创建UnicastServerRef对象, 对象内引用了LiveRef(tcp通信)
                  return exportObject(obj, new UnicastServerRef(port));
              }
          private static Remote exportObject(Remote obj, UnicastServerRef sref) throws RemoteException { // if obj extends UnicastRemoteObject, set its ref. if (obj instanceof UnicastRemoteObject) { ((UnicastRemoteObject) obj).ref = sref; } return sref.exportObject(obj, null, false); }
          public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException { Class var4 = var1.getClass(); //创建远程代理类, 该代理是对OperationImpl对象的代理, getClientRef提供的InvocationHandler中提供了TCP连接. Remote var5; try { var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse); } catch (IllegalArgumentException var7) { throw new ExportException("remote object implements illegal remote interface", var7); } if (var5 instanceof RemoteStub) { this.setSkeleton(var1); } //包装实际对象, 并将其暴露在TCP端口上, 等待客户端调用 Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3); this.ref.exportObject(var6); this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4); return var5; }
        • 因为HelloServiceImpl继承了UnicastRemoteObject, 所以服务启动的时候, 会通过UnicastRemoteObject的构造方法把对象进行发布.
    • 服务端启动Registry
      • LocateRegistry.createRegistry(1099);
      1. 从上面这行代码入手, 开始往下看. 可以发现服务端创建 了一个RegistryImpl对象, 这里做了一个判断,
        • 如果服务端指定的端口是1099并且系统开启了安全管理器, 那么就可以在限定的权限集内绕过系统的安全校验, 这样纯粹是为了提高效率.
          public RegistryImpl(final int var1) throws RemoteException {
                  this.bindings = new Hashtable(101);
                  if (var1 == 1099 && System.getSecurityManager() != null) {
                      try {
                          AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
                              public Void run() throws RemoteException {
                                  LiveRef var1x = new LiveRef(RegistryImpl.id, var1);
                                  RegistryImpl.this.setup(new UnicastServerRef(var1x, (var0) -> {
                                      return RegistryImpl.registryFilter(var0);
                                  }));
                                  return null;
                              }
                          }, (AccessControlContext)null, new SocketPermission("localhost:" + var1, "listen,accept"));
                      } catch (PrivilegedActionException var3) {
                          throw (RemoteException)var3.getException();
                      }
                  } else {
                      LiveRef var2 = new LiveRef(id, var1);
                      this.setup(new UnicastServerRef(var2, RegistryImpl::registryFilter));
                  }
          
              }
        • 如果没有, 将执行this.setup(), setup方法将传入的UnicastServerRef对象赋值给正在初始化的RegistryImpl对象的远程引用ref. 这里涉及到了向上转型, 然后继续执行UnicastServerRef的exportObject方法
          private void setup(UnicastServerRef var1) throws RemoteException {
                  this.ref = var1;
                  var1.exportObject(this, (Object)null, true);
              }
      2. 进入到UnicastServerRef的exportObject()方法,
        • 这里首先为传入的RegistryImpl创建一个代理, 这个代理我们可以推断出就是后面服务于客户端的RegistryImpl的Stub (RegistryImpl_Stub)对象. 
        • 然后将UnicastServerRef 的skel(skeleton)对象设置为当前RegistryImpl对象. 
        • 最后用 skeleton, stub, UnicastServerRef 对象, id和一个boolean值构造了一个Target对象, 也就是这个Target对象基本包含了全部的信息, 等待TCP的调用.
          public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {
                  Class var4 = var1.getClass();
          
                  Remote var5;
                  try {
                      var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse);
                  } catch (IllegalArgumentException var7) {
                      throw new ExportException("remote object implements illegal remote interface", var7);
                  }
          
                  if (var5 instanceof RemoteStub) {
                      this.setSkeleton(var1);
                  }
          
                  Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);
                  this.ref.exportObject(var6);
                  this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);
                  return var5;
              }
      3. 到上面为止, 我们看到的都是一些变量的赋值和创建工作, 还没有到连接层, 这些引用对象将会被Stub和Skeleton对象使用. 接下来就是连接层上的了.
        • 追溯LiveRef的exportObject()方法, 很容易找到了TCPTransport的exportObject(), 这个方法做的事情就是将上面构造的Target对象暴露出去.
          //TCPEndpoint类中
              public void exportObject(Target var1) throws RemoteException {
                  this.transport.exportObject(var1);
              }
          
              //TCPTransport类中
              public void exportObject(Target var1) throws RemoteException {
                  synchronized(this) {
                      this.listen();
                      ++this.exportCount;
                  }
                  boolean var2 = false;
                  boolean var12 = false;
                  try {
                      var12 = true;
                      super.exportObject(var1);
                      var2 = true;
                      var12 = false;
                  } finally {
                      if (var12) {
                          if (!var2) {
                              synchronized(this) {
                                  this.decrementExportCount();
                              }
                          }
                      }
                  }
                  if (!var2) {
                      synchronized(this) {
                          this.decrementExportCount();
                      }
                  }
              }
        • 调用TCPTransport的listen()方法, listen()方法创建了一个ServerSocket, 并且启动了一条线程等待客户端的请求.
          private void listen() throws RemoteException {
                  assert Thread.holdsLock(this);
                  TCPEndpoint var1 = this.getEndpoint();
                  int var2 = var1.getPort();
                  if (this.server == null) {
                      if (tcpLog.isLoggable(Log.BRIEF)) {
                          tcpLog.log(Log.BRIEF, "(port " + var2 + ") create server socket");
                      }
          
                      try {
                          this.server = var1.newServerSocket();
                          Thread var3 = (Thread)AccessController.doPrivileged(new NewThreadAction(new TCPTransport.AcceptLoop(this.server), "TCP Accept-" + var2, true));
                          var3.start();
                      } catch (BindException var4) {
                          throw new ExportException("Port already in use: " + var2, var4);
                      } catch (IOException var5) {
                          throw new ExportException("Listen failed on port: " + var2, var5);
                      }
                  } else {
                      SecurityManager var6 = System.getSecurityManager();
                      if (var6 != null) {
                          var6.checkListen(var2);
                      }
                  }
              }
        • 接着调用父类Transport的exportObject()将Target对象存放进ObjectTable中.
      4. 此时, 我们已经将RegistryImpl对象创建并起了服务等待客户端的请求.
    • 客户端获取服务端Registry代理
      • iHelloService helloService = (iHelloService) Naming.lookup("rmi://127.0.0.1/Hello");
      1. 从上述代码看起 (CleintDemo中), 很容易追溯到LocateRegistry的getRegistry()方法, 该方法通过传入的host和port构造RemoteRef对象, 并创建了一个本地代理, 这个代理对象其实是RegistryImpl_Stub对象. 这样客户端便有了服务端的RegistryImpl的代理(取决于ignoreStubClasses变量). 
        private static Registry getRegistry(ParsedNamingURL parsed)
                throws RemoteException
            {
                return LocateRegistry.getRegistry(parsed.host, parsed.port);
            }
        
        //========================================
        
            public static Registry getRegistry(String host, int port,
                                               RMIClientSocketFactory csf)
                throws RemoteException
            {
                Registry registry = null;
        
        //获取仓库地址
        if (port <= 0) port = Registry.REGISTRY_PORT; if (host == null || host.length() == 0) { // If host is blank (as returned by "file:" URL in 1.0.2 used in // java.rmi.Naming), try to convert to real local host name so // that the RegistryImpl's checkAccess will not fail. try { host = java.net.InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { // If that failed, at least try "" (localhost) anyway... host = ""; } } LiveRef liveRef = new LiveRef(new ObjID(ObjID.REGISTRY_ID), new TCPEndpoint(host, port, csf, null), false); RemoteRef ref = (csf == null) ? new UnicastRef(liveRef) : new UnicastRef2(liveRef);
        //创建远程代理类, 引用liveRef, 好让动态代理时, 能进行tcp通信
        return (Registry) Util.createProxy(RegistryImpl.class, ref, false); }
      2. 但注意此时这个代理其实还没有和服务端的RegistryImpl对象关联, 毕竟是两个VM上面的对象, 这里我们也可以猜测, 代理和远程的Registry对象之间是通过socket消息完成的.
      3. 调用RegistryImpl_Stub的ref(RemoteRef)对象的newCall()方法, 将RegistryImpl_Stub对象传了进去, 不要忘了构造它的时候我们将服务器的主机端口等信息传了进去, 也就是我们把服务器相关的信息也传进了newCall()方法.
        • newCall()方法做的事情简单来看就是建立了跟远程RegistryImpl的Skeleton对象的连接(上面我们说到过服务端通过TCPTransport的exportObject()方法等待着客户端的请求)
          public RemoteCall newCall(RemoteObject var1, Operation[] var2, int var3, long var4) throws RemoteException {
                  clientRefLog.log(Log.BRIEF, "get connection");
                  Connection var6 = this.ref.getChannel().newConnection();
          
                  try {
                      clientRefLog.log(Log.VERBOSE, "create call context");
                      if (clientCallLog.isLoggable(Log.VERBOSE)) {
                          this.logClientCall(var1, var2[var3]);
                      }
          
                      StreamRemoteCall var7 = new StreamRemoteCall(var6, this.ref.getObjID(), var3, var4);
          
                      try {
                          this.marshalCustomCallData(var7.getOutputStream());
                      } catch (IOException var9) {
                          throw new MarshalException("error marshaling custom call data");
                      }
          
                      return var7;
                  } catch (RemoteException var10) {
                      this.ref.getChannel().free(var6, false);
                      throw var10;
                  }
              }
      4. 连接建立之后就是发送请求了, 我们知道客户端终究只是拥有Registry对象的代理, 而不是真正的位于服务端Registry对象本身, 他们位于不同的虚拟机实例之中, 无法直接调用, 必然是通过消息进行交互的.
        • 看看super.ref.invoke()做了什么
        • 容易追溯到StreamRemoteCall的executeCall()方法. 看似本地调用, 但其实很容易从代码中看出来是通过tcp连接发送消息到服务端, 由服务端解析并且处理调用.
          public void invoke(RemoteCall var1) throws Exception {
                  try {
                      clientRefLog.log(Log.VERBOSE, "execute call");
                      var1.executeCall();
                  } catch (RemoteException var3) {
                      clientRefLog.log(Log.BRIEF, "exception: ", var3);
                      this.free(var1, false);
                      throw var3;
                  } catch (Error var4) {
                      clientRefLog.log(Log.BRIEF, "error: ", var4);
                      this.free(var1, false);
                      throw var4;
                  } catch (RuntimeException var5) {
                      clientRefLog.log(Log.BRIEF, "exception: ", var5);
                      this.free(var1, false);
                      throw var5;
                  } catch (Exception var6) {
                      clientRefLog.log(Log.BRIEF, "exception: ", var6);
                      this.free(var1, true);
                      throw var6;
                  }
              }
      5. 至此, 客户端的查询服务请求发出了.
    • 服务器接收客户端的服务查询请求并返回给客户端结果
      • ######
    • 客户端获取通过lookUp()查询获得的客户端HelloServiceImpl的Stub对象.
      1. 客户端通过Lookup查询获得的是客户端HelloServiceImpl的Stub对象(这一块我们看不到, 因为Skeleton为我们屏蔽了),
      2. 然后后续的处理仍然是通过HelloServiceImpl_Stub代理对象通过socket网络请求到服务端,
      3. 通过服务端的HelloServiceImpl_Stub(Skeleton)进行代理, 将请求通过Dispatcher转发到对应的服务端方法获得结果以后再次通过socket把结果返回到客户端.

    RPC的基本原理

    1.   

    2. 要说明, 在RMI Client实施正式的RMI调用前, 它必须通过LocateRegistry或者Naming方式到RMI注册表寻找要调用的RMI注册信息. 找到RMI事务注册信息后, Client会从RMI注册表获取这个RMI Remote Service的Stub信息. 这个过程成功后, RMI Client才能开始正式的调用过程.
    3. 另外要说明的是RMI Client正式调用过程, 也不是由RMI Client直接访问Remote Service, 而是由客户端获取的Stub作为RMI Client的代理访问Remote Service的代理Skeleton, 如上图所示的顺序. 也就是说真实的请求调用是在Stub-Skeleton之间进行的. Registry并不参与具体的Stub-Skeleton的调用过程, 只负责记录“哪个服务名”使用哪一个Stub, 并在 Remote Client询问它时将这个Stub拿给Client.
  • 相关阅读:
    4年Java程序员十面阿里终拿下offer,评级P6+年薪30-40w无股票
    真香警告!手绘172张图解HTTP协议+703页TCP/IP协议笔记
    Git官方和创始人都推荐的Git权威指南,广度深度和实战性史无前例
    阿里“教授”总结整理手写大型网站技术架构:核心原理与案例分析
    GitHub上120K Stars国内第一的Java多线程PDF到底有什么魅力?
    霸榜GitHub必读书籍:编写高质量代码改善Java程序员的151个建议
    GitHub上260K Stars的P8架构师纯手写的Java高并发编程详解
    LeetCode每日一题:802 找到最终安全状态
    LeetCode每日一题:662二叉树最大宽度
    Springboot之security框架 登录安全验证授权流程
  • 原文地址:https://www.cnblogs.com/binwenhome/p/12936245.html
Copyright © 2011-2022 走看看