zoukankan      html  css  js  c++  java
  • java实现RPC

    一,服务提供者

    工程为battercake-provider,项目结构图如下图所示

    1.1 先创建一个“卖煎饼”微服务的接口和实现类

    package com.jp.service;
    
    public interface BatterCakeService {
        /**
         * 卖煎饼的服务
         */
        public String sellBatterCake(String name);
    }
    package com.jp.service;
    
    import com.jp.service.BatterCakeService;
    
    /**
     * 卖煎饼服务的实现类
     *
     */
    public class BatterCakeServiceImpl implements BatterCakeService {
    
        public String sellBatterCake(String name) {
            return name+"煎饼,卖的特别好";
        }    
    }

    1.2 RPC框架调用部分

    该部分有两个关键部分:RPC服务提供器线程处理类

    1)RPC服务提供器

    1. 需要发布的服务存储在一个内存变量serviceList中。(该例就是把卖煎饼服务的实例对象传入
    2. 启动socket,server.accept()方法阻塞在那,监听输入
    3. 针对每一个请求,单独启动一个线程处理
     1 package com.jp.rpc;
     2 
     3 import java.net.ServerSocket;
     4 import java.net.Socket;
     5 import java.util.ArrayList;
     6 import java.util.Arrays;
     7 import java.util.List;
     8 /**
     9  * RPC服务提供器
    10  * 1,将需要发布的服务存储在一个内存变量serviceList中
    11  * 2,启动socket,server.accept()方法阻塞在那,监听输入
    12  * 3,针对每一个请求,单独启动一个线程处理
    13  */
    14 public class RpcProvider {
    15     
    16     //存储注册的服务列表
    17     private static List<Object> serviceList;
    18     
    19     /**
    20      * 发布rpc服务
    21      * @param object 提供(卖煎饼)服务的实例对象
    22      * @param port 监听的端口
    23      * @throws Exception
    24      */
    25     public static void export(int port,Object... services) throws Exception {
    26         serviceList=Arrays.asList(services);
    27         ServerSocket server = new ServerSocket(port);
    28         Socket client = null;
    29         while (true) {
    30             //阻塞等待输入,每来一个请求就会产生一个socket对象
    31             client = server.accept();
    32             //每一个请求,启动一个线程处理
    33             new Thread(new ServerThread(client,serviceList)).start();
    34         }
    35     }
    36 }

    2)线程处理类

    ServerThread(socke对象服务实例列表)线程处理类的代码,ServerThread主要做以下几个步骤

    1. 读取客户端发送的服务名
    2. 判断服务是否发布
    3. 如果发布,则走反射逻辑,动态调用,返回结果
    4. 如果未发布,则返回提示通知
     1 package com.jp.rpc;
     2 
     3 import java.io.IOException;
     4 import java.io.ObjectInputStream;
     5 import java.io.ObjectOutputStream;
     6 import java.lang.reflect.Method;
     7 import java.net.Socket;
     8 import java.util.List;
     9 
    10 public class ServerThread implements Runnable {
    11 
    12     private Socket client = null;
    13 
    14     private List<Object> serviceList = null;
    15 
    16     public ServerThread(Socket client, List<Object> service) {
    17         this.client = client;
    18         this.serviceList = service;
    19     }
    20 
    21     //@Override
    22     public void run() {
    23         ObjectInputStream input = null;
    24         ObjectOutputStream output = null;
    25         try {
    26             input = new ObjectInputStream(client.getInputStream());
    27             output = new ObjectOutputStream(client.getOutputStream());
    28             // 读取客户端要访问那个service
    29             Class serviceClass = (Class) input.readObject();
    30             // 找到该服务类实例
    31             Object obj = findService(serviceClass);
    32             if (obj == null) {
    33                 output.writeObject(serviceClass.getName() + "服务未发现");
    34             } else {
    35                 //利用反射调用该方法,返回结果
    36                 //从请求中得到请求的方法名和方法参数;加上上面得到了服务对象实例;反射得到具体的方法实例;invoke执行
    37                 try {
    38                     String methodName = input.readUTF();
    39                     Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
    40                     Object[] arguments = (Object[]) input.readObject();
    41                     
    42                     Method method = obj.getClass().getMethod(methodName, parameterTypes);  
    43                     Object result = method.invoke(obj, arguments);  
    44                     output.writeObject(result); 
    45                 } catch (Throwable t) {
    46                     output.writeObject(t);
    47                 }
    48             }
    49         } catch (Exception e) {
    50             e.printStackTrace();
    51         } finally {
    52             try {
    53                 client.close();
    54                 input.close();
    55                 output.close();
    56             } catch (IOException e) {
    57                 // TODO Auto-generated catch block
    58                 e.printStackTrace();
    59             }
    60         }
    61 
    62     }
    63 
    64     //到服务列表中找服务实例
    65     private Object findService(Class serviceClass) {
    66         for (Object obj : serviceList) {
    67             boolean isFather = serviceClass.isAssignableFrom(obj.getClass());
    68             if (isFather) {
    69                 return obj;
    70             }
    71         }
    72         return null;
    73     }
    74 
    75 }

    1.3 发布服务

     1 package com.jp.start;
     2 
     3 import com.jp.rpc.RpcProvider;
     4 import com.jp.service.BatterCakeService;
     5 import com.jp.service.BatterCakeServiceImpl;
     6 
     7 public class RpcBootStrap {
     8     public static void main(String[] args) throws Exception {
     9         //实例化“卖煎饼”这个服务的实现类
    10         BatterCakeService batterCakeService =new BatterCakeServiceImpl();
    11         //发布卖煎饼的服务:注册在20006端口,并把提供服务的实例传入
    12         RpcProvider.export(20006,batterCakeService);
    13     }
    14 }

    二,服务消费者

    消费者工程为battercake-consumer,项目结构图如下图所示

    2.1 rpc调用部分

    分为两部分:代理类处理器(代理类工厂)和 service的代理类对象(即前面工厂生产返回的)

    1)代理类处理器(代理类工厂)

    负责生产代理类(传入服务的名字(类?);ip;端口

     1 package com.jp.rpc;
     2 
     3 import java.lang.reflect.Proxy;
     4 
     5 /**
     6  * 用于生产服务代理类
     7  */
     8 public class RpcConsumer {
     9     public static <T> T getService(Class<T> clazz,String ip,int port) {
    10         ProxyHandler proxyHandler =new ProxyHandler(ip,port);
    11         return (T)Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler);
    12     }
    13 }

    2)服务代理类的处理器该类就是代理类功能的具体实现者,其实就是封装了调用远程服务的过程(封装请求数据发给远端服务提供者,把提供者返回的结果返回

    1. 建立socket连接
    2. 封装请求数据,发送给服务提供者
    3. 返回结果
     1 package com.jp.rpc;
     2 
     3 import java.io.ObjectInputStream;
     4 import java.io.ObjectOutputStream;
     5 import java.lang.reflect.InvocationHandler;
     6 import java.lang.reflect.Method;
     7 import java.net.Socket;
     8 
     9 public class ProxyHandler implements InvocationHandler {
    10     
    11     private String ip;
    12     private int port;
    13 
    14     public ProxyHandler(String ip, int port) {
    15         this.ip = ip;
    16         this.port = port;
    17     }
    18 
    19     //@Override
    20     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    21         Socket socket = new Socket(this.ip, this.port);
    22         ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
    23         ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
    24         try {
    25             output.writeObject(proxy.getClass().getInterfaces()[0]);
    26             output.writeUTF(method.getName());
    27             output.writeObject(method.getParameterTypes());
    28             output.writeObject(args);
    29             output.flush();
    30             Object result = input.readObject();
    31             if (result instanceof Throwable) {
    32                 throw (Throwable) result;
    33             }
    34             return result;
    35         } finally {
    36             socket.shutdownOutput();
    37         }
    38     }
    39 
    40 }

    2.2 接下来建立一个测试类RpcTest如下

    (跑该测试类前,记得运行在battercake-provider端的RpcBootstrap类发布BatterCakeService服务)

     1 package com.jp.start;
     2 
     3 import com.jp.rpc.RpcConsumer;
     4 import com.jp.service.BatterCakeService;
     5 
     6 public class RpcTest {
     7     public static void main(String[] args) {
     8         //生成代理类,三个参数:被代理对象,ip,端口
     9         BatterCakeService batterCakeService = RpcConsumer.getService(BatterCakeService.class, "127.0.0.1", 20006);
    10         //调用代理类的方法并获得结果
    11         String result = batterCakeService.sellBatterCake("双蛋");
    12         System.out.println(result);
    13     }
    14 }

    输出结果如下

     

    https://blog.csdn.net/wangyunpeng0319/article/details/78651998

    https://www.cnblogs.com/rjzheng/category/1205773.html

  • 相关阅读:
    Django连接SQL Server,安装相关扩展包及相关配置
    安装pipenv
    报错及网络上的解决方案
    Nucleus-SE迁移:未实现的设施和兼容性
    RTOS诊断和错误检查
    Nucleus SE RTOS初始化和启动
    Nucleus 实时操作系统中断(下)
    Nucleus 实时操作系统中断(上)
    系统时间
    应用程序计时器
  • 原文地址:https://www.cnblogs.com/xdyixia/p/9227738.html
Copyright © 2011-2022 走看看