zoukankan      html  css  js  c++  java
  • 简单实现Java的RMI——远程方法调用

    一、RMI简介:

    说到RMI就不得不说RPC了。

    RPC:(Remote Procedure Call),远程过程调用。

    RMI(Remote Method Invocation),远程方法调用。

    RPC和RMI是有区别的,RPC中是通过网络服务协议向远程主机发送请求,RPC远程主机就去搜索与之相匹配的类和方法,找到后就执行方法并把结果编码,通过网络协议发回。

    而RMI是通过客户端的对象作为远程接口进行远程方法的调用。RMI只适用于Java语言。

    二、RMI的运行机理:

    涉及两个网络端。其核心思想是,一个端可以通过调用另一个端的方法,实现相关功能。
    一个端“执行”一个方法,而这个方法的实际执行是在另一端进行的!

    当然,两个端都应该有相同的类,自然会拥有相同的方法。
    一个端所谓的执行这个方法,其实是通过调用这个类的代理对象的方法,在其中拦截这个方法,在这个方法中
    实际上是将执行这个方法的参数和类名称、方法名称,通过网络通讯传输给另一端;另一端根据得到的方法名称、
    类名称和参数,实际执行那个方法,再将方法执行结果回传给对端。
    要注意的问题:
    1、实际执行方法的一端,我们可以认为是RMI服务器端,伪执行一端,自然是RMI客户端;
    2、伪执行端不应该自己完成参数、方法名称和类名称的传递工作;也就是说,对于RMI客户端用户而言,他只面对一个类的一个方法,
        直接执行就好;
    3、RMI服务器端可能接收多个RMI客户端有关这个方法的执行请求,每个客户端的执行当然应该是独立的,应该用线程实现;
    4、RMI服务器端在执行了相关方法,并回传方法执行结果后,应该断开与RMI客户端的连接。

    下面是我要实现它的一个思路。

    上图由于截图原因,给点补充说明:RpcClientExecutor的作用是建立和服务器的连接,并接受消息和发送消息,具体是发送方法的序列号和参数类型。

    1.首先:应该是RpcBeanDefinition:

     1 package com.xupt.rpc.core;
     2 
     3 import java.lang.reflect.Method;
     4 
     5 public class RpcBeanDefination {   //将类,类方法,类对象封装在Definition中。
     6 
     7     private Class<?> klass;
     8     private Method method;
     9     private Object object;
    10     
    11     RpcBeanDefination() {
    12     }

    给该类所有的成员都有getter和setter方法就不需要说了,这个类,将执行的哪个类的哪个方法和类的对象封装起来,以后这个类将形成Map

    的值,下面来介绍的RpcBeanFactory会重点介绍。

    2.RpcBeanFactory

     1 package com.xupt.rpc.core;
     2 
     3 import java.util.HashMap;
     4 import java.util.Map;
     5 
     6 public class RpcBeanFactory {
     7 
     8     private final Map<String, RpcBeanDefination> beanMap;
     9     
    10     RpcBeanFactory() {
    11         beanMap = new HashMap<>();
    12     }
    13     
    14     void rpcBeanRegistry(String beanId,RpcBeanDefination defination) {
    15         RpcBeanDefination rbd = beanMap.get(beanId);
    16         if(rbd != null) {
    17             return;
    18         }
    19         beanMap.put(beanId, defination);
    20     }
    21     
    22     RpcBeanDefination getBean(String beanId) {
    23         return beanMap.get(beanId);
    24     }
    25 }

    此类是将序列号作为Map中的键,RpcBeanDeifintion作为值放入Map中,用BeanId来找对应客户端那边序列号相同的方法。

    3.下来是RpcBeanRegistry:

     1 package com.xupt.rpc.core;
     2 
     3 import java.lang.reflect.Method;
     4 
     5 public class RpcBeanRegistry {
     6     
     7     RpcBeanRegistry() {
     8     }
     9     
    10     //给客户端提供
    11      static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces) {
    12         doregistry(rpcBeanFactory,interfaces,null);
    13     }
    14      
    15     //内部使用,注册
    16      private static void doregistry(RpcBeanFactory rpcBeanFactory , Class<?> interfaces ,Object object) {
    17           //得到接口中的所有的方法,行程方法的数组
    18          Method[] methods = interfaces.getDeclaredMethods();
    19             for(Method method : methods) {  //遍历这些方法
    20                 String beanId = String.valueOf(method.toString().hashCode());//将方法序列化。
    21                 RpcBeanDefination rpcBeanDefination = new RpcBeanDefination();
    22                 
    23                 //将得到的实现接口的那个类和它的方法以及对象放进RpcBeanDefination()中。
    24                 rpcBeanDefination.setKlass(interfaces);
    25                 rpcBeanDefination.setMethod(method);
    26                 rpcBeanDefination.setObject(object);
    27                 
    28                 rpcBeanFactory.rpcBeanRegistry(beanId, rpcBeanDefination);
    29             }
    30     }
    31     
    32      //服务端使用,知道实现类的对象。
    33      static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces,Object object) {
    34          //判断此类是否实现了这个接口。
    35         if(!interfaces.isAssignableFrom(object.getClass())){
    36             return;
    37         }
    38         doregistry(rpcBeanFactory,interfaces,object);
    39     }
    40     
    41      //服务器端使用,知道类,创建一个对象。
    42     static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces,Class<?> klass) {
    43         //判断该类是否实现了接口。
    44         if(!interfaces.isAssignableFrom(klass)){
    45             return;
    46         }
    47         try {
    48             doregistry(rpcBeanFactory, interfaces, klass.newInstance());
    49         } catch (Exception e) {
    50             e.printStackTrace();
    51         }
    52     }
    53 }

    这个类是同时给客户端和服务器使用的,所以有一个私有方法来完成类方法的获得和序列号的产生(method.toString().hashcode())。然后将类、方法、对象放进RpcBeanDefinition中,将得到的序列号作为键和rpcBeanDeifinyion作为值放进Map中,形成键值对,方便客户端和服务器的调用。

    4.下来是RpcServer:

    1 package com.xupt.rpc.core;

     2 
     3 import java.io.IOException;
     4 import java.net.ServerSocket;
     5 import java.net.Socket;
     6 
     7 public class RpcServer implements Runnable {
     8     
     9     private ServerSocket server;
    10     private int port;
    11     private boolean goon;
    12     private final RpcBeanFactory rpcBeanFactory;
    13     private static long executorId;
    14     
    15     public RpcServer() {
    16         rpcBeanFactory = new RpcBeanFactory();
    17         this.goon = false;
    18     }
    19 
    20     public void setPort(int port) {
    21         this.port = port;
    22     }
    23     
    24     public void startRpcServer() throws Exception {
    25         if(this.port == 0) {
    26             return;
    27         }
    28         server = new ServerSocket(port);
    29         this.goon = true;
    30         new Thread(this,"Rpc_Server").start();//启动线程
    31     }
    32     
    33     public void stopRpcServer() {   //关闭服务器
    34         if (this.server != null && !this.server.isClosed()) {
    35             try {
    36                 this.server.close();
    37             } catch (IOException e) {
    38                 e.printStackTrace();
    39             } finally {
    40                 this.server = null;
    41             }
    42         }
    43     }
    44     
    45     RpcBeanFactory getRpcBeanFactory() {
    46         return rpcBeanFactory;
    47     }
    48     
    49     //用注册的方法知道类实现的接口,类的对象,通过对象执行类的方法。
    50     public void rpcRegistry(Class<?> interfaces,Object object) {
    51         RpcBeanRegistry.registryInterface(rpcBeanFactory, interfaces, object);
    52     }
    53     
    54     public void rpcRegistry(Class<?> interfaces, Class<?> imClass) {
    55         RpcBeanRegistry.registryInterface(rpcBeanFactory, interfaces,imClass);
    56     }
    57     
    58     @Override
    59     public void run() {
    60         while(goon) {
    61             try {
    62                 Socket socket = server.accept();//不断的侦听
    63                 
    64                 new RpcServerExecutor(socket, this,++executorId);
    65                 
    66             } catch (Exception e) {
    67                 goon = false;
    68                 e.printStackTrace();
    69             }
    70         }
    71         stopRpcServer();
    72     }
    73 }

    上述服务器端的基本任务已经完成了。

    下来我们来看看客户端:

    首先还是:RpcClientExecutor,它是建立与服务器端的连接,以及接收,发送消息,具体发送的是方法的序列号和参数类型。

    package com.xupt.rpc.core;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.Socket;
    
    public class RpcClientExecutor {
    	
    	private String rpcServerIp;  //ip地址
    	private int rpcServerPort;	//端口号
    	
    	RpcClientExecutor() {
    	}
    
    	RpcClientExecutor(String rpcServerIp, int rpcServerPort) {
    		this.rpcServerIp = rpcServerIp;
    		this.rpcServerPort = rpcServerPort;
    	}
    
    	String getRpcServerIp() {
    		return rpcServerIp;
    	}
    
    	void setRpcServerIp(String rpcServerIp) {
    		this.rpcServerIp = rpcServerIp;
    	}
    
    	int getRpcServerPort() {
    		return rpcServerPort;
    	}
    
    	void setRpcServerPort(int rpcServerPort) {
    		this.rpcServerPort = rpcServerPort;
    	}
    	
    	//关闭客户端
    	private void closeSocket(ObjectInputStream ois, ObjectOutputStream oos, Socket socket) {
    		try {
    			if (ois != null) {
    				ois.close();
    			}
    		} catch (IOException e) {
    			e.printStackTrace();
    		} finally {
    			ois = null;
    		}
    		try {
    			if (oos != null) {
    				oos.close();
    			}
    		} catch (IOException e) {
    			e.printStackTrace();
    		} finally {
    			oos = null;
    		}
    		try {
    			if (socket != null && !socket.isClosed()) {
    				socket.close();
    			}
    		} catch (IOException e) {
    			e.printStackTrace();
    		} finally {
    			socket = null;
    		}
    	}
    	
    	@SuppressWarnings("unchecked")
    	<T> T rpExecutor(String beanId,Object[] params) throws Exception {
    		//连接服务器端
    		Socket socket = new Socket(rpcServerIp, rpcServerPort);
    		//发送方法的序列号和参数类型。
    		ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
    		oos.writeUTF(beanId);
    		oos.writeObject(params);
    		
    		//必须将这句放在前面三句的后面。
    		ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
    		//接收服务器端返回的结果。
    		Object result = ois.readObject();
    		
    		closeSocket(ois,oos,socket);
    		
    		return (T) result;
    		
    		
    	}
    }
    

     下来是:RpcClient:这个类是产生代理,用代理对象伪执行相关的方法,然后在真正的来连接服务器。

     1 package com.xupt.rpc.core;
     2 
     3 import java.lang.reflect.InvocationHandler;
     4 import java.lang.reflect.Method;
     5 import java.lang.reflect.Proxy;
     6 
     7 public class RpcClient {
     8     
     9     private RpcClientExecutor rpcClientExecutor;
    10     
    11     public RpcClient(String rpcServerIp,int rpcServerport) {
    12         this.rpcClientExecutor = new RpcClientExecutor(rpcServerIp, rpcServerport);
    13     }
    14     
    15     @SuppressWarnings("unchecked")
    16     public <T> T getProxy(Class<?> klass) {
    17         return (T) Proxy.newProxyInstance(
    18                 klass.getClassLoader(),
    19                 new Class[] {klass},
    20                 new InvocationHandler() {
    21                     
    22                     @Override
    23                     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    24                         String BeanId = String.valueOf(method.toString().hashCode());
    25                         Object result = rpcClientExecutor.rpExecutor(BeanId, args);
    26                         
    27                         return result;
    28                     }
    29                 }
    30             );
    31     }
    32 }

    上述方法产生代理就不多做解释,不明白请看上一篇的代理机制之初见吧。。。。

    以上就是我的RMI的简单实现,入如果有错误,请指正!也希望它对你有所帮助。

  • 相关阅读:
    LeetCode-198-打家劫舍
    LeetCode-279-完全平方数
    LeetCode-91-解码方法
    Redis RDB 分析工具 rdbtools 说明(转载)
    ftp软件下载,ftp软件到底在哪里下载
    element-ui组件库二次开发——打包、上传npm
    ftp客户端工具,这6款好用的ftp客户端工具,站长们必不可少的常用工具!
    不会用Java Future,我怀疑你泡茶没我快, 又是超长图文!!(转载)
    ftp管理软件,ftp管理软件怎么使用
    Android连载22-自定义广播之标准广播发送(转载)
  • 原文地址:https://www.cnblogs.com/youdiaodaxue16/p/9885804.html
Copyright © 2011-2022 走看看