zoukankan      html  css  js  c++  java
  • 分布式架构的基石.简单的 RPC 框架实现(JAVA)

      前言

      RPC 的全称是 Remote Procedure Call,它是一种进程间通信方式。允许像调用本地服务一样调用远程服务。

      学习来源:《分布式系统架构:原理与实践》 - 李林锋

      1.RPC 框架原理

      RPC 框架的目标就是让远程过程(服务)调用更加简单、透明,RPC框架负责屏蔽底层的传输方式TCP 或者 UDP)、序列化方式XML、JSON、二进制)和通信细节。

      框架使用者只需要了解谁在什么位置,提供了什么样的远程服务接口即可,开发者不需要关心底层通信细节和调用过程。

      2.最简单的 RPC 框架实现

    ·  下面通过 JAVA 原生的序列化TCP Socket通信、动态代理反射机制,实现最简单的 RPC 框架。它由三部分组成:

    • 服务提供者:它运行在服务端,负责提供服务接口定义和服务实现类。(EchoServiceEchoServiceImpl
    • 服务发布者,它运行在 RPC 服务端,负责将本地服务发布成远程服务,供其他消费者调用。(RPCExporter
    • 本地服务代理,它运行在 RPC 客户端,通过代理调用远程服务提供者,然后将结果进行封装返回给本地消费者。(RPCImporter

     

      下面看具体代码,首先是服务端接口定义和服务实现类。

      代码清单 :EchoService 

    package com.rpc.test;
    
    /**
     * @Description - 调用接口
     * @Author zww
     * @Date 2018/12/10 17:29
     */
    public interface EchoService {
        String echo(String ping);
    }

      代码清单:EchoServiceImpl

    package com.rpc.test;
    
    /**
     * @Description - 调用接口实现
     * @Author zww
     * @Date 2018/12/10 17:30
     */
    public class EchoServiceImpl implements EchoService {
        @Override
        public String echo(String ping) {
            return ping != null ? ping + "挺不错的。" : "挺不错的。";
        }
    }

      

      RPC 服务端发布者代码实现如下:

      代码清单:RPCExporter

    package com.rpc.test;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.Method;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    /**
     * @Description - 服务端发布者(提供服务)
     * @Author zww
     * @Date 2018/12/10 17:33
     */
    public class RPCExporter {
        //线程池
        static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
        public static void exporter(String hostName, int port) throws Exception {
            ServerSocket server = new ServerSocket(); //店家
            server.bind(new InetSocketAddress(hostName, port)); //开店地址
            try {
                while (true) { //开启营业模式
                    executor.execute(new ExporterTask(server.accept())); //accept : 来客人了
                }
            } finally {
                server.close();
            }
        }
    
        //根据约定规则解析请求,返回结果
        private static class ExporterTask implements Runnable {
            Socket client = null; //客户
            public ExporterTask(Socket client) {
                this.client = client;
            }
    
            @Override
            public void run() { 
                ObjectInputStream inputStream = null;
                ObjectOutputStream outputStream = null;
                try {
                    System.out.println("老板娘:诶,来咯!您要点什么!");
                    inputStream = new ObjectInputStream(client.getInputStream()); //接收请求
                    System.out.println("老板娘:要个回锅肉!");
                    String interfaceName = inputStream.readUTF();
                    Class<?> service = Class.forName(interfaceName);
                    System.out.println("老板娘:微辣!");
                    String methodName = inputStream.readUTF();
                    System.out.println("老板娘:少油!");
                    Class<?>[] parameterType = (Class<?>[])inputStream.readObject();
                    System.out.println("老板娘:别放香菜!");
                    Object[] arguments = (Object[])inputStream.readObject();
                    System.out.println("老板娘:做菜快点!");
                    Method method = service.getMethod(methodName, parameterType);
                    Object result = method.invoke(service.newInstance(), arguments);
                    System.out.println("老板娘:老头子,听清没!");
                    System.out.println("老板闷头做菜中!!!!");
                    System.out.println("老板娘:帅哥,你的菜好了!");
                    outputStream = new ObjectOutputStream(client.getOutputStream()); //返回结果
                    outputStream.writeObject(result);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (outputStream != null) {
                        try {
                            outputStream.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    if (inputStream != null) {
                        try {
                            inputStream.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    if (client != null) {
                        try {
                            client.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

      服务发布者的主要职责

    • 监听客户端的 TCP 连接,接收到新的客户端连接之后,将其封装成 Task,由线程池执行。
    • 将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果。
    • 将执行结果反序列化,通过 Socket 发送给客户端。
    • 远程服务调用完成后,释放 Socket 等连接资源,防止句柄泄露。

      RPC 客户端本地服务代理代码:

      代码清单:RPCImporter

    package com.rpc.test;
    
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    
    /**
     * @Description -  路人(请求服务)
     * @Author zww
     * @Date 2018/12/11 10:31
     */
    public class RPCImporter<S> {
    
        public S importer(final Class<?> serviceClass, final InetSocketAddress address) {
            //启用远端代理
            return (S) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[]{ serviceClass.getInterfaces()[0] }, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    Socket socket = null;
                    ObjectOutputStream outputStream = null;
                    ObjectInputStream inputStream = null;
                    try {
                        socket = new Socket();
                        socket.connect(address);
                        //使用 TCP 方式请求远端方法,以下为约定的传输方式
                        System.out.println("路人:老板娘,点菜咯!");
                        outputStream = new ObjectOutputStream(socket.getOutputStream()); //发送请求
                        System.out.println("路人:要个回锅肉");
                        outputStream.writeUTF(serviceClass.getName());
                        System.out.println("路人:微辣!");
                        outputStream.writeUTF(method.getName());
                        System.out.println("路人:少油!");
                        outputStream.writeObject(method.getParameterTypes());
                        System.out.println("路人:别放香菜!");
                        outputStream.writeObject(args);
                        System.out.println("路人:上菜快点!");
                        inputStream = new ObjectInputStream(socket.getInputStream()); //获取结果
                        System.out.println("路人:吧唧吧唧!");
                        return inputStream.readObject();
                    } finally {
                        if (socket != null) socket.close();
                    }
                }
            });
        }
    
    }

      本地服务代理的主要功能如下:

    • 将本地的接口调用转换成 JDK 的动态代理,在动态代理中实现接口的远程调用。
    • 创建 Socket 客户端,根据指定地址链接远程服务提供者。
    • 将远程服务调用所需的接口类、方法名、参数列表、返回参数 等编码后发送给服务提供者。
    • 同步阻塞服务端返回应答,获取应答之后返回。

      最后:编写测试代码,并看看执行结果

    package com.rpc.test;
    
    import java.net.InetSocketAddress;
    
    /**
     * 测试类
     */
    public class TestApplication {
        public static void main(String[] args) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //启用服务提供端(设置 地址端口)
                        RPCExporter.exporter("localhost", 8080);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            //发起服务请求
            RPCImporter<EchoService> importer = new RPCImporter<>();
            //使用远端代理(访问 地址端口)
            EchoService echo = importer.importer(EchoServiceImpl.class, new InetSocketAddress("localhost", 8080));
            System.out.println(echo.echo("这家店味道咋样? 
    "));
        }
    }

      执行测试结果:

    Connected to the target VM, address: '127.0.0.1:57656', transport: 'socket'
    路人:老板娘,点菜咯!
    老板娘:诶,来咯!您要点什么!
    路人:要个回锅肉
    路人:微辣!
    路人:少油!
    路人:别放香菜!
    路人:上菜快点!
    老板娘:要个回锅肉!
    老板娘:微辣!
    老板娘:少油!
    老板娘:别放香菜!
    老板娘:做菜快点!
    老板娘:老头子,听清没!
    老板闷头做菜中!!!!
    老板娘:帅哥,你的菜好了!
    路人:吧唧吧唧!味道不错
    这家店味道咋样? 
    挺不错的。

      

     
     
     
  • 相关阅读:
    asp.net 中input radio checked 无效
    AD对象DirectoryEntry本地开发
    Linux部署
    spring 定时任务配置使用
    闲言碎语
    javascript 折后保留一位小数
    JSON 实力应用
    水晶报表(crystal report )中显示CheckBox
    html 笔记
    转载-js按回车键实现登陆-myself
  • 原文地址:https://www.cnblogs.com/zhaww/p/10298292.html
Copyright © 2011-2022 走看看