zoukankan      html  css  js  c++  java
  • RPC框架学习+小Demo实例

    一、什么是RPC协议?

    全称:远程过程调度协议
    效果:使消费者向调用本地方法一样调用远程服务方法,对使用者透明
    目前常用:Dubbo、Thirft、Sofa....
    功能:

    • 建立远程通信(socket)TCP/UDP
    • 数据传递
    • 序列化和反序列化(XML/json/Protobuf/avro/kyro/hessian)

    流程图:
    在这里插入图片描述

    二、Demo思路

    1. 首先我们需要搭建两个项目,一个作为服务端提供服务,另一个作为客户端来调用服务端的接口方法。
    2. 因为客户端需要调用服务端的接口,所以我们需要客户端依赖这个通信服务所需的公共资源文件,所以服务端分为两部分,一部分为api资源,一部分为提供服务的方法。
    3. 为客户端与服务端建立连接,服务端等待请求,客户端等待响应结果
    4. 最终测试

    三、Demo流程

    1.首先建立两个maven项目,快速建立即可,一个命名为rpc-client作为客户端,另一个命名为rpc-server作为服务端。在rpc-server下建立两个子模块,一个命名为server-api,作为公共资源,另一个命名server-provider作为服务提供程序。
    在这里插入图片描述
    在这里插入图片描述
    2.在server下,新建一个测试接口,我这里命名为IHelloServer,让客户端调用此接口实现方法,该接口方法如下:
    在这里插入图片描述
    3.将api的依赖添加到rpc-client中,为了能让其调用公共资源。当然,服务提供模块也需要,所以也需要依赖进去。因为是本地小Demo测试,所以就直接Install本地,进行依赖添加了。
    在这里插入图片描述
    4.在server-provider中新建impl类,实现IhelloService接口,重写其中方法
    在这里插入图片描述
    5.这时,我们就可以在客户端使用接口对象了,但是因为是接口,我们无法对它进行实例化,那样就成为本地调用了,所以我们需要进行动态代理。
    在dubbo中,是使用注解进行动态代理,把信息注入到对象中:
    在这里插入图片描述
    所以我们需要写一个动态代理方法,来为对象进行实例化操作:

    常见的动态代理方式有很多,如:jdk、cglib、javassist、asm。。等
    (当然,作为新手小白,我还不太了解,需要后续学习)

    新建RpcClientProxy类,类中方法如下:(动态代理标注写法)
    在这里插入图片描述
    这里实现了InvocationHandler接口,其中的invoke方法,是在使用代理对象中的方法时,就会被调用
    然后我们在客户端mian方法中实例化动态代理类(指上面的类),执行其中的方法,将IhelloService动态代理,调用IHelloService的sayhello方法,控制台得到的结果是:test,因为我们返回的就是test,并且还没进行远程连接。
    6.之后需要进行网络间的通信了,我们新写一个类,来继承InvocationHandler接口,实现invoke方法,在方法中,主要工作有已下三步:

    1. 组装参数
    2. 序列化
    3. 进行网络传输

    7.我们需要对参数进行封装,这样方便进行参数的传输,而这个类,因为是公共资源,所以我们建立在server-api下:在这里插入图片描述
    实现序列化接口,因为要数据传输,写入set、get方法
    8.在客户端的invoke方法中,组装参数,封装到实体,等待传输过去就行了
    在这里插入图片描述
    新写通信类,用来向服务方发送请求参数,只需要写入服务地址和端口,把数据传输过去即可
    在这里插入图片描述
    9.新建发布类,写一个发布方法,当然位置在server-provider里,由于是小演示,所以用BIO即可

    BIO(阻塞IO)---当没有客户端连接过来时,accept会阻塞(当前进程),没有数据传输,IO也会阻塞,以至于一次只能处理一个IO,服务端的吞吐量会很低
    NIO,多路复用,一个线程管理N个连接

    在这里我们用socket进行通信,方法传入两个参,一个是服务器地址,一个是端口,死循环不断监听
    在这里插入图片描述
    10.由于BIO单线程吞吐量太低,所以我们用线程池进行优化,当我们获取到socket之后,放到线程里去执行,这样可以提高一些效率
    新建类,实现runable接口,在run方法中,进行反序列化,获取传来的参数,然后通过反射,去调用我们IHelloServiceImpl中的方法,再把得到的结果写回去,这样就完成了网络通信。
    在这里插入图片描述
    11.最后,provider模块主方法,我们新建实现类,新建发布类,设定端口号,运行即可。我们的客户端主方法,新建代理类,写入端口参数,调用方法,输出结果即可。
    在这里插入图片描述
    在这里插入图片描述
    最后控制台输出的结果:
    在这里插入图片描述
    通信成功!!!(后附代码)

    四、总结

    个人认为rpc框架通信的实现逻辑简单来说就是这个样子,只是不同的框架,针对某些特定的功能进行了加强,对某些操作进行了封装,使得使用者用起来更加方便。

    Demo总体流程如下:
    1.客户端与服务提供端依赖公共资源文件
    2.客户端需要对对象进行代理
    3.服务端与客户端进行远程通信
    4.客户端将参数发送给服务端,服务端读取参数,将结果返回

    五、代码

    rpc-client模块下:

    app类:

    public class App 
    {
        public static void main( String[] args )
        {
            RpcClientProxy rpcClientProxy = new RpcClientProxy();
            IHelloService iHelloService = rpcClientProxy.clientProxy(IHelloService.class,"localhost",8080);
            String rs = iHelloService.sayHello("test");
            System.out.println( rs);
        }
    }
    

    MyInvocationHandler类:

    public class MyInvocationHandler implements InvocationHandler {
        private String host;
        private int port;
        public MyInvocationHandler(String host, int port) {
            this.host = host;
            this.port = port;
        }
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            //开始执行代理,并且触发远程调用
            //1.组装参数
            //2.序列化
            //3.进行网络传输
            RpcRequest request = new RpcRequest();
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParams(args);
            request.setTypes(method.getParameterTypes());
            //下面开始进行数据传输
            RpcTransPort rpcTransPort = new RpcTransPort(host,port);
            return rpcTransPort.send(request);
        }
    }
    

    RpcClientProxy类:

    public class RpcClientProxy {
        //jdk、cglib、javassist、asm
       /* public <T>  T clientProxy(final Class<T> interfaceCls){
            return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls},
                    new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    //如果调用interfaceCls的方法。则会被动态代理执行到invole方法
                    return "test";
                }
            });*/
            public <T>  T clientProxy(final Class<T> interfaceCls,String service,int port){
            return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls},new MyInvocationHandler(service,port));
        }
    }
    

    RpcTransPort类:

    public class RpcTransPort {
        private String host;
        private int port;
    
        public RpcTransPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
        public Object send(RpcRequest rpcRequest){
            try(Socket socket = new Socket(host,port);
                ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())){
                outputStream.writeObject(rpcRequest);
                outputStream.flush();
                ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                return inputStream.readObject();
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //TODO close IO
            }
            return null;
        }
    }
    

    rpc-server-api模块下:

    IHelloService类:

    public interface IHelloService {
        String sayHello(String txt);
    }
    

    RpcRequest类:

    public class RpcRequest implements Serializable {
        private String className;
        private String methodName;
        private Object[] params;
        private Class[] types;
    
        public String getClassName() {
            return className;
        }
    
        public void setClassName(String className) {
            this.className = className;
        }
    
        public String getMethodName() {
            return methodName;
        }
    
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }
    
        public Object[] getParams() {
            return params;
        }
    
        public void setParams(Object[] params) {
            this.params = params;
        }
    
        public Class[] getTypes() {
            return types;
        }
    
        public void setTypes(Class[] types) {
            this.types = types;
        }
    }
    
    

    rpc-server-provider模块下:

    App类:

    public class App 
    {
        public static void main( String[] args )
        {
            IHelloService iHelloService = new IHelloServiceImpl();
            RpcProxyServer rpcProxyServer = new RpcProxyServer();
            rpcProxyServer.publisher(iHelloService,8080);
    
        }
    }
    

    IHelloServiceImpl类:

    public class IHelloServiceImpl implements IHelloService{
        @Override
        public String sayHello(String txt) {
            return "receive msg :"+txt;
        }
    }
    
    

    ProcessHandlerThread类:

    public class ProcessHandlerThread implements Runnable {
        Socket socket;
        Object service;
    
        public ProcessHandlerThread(Socket socket, Object service) {
            this.socket = socket;
            this.service = service;
        }
        @Override
        public void run() {
            //数据通信的处理
            try(ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())){
                RpcRequest request = (RpcRequest)inputStream.readObject();//获得客户端传来的数据,反序列化
                Object object = invoke(request);
                outputStream.writeObject(object);
                outputStream.flush();
            }catch (Exception e){
                e.printStackTrace();;
            }
        }
        private Object invoke(RpcRequest request) throws Exception {
            Class clazz = Class.forName(request.getClassName());
            Method method = clazz.getMethod(request.getMethodName(),request.getTypes());
            return method.invoke(service,request.getParams());
        }
    }
    
    

    RpcProxyServer类:

    public class RpcProxyServer {
        /**
         *
         * @param service  发布到具体的服务器
         * @param port  发布的端口号
         */
        public void publisher(Object service,int port){
        //BIO(阻塞IO)---当没有客户端连接过来时,accept会阻塞(当前进程),没有数据传输,IO也会阻塞,以至于一次只能处理一个IO,服务端的吞吐量会很低
            //NIO,多路复用,一个线程管理N个连接
            ExecutorService executorService = Executors.newFixedThreadPool(10);//创建线程池,防止IO阻塞,提高服务端并行处理的连接数
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(port);
                while(true) {//死循环,不断的去监听
                  Socket socket =  serverSocket.accept();//等待客户端连接
                    //----上面为建立通信,下面为数据传输----
                    executorService.execute(new ProcessHandlerThread(socket,service));//一个socket对象,代表一个客户端的连接
                    //socket.getInputStream();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    最后,新手小白一枚,望各位大佬点评

  • 相关阅读:
    iOS之蓝牙开发—CoreBluetooth详解
    iOS-GCD使用详解
    iOS—Mask属性的使用
    idea导入eclipse中的maven项目
    SQL Server 查找字符串中指定字符出现的次数
    lLinux的常用命令
    从excel表中生成批量SQL
    ORA-00911: invalid character 错误解决
    sqlserver sp_who2和inputbuffer的使用,连接数
    如果存在这个表,则删除这个表的sql
  • 原文地址:https://www.cnblogs.com/fqliu/p/14016498.html
Copyright © 2011-2022 走看看