zoukankan      html  css  js  c++  java
  • RPC框架学习+小Demo实例

    一、什么是RPC协议?

    全称:远程过程调度协议
    效果:使消费者向调用本地方法一样调用远程服务方法,对使用者透明
    目前常用:Dubbo、Thirft、Sofa....
    功能:

    • 建立远程通信(socket)TCP/UDP
    • 数据传递
    • 序列化和反序列化(XML/json/Protobuf/avro/kyro/hessian)

    流程图:
    在这里插入图片描述

    二、Demo思路

    1. 首先我们需要搭建两个项目,一个作为服务端提供服务,另一个作为客户端来调用服务端的接口方法。
    2. 因为客户端需要调用服务端的接口,所以我们需要客户端依赖这个通信服务所需的公共资源文件,所以服务端分为两部分,一部分为api资源,一部分为提供服务的方法。
    3. 为客户端与服务端建立连接,服务端等待请求,客户端等待响应结果
    4. 最终测试

    三、Demo流程

    1.首先建立两个maven项目,快速建立即可,一个命名为rpc-client作为客户端,另一个命名为rpc-server作为服务端。在rpc-server下建立两个子模块,一个命名为server-api,作为公共资源,另一个命名server-provider作为服务提供程序。
    在这里插入图片描述
    在这里插入图片描述
    2.在server下,新建一个测试接口,我这里命名为IHelloServer,让客户端调用此接口实现方法,该接口方法如下:
    在这里插入图片描述
    3.将api的依赖添加到rpc-client中,为了能让其调用公共资源。当然,服务提供模块也需要,所以也需要依赖进去。因为是本地小Demo测试,所以就直接Install本地,进行依赖添加了。
    在这里插入图片描述
    4.在server-provider中新建impl类,实现IhelloService接口,重写其中方法
    在这里插入图片描述
    5.这时,我们就可以在客户端使用接口对象了,但是因为是接口,我们无法对它进行实例化,那样就成为本地调用了,所以我们需要进行动态代理。
    在dubbo中,是使用注解进行动态代理,把信息注入到对象中:
    在这里插入图片描述
    所以我们需要写一个动态代理方法,来为对象进行实例化操作:

    常见的动态代理方式有很多,如:jdk、cglib、javassist、asm。。等
    (当然,作为新手小白,我还不太了解,需要后续学习)

    新建RpcClientProxy类,类中方法如下:(动态代理标注写法)
    在这里插入图片描述
    这里实现了InvocationHandler接口,其中的invoke方法,是在使用代理对象中的方法时,就会被调用
    然后我们在客户端mian方法中实例化动态代理类(指上面的类),执行其中的方法,将IhelloService动态代理,调用IHelloService的sayhello方法,控制台得到的结果是:test,因为我们返回的就是test,并且还没进行远程连接。
    6.之后需要进行网络间的通信了,我们新写一个类,来继承InvocationHandler接口,实现invoke方法,在方法中,主要工作有已下三步:

    1. 组装参数
    2. 序列化
    3. 进行网络传输

    7.我们需要对参数进行封装,这样方便进行参数的传输,而这个类,因为是公共资源,所以我们建立在server-api下:在这里插入图片描述
    实现序列化接口,因为要数据传输,写入set、get方法
    8.在客户端的invoke方法中,组装参数,封装到实体,等待传输过去就行了
    在这里插入图片描述
    新写通信类,用来向服务方发送请求参数,只需要写入服务地址和端口,把数据传输过去即可
    在这里插入图片描述
    9.新建发布类,写一个发布方法,当然位置在server-provider里,由于是小演示,所以用BIO即可

    BIO(阻塞IO)---当没有客户端连接过来时,accept会阻塞(当前进程),没有数据传输,IO也会阻塞,以至于一次只能处理一个IO,服务端的吞吐量会很低
    NIO,多路复用,一个线程管理N个连接

    在这里我们用socket进行通信,方法传入两个参,一个是服务器地址,一个是端口,死循环不断监听
    在这里插入图片描述
    10.由于BIO单线程吞吐量太低,所以我们用线程池进行优化,当我们获取到socket之后,放到线程里去执行,这样可以提高一些效率
    新建类,实现runable接口,在run方法中,进行反序列化,获取传来的参数,然后通过反射,去调用我们IHelloServiceImpl中的方法,再把得到的结果写回去,这样就完成了网络通信。
    在这里插入图片描述
    11.最后,provider模块主方法,我们新建实现类,新建发布类,设定端口号,运行即可。我们的客户端主方法,新建代理类,写入端口参数,调用方法,输出结果即可。
    在这里插入图片描述
    在这里插入图片描述
    最后控制台输出的结果:
    在这里插入图片描述
    通信成功!!!(后附代码)

    四、总结

    个人认为rpc框架通信的实现逻辑简单来说就是这个样子,只是不同的框架,针对某些特定的功能进行了加强,对某些操作进行了封装,使得使用者用起来更加方便。

    Demo总体流程如下:
    1.客户端与服务提供端依赖公共资源文件
    2.客户端需要对对象进行代理
    3.服务端与客户端进行远程通信
    4.客户端将参数发送给服务端,服务端读取参数,将结果返回

    五、代码

    rpc-client模块下:

    app类:

    public class App 
    {
        public static void main( String[] args )
        {
            RpcClientProxy rpcClientProxy = new RpcClientProxy();
            IHelloService iHelloService = rpcClientProxy.clientProxy(IHelloService.class,"localhost",8080);
            String rs = iHelloService.sayHello("test");
            System.out.println( rs);
        }
    }
    

    MyInvocationHandler类:

    public class MyInvocationHandler implements InvocationHandler {
        private String host;
        private int port;
        public MyInvocationHandler(String host, int port) {
            this.host = host;
            this.port = port;
        }
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            //开始执行代理,并且触发远程调用
            //1.组装参数
            //2.序列化
            //3.进行网络传输
            RpcRequest request = new RpcRequest();
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParams(args);
            request.setTypes(method.getParameterTypes());
            //下面开始进行数据传输
            RpcTransPort rpcTransPort = new RpcTransPort(host,port);
            return rpcTransPort.send(request);
        }
    }
    

    RpcClientProxy类:

    public class RpcClientProxy {
        //jdk、cglib、javassist、asm
       /* public <T>  T clientProxy(final Class<T> interfaceCls){
            return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls},
                    new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    //如果调用interfaceCls的方法。则会被动态代理执行到invole方法
                    return "test";
                }
            });*/
            public <T>  T clientProxy(final Class<T> interfaceCls,String service,int port){
            return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls},new MyInvocationHandler(service,port));
        }
    }
    

    RpcTransPort类:

    public class RpcTransPort {
        private String host;
        private int port;
    
        public RpcTransPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
        public Object send(RpcRequest rpcRequest){
            try(Socket socket = new Socket(host,port);
                ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())){
                outputStream.writeObject(rpcRequest);
                outputStream.flush();
                ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                return inputStream.readObject();
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //TODO close IO
            }
            return null;
        }
    }
    

    rpc-server-api模块下:

    IHelloService类:

    public interface IHelloService {
        String sayHello(String txt);
    }
    

    RpcRequest类:

    public class RpcRequest implements Serializable {
        private String className;
        private String methodName;
        private Object[] params;
        private Class[] types;
    
        public String getClassName() {
            return className;
        }
    
        public void setClassName(String className) {
            this.className = className;
        }
    
        public String getMethodName() {
            return methodName;
        }
    
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }
    
        public Object[] getParams() {
            return params;
        }
    
        public void setParams(Object[] params) {
            this.params = params;
        }
    
        public Class[] getTypes() {
            return types;
        }
    
        public void setTypes(Class[] types) {
            this.types = types;
        }
    }
    
    

    rpc-server-provider模块下:

    App类:

    public class App 
    {
        public static void main( String[] args )
        {
            IHelloService iHelloService = new IHelloServiceImpl();
            RpcProxyServer rpcProxyServer = new RpcProxyServer();
            rpcProxyServer.publisher(iHelloService,8080);
    
        }
    }
    

    IHelloServiceImpl类:

    public class IHelloServiceImpl implements IHelloService{
        @Override
        public String sayHello(String txt) {
            return "receive msg :"+txt;
        }
    }
    
    

    ProcessHandlerThread类:

    public class ProcessHandlerThread implements Runnable {
        Socket socket;
        Object service;
    
        public ProcessHandlerThread(Socket socket, Object service) {
            this.socket = socket;
            this.service = service;
        }
        @Override
        public void run() {
            //数据通信的处理
            try(ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())){
                RpcRequest request = (RpcRequest)inputStream.readObject();//获得客户端传来的数据,反序列化
                Object object = invoke(request);
                outputStream.writeObject(object);
                outputStream.flush();
            }catch (Exception e){
                e.printStackTrace();;
            }
        }
        private Object invoke(RpcRequest request) throws Exception {
            Class clazz = Class.forName(request.getClassName());
            Method method = clazz.getMethod(request.getMethodName(),request.getTypes());
            return method.invoke(service,request.getParams());
        }
    }
    
    

    RpcProxyServer类:

    public class RpcProxyServer {
        /**
         *
         * @param service  发布到具体的服务器
         * @param port  发布的端口号
         */
        public void publisher(Object service,int port){
        //BIO(阻塞IO)---当没有客户端连接过来时,accept会阻塞(当前进程),没有数据传输,IO也会阻塞,以至于一次只能处理一个IO,服务端的吞吐量会很低
            //NIO,多路复用,一个线程管理N个连接
            ExecutorService executorService = Executors.newFixedThreadPool(10);//创建线程池,防止IO阻塞,提高服务端并行处理的连接数
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(port);
                while(true) {//死循环,不断的去监听
                  Socket socket =  serverSocket.accept();//等待客户端连接
                    //----上面为建立通信,下面为数据传输----
                    executorService.execute(new ProcessHandlerThread(socket,service));//一个socket对象,代表一个客户端的连接
                    //socket.getInputStream();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    最后,新手小白一枚,望各位大佬点评

  • 相关阅读:
    在SQLite中使用索引优化查询速度
    SQLite支持的SQL数据操作
    left (outer) join , right (outer) join, full (outer) join, (inner) join, cross join 区别
    深入理解Android内存管理原理(六)
    Merge Sorted Array
    Sort Colors
    Construct Binary Tree from Preorder and Inorder Traversal
    Binary Tree Postorder Traversal
    Symmetric Tree
    Rotate Image
  • 原文地址:https://www.cnblogs.com/fqliu/p/14016498.html
Copyright © 2011-2022 走看看