zoukankan      html  css  js  c++  java
  • 分布式原理之RPC框架

    PRC 即远程服务调用,是实现分布式最基本的方式,而这一实现基于的又是java的反射功能所实现的动态代理。

    完全可以自己写一个不到200行的rpc服务:

    只需要实现四个类:

    0.本地客户端

    1. 远程服务的本地代理

    * 功能:
    * 0. 将本地的接口调用转化成JDK动态代理,在动态代理中实现接口的远程调用
    * 1. 创建Socket客户端,根据指定地址调用远程服务真正实现者
    * 2. 同步阻塞获取服务返回应答

    2.远程的服务代理

      * 功能:

    *  0.监听TCP连接,收到新的连接后,将其封装成task,交给线程池去执行
    * 1.将客户端发送的码流反序列化成对象,调用服务实现,获取执行结果
    * 2.将执行结果序列化,通过Socket发送给客户端

    3.远程服务的真正实现者

    key : 为什么要用反射呢?因为远程请求来的时候,并不知道请求的是哪个类的哪个方法,所以需要动态反射出来类,并得到相应调用的方法。

    result:最终的效果就是调用远程的服务就像调用本地的一样,客户端并不用管调用的服务在哪里,框架将一切都封装好了。

     代码:

    /**
     * 本地客户端
     */
    public class RPCClient {
        public static void main(String[] args) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        RPCExporter.exporter("locolhost", 8080);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            RPCImporter<EchoService> importer = new RPCImporter<>();
            EchoService echo = importer.importer(EchoServiceIpml.class,
                    new InetSocketAddress("locolhost", 8080));
            System.out.println(echo.echo("Are you ok? "));
        }
    }
    /**
     * 远程服务的本地代理
     * 功能:
     *  0. 将本地的接口调用转化成JDK动态代理,在动态代理中实现接口的远程调用
     *  1. 创建Socket客户端,根据指定地址调用远程服务真正实现者
     *  2. 同步阻塞获取服务返回应答
     */
    
    public class RPCImporter<T> {
        public T importer(Class<?> serviceClass, InetSocketAddress address) {
            return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),
                    new Class<?>[]{serviceClass.getInterfaces()[0]}, new InvocationHandler() {
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                            Socket socket = null;
                            ObjectInputStream inputStream = null;
                            ObjectOutputStream outputStream = null;
                            try {
                                socket = new Socket();
                                socket.connect(address);
                                outputStream = new ObjectOutputStream(socket.getOutputStream());
                                outputStream.writeUTF(serviceClass.getName());
                                outputStream.writeUTF(method.getName());
                                outputStream.writeObject(method.getParameterTypes());
                                outputStream.writeObject(args);
                                inputStream = new ObjectInputStream(socket.getInputStream());
                                return inputStream.readShort();
                            } catch (Exception e) {
                                e.printStackTrace();
                            } finally {
                                if (socket != null) socket.close();
                                if (inputStream != null) inputStream.close();
                                if (outputStream != null) outputStream.close();
                            }
                            return null;
                        }
                    });
        }
    }
    /**
     * 远程的服务代理
     * 功能:
     *  0.监听TCP连接,收到新的连接后,将其封装成task,交给线程池去执行
     *  1.将客户端发送的码流反序列化成对象,调用服务实现,获取执行结果
     *  2.将执行结果序列化,通过Socket发送给客户端
     */
    
    public class RPCExporter {
        private static Executor threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        public static void exporter(String hostName, int port) throws IOException {
            ServerSocket serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(hostName, port));
            try {
                while (true) {
                    threadPool.execute(new ServerTask(serverSocket.accept()));
                }
            } finally {
                serverSocket.close();
            }
        }
    
        private static class ServerTask implements Runnable {
            private Socket _client;
    
            public ServerTask(Socket client) {
                _client = client;
            }
    
            @Override
            public void run() {
                ObjectOutputStream outputStream = null;
                ObjectInputStream inputStream = null;
                try {
                    inputStream = new ObjectInputStream(_client.getInputStream());
                    String interfaceName = inputStream.readUTF();
                    String methodName = inputStream.readUTF();
                    Class<?> service = Class.forName(interfaceName);
                    Class<?>[] paramsType = (Class<?>[]) inputStream.readObject();
                    Method method = service.getMethod(methodName, paramsType);
                    Object[] args = (Object[]) inputStream.readObject();
                    Object result = method.invoke(service.newInstance(), args);
                    outputStream = new ObjectOutputStream(_client.getOutputStream());
                    outputStream.writeObject(result);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (inputStream != null) {
                        try {
                            inputStream.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    if (outputStream != null) {
                        try {
                            outputStream.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
    /**
     * 远程的服务真正实现者
     */
    public class EchoServiceIpml implements  EchoService {
    
        @Override
        public String echo(String ping) {
            return ping != null ? ping + "---> I'm ok" : "I'm ok";
        }
    }
  • 相关阅读:
    BZOJ 3506 机械排序臂 splay
    BZOJ 2843 LCT
    BZOJ 3669 魔法森林
    BZOJ 2049 LCT
    BZOJ 3223 文艺平衡树 splay
    BZOJ 1433 假期的宿舍 二分图匹配
    BZOJ 1051 受欢迎的牛 强连通块
    BZOJ 1503 郁闷的出纳员 treap
    BZOJ 1096 ZJOI2007 仓库设计 斜率优化dp
    BZOJ 1396: 识别子串( 后缀数组 + 线段树 )
  • 原文地址:https://www.cnblogs.com/shawshawwan/p/9364998.html
Copyright © 2011-2022 走看看