zoukankan      html  css  js  c++  java
  • 【分布式】RPC初探

    事先声明:本文代码参考自Dubbo作者的博客

    RPC(Remote Procedure Call)远程过程调用,是分布式系统当中必不可少的一个玩意。比如说在单机系统当中,我想调用某个方法,直接调就可以了对吧,但是当环境变成多机分布式系统时,A机器上想调用B机器上的某个方法时,就需要用到RPC了。RPC的原理在知乎这个问答上有很清楚的解释。

    简单点来说,就是客户端利用了socket把希望调用的方法的信息(方法名、方法需要的参数等)传给服务器端,服务器端把这些信息反序列化之后利用反射去调用指定的方法,并把返回值再通过socket返回给客户端。下面是代码示例,关键部分我写了自己理解的注释。

    代码主要用到了socket通信和JDK的动态代理,这两部分我在之前的博客中也都有涉及。

    RPCServer.java

    public class RPCServer {
        private static final int PORT = 8000;
        /**
         * 暴露服务
         *
         * @param service 服务的对象实例
         * */
        public static void open(final Object service) throws Exception {
            if (service == null) {
                throw new IllegalArgumentException();
            }
            System.out.println("Service is opening for " + service.getClass().getName() + " at port: " + PORT);
            //开启ServerSocket监听8000端口
            final ServerSocket server = new ServerSocket(PORT);
            for (;;) {
                try {
                    //接收到一个客户端请求
                    final Socket client = server.accept();
                    //开一个线程处理
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                                try {
                                    ObjectInputStream input = new ObjectInputStream(client.getInputStream());
                                    try {
                                        String methodName = input.readUTF();
                                        System.out.println(">>>>methodName: " + methodName);
                                        Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                                        Object[] arguments = (Object[]) input.readObject();
                                        System.out.println(">>>>arguments: " + arguments.toString());
                                        ObjectOutputStream out = new ObjectOutputStream(client.getOutputStream());
                                        try {
                                            //利用反射获取到方法对象
                                            Method method = service.getClass().getMethod(methodName, parameterTypes);
                                            //调用方法并获取返回值
                                            Object result = method.invoke(service, arguments);
                                            //把返回值写入socket,返回给客户端
                                            out.writeObject(result);
                                            //                                out.flush();
                                        } catch (Throwable t) {
                                            out.writeObject(t);
                                        } finally {
                                            out.close();
                                        }
                                    } finally {
                                        input.close();
                                    }
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                        }
                    }).start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 指定在远程主机上的服务
         *
         * @param <T> 接口泛型
         * @param interfaceClass 接口
         * @param host 远程主机IP
        * */
    
        @SuppressWarnings("unchecked")
        public static <T> T refer(final Class<T> interfaceClass, final String host) {
            if (interfaceClass == null) {
                throw new IllegalArgumentException("invalid interface");
            }
            if (host == null || "".equals(host)) {
                throw new IllegalArgumentException("invalid host");
            }
            System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + PORT);
            //动态代理返回对象实例,并且利用泛型转成服务类型
            return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass},
                    new InvocationHandler() {
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                            Socket socket = new Socket(host, PORT);
                            try {
                                ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                                try {
                                    //发送方法名
                                    out.writeUTF(method.getName());
                                    //发生方法参数列表
                                    out.writeObject(method.getParameterTypes());
                                    //发生方法需要的参数
                                    out.writeObject(args);
                                    ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                    try {
                                        //获取返回值
                                        Object result = input.readObject();
                                        if (result instanceof Throwable) {
                                            throw (Throwable) result;
                                        }
                                        return result;
                                    }finally {
                                        input.close();
                                    }
                                }finally {
                                    out.close();
                                }
                            } finally {
                                socket.close();
                            }
                        }
                    });
        }
    }
    

    接口 HelloService.java

    public interface HelloService {
        public String show(String name);
    }
    

    接口实现 HelloServiceImpl.java

    public class HelloServiceImpl implements HelloService {
        @Override
        public String show(String name) {
            System.out.println(name);
            return "name: " + name;
        }
    }
    

    测试:

    服务端测试代码 ServerTest.java

    public class ServerTest {
        public static void main(String[] args) throws Exception {
            HelloService helloService = new HelloServiceImpl();
    		//开启RPC服务,并且绑定一个对象实例,指定服务器上的服务类型
            RPCServer.open(helloService);
        }
    }
    

    客户端测试代码 ClientTest.java

    public class ClientTest {
        public static void main(String[] args) {
            try {
    			//调用指定IP的远程主机上的指定服务
                HelloService service = RPCServer.refer(HelloService.class, "127.0.0.1");
                System.out.println(service.show("hello"));
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    结果如下:

    服务端:

    客户端:

    思考

    关于这段示例代码,有哪些改进的地方呢?首先我想到的是把TCP通信模型改成NIO通信,不要用BIO这种低并发的模型;其次是传输的信息可以用其他方式进行压缩或者叫序列化,减少传输的大小从而降低服务器压力和提高传输速度;还有就是这段代码使用的动态代理是JDK自带的方法,有个很大的缺点是必须要接口,之前的文章也提到了,可以采用CGlib来改善一下。目前能想到的就这三点了,找时间我再来完善一下。

    同时也可以去看看Dubbo源码。

  • 相关阅读:
    Java8新特性-日期时间
    解决有道云笔记导入md文件无法加载文件内的图片方式
    Mac安装Navicat Premium 12 永久破解
    MacBook Pro安装和配置Tomcat
    MySQL不支持DELETE使用表别名?
    JAVA设计模式之模板方法
    Lombok中的@Builder注解
    JAVA设计模式之策略模式
    Stream中的Peek操作
    MySql插入一条数据不提交事务主键仍自增的理解
  • 原文地址:https://www.cnblogs.com/puyangsky/p/6220948.html
Copyright © 2011-2022 走看看