zoukankan      html  css  js  c++  java
  • RPC框架原理剖析(含实例)(转)

    转自:http://blog.csdn.net/rulon147/article/details/53814589

    一、什么是RPC

    RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
    RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
    有多种 RPC模式和执行。最初由 Sun 公司提出。IETF ONC 宪章重新修订了 Sun 版本,使得 ONC RPC 协议成为 IETF 标准协议。现在使用最普遍的模式和执行是开放式软件基础的分布式计算环境(DCE)。

    二、图例说明


    三、java 实例演示

     

    1、实现技术方案

         下面使用比较原始的方案实现RPC框架,采用Socket通信、动态代理与反射与Java原生的序列化。

    2、RPC框架架构

         RPC架构分为三部分:

    1. 服务提供者,运行在服务器端,提供服务接口定义与服务实现类。
    2. 服务中心,运行在服务器端,负责将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用。
    3. 服务消费者,运行在客户端,通过远程代理对象调用远程服务。

    3、 具体实现

    1)服务提供者接口定义与实现,代码如下:

    [java] view plain copy
     
    1. package services;  
    2.   
    3.   
    4. public interface HelloService {  
    5.   
    6.   
    7.     String sayHi(String name);  
    8.   
    9.   
    10. }  


    2)HelloServices接口实现类:

    [java] view plain copy
     
    1. package services.impl;  
    2.   
    3.   
    4. import services.HelloService;  
    5.   
    6.   
    7. public class HelloServiceImpl implements HelloService {  
    8.   
    9.   
    10.     public String sayHi(String name) {  
    11.         return "Hi, " + name;  
    12.     }  
    13.   
    14.   
    15. }  



    3)服务中心代码实现,代码如下:

    [java] view plain copy
     
    1. package services;  
    2.   
    3.   
    4. import java.io.IOException;  
    5.   
    6.   
    7. public interface Server {  
    8.     public void stop();  
    9.   
    10.   
    11.     public void start() throws IOException;  
    12.   
    13.   
    14.     public void register(Class serviceInterface, Class impl);  
    15.   
    16.   
    17.     public boolean isRunning();  
    18.   
    19.   
    20.     public int getPort();  
    21. }  


    4)服务中心实现类:

    [java] view plain copy
     
    1. package services.impl;  
    2.   
    3.   
    4.   
    5.   
    6.   
    7.   
    8. import java.io.IOException;  
    9. import java.io.ObjectInputStream;  
    10. import java.io.ObjectOutputStream;  
    11. import java.lang.reflect.Method;  
    12. import java.net.InetSocketAddress;  
    13. import java.net.ServerSocket;  
    14. import java.net.Socket;  
    15. import java.util.HashMap;  
    16. import java.util.concurrent.ExecutorService;  
    17. import java.util.concurrent.Executors;  
    18.   
    19.   
    20. import services.Server;  
    21.   
    22.   
    23. public class ServiceCenter implements Server {  
    24.     private static ExecutorService              executor        = Executors.newFixedThreadPool(Runtime.getRuntime()  
    25.                                                                         .availableProcessors());  
    26.   
    27.   
    28.     private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();  
    29.   
    30.   
    31.     private static boolean                      isRunning       = false;  
    32.   
    33.   
    34.     private static int                          port;  
    35.   
    36.   
    37.     public ServiceCenter(int port) {  
    38.         this.port = port;  
    39.     }  
    40.   
    41.   
    42.     public void stop() {  
    43.         isRunning = false;  
    44.         executor.shutdown();  
    45.     }  
    46.   
    47.   
    48.     public void start() throws IOException {  
    49.         ServerSocket server = new ServerSocket();  
    50.         server.bind(new InetSocketAddress(port));  
    51.         System.out.println("start server");  
    52.         try {  
    53.             while (true) {  
    54.                 // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行  
    55.                 executor.execute(new ServiceTask(server.accept()));  
    56.             }  
    57.         } finally {  
    58.             server.close();  
    59.         }  
    60.     }  
    61.   
    62.   
    63.     public void register(Class serviceInterface, Class impl) {  
    64.         serviceRegistry.put(serviceInterface.getName(), impl);  
    65.     }  
    66.   
    67.   
    68.     public boolean isRunning() {  
    69.         return isRunning;  
    70.     }  
    71.   
    72.   
    73.     public int getPort() {  
    74.         return port;  
    75.     }  
    76.   
    77.   
    78.     private static class ServiceTask implements Runnable {  
    79.         Socket clent = null;  
    80.   
    81.   
    82.         public ServiceTask(Socket client) {  
    83.             this.clent = client;  
    84.         }  
    85.   
    86.   
    87.         public void run() {  
    88.             ObjectInputStream input = null;  
    89.             ObjectOutputStream output = null;  
    90.             try {  
    91.                 // 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果  
    92.                 input = new ObjectInputStream(clent.getInputStream());  
    93.                 String serviceName = input.readUTF();  
    94.                 String methodName = input.readUTF();  
    95.                 Class<?>[] parameterTypes = (Class<?>[]) input.readObject();  
    96.                 Object[] arguments = (Object[]) input.readObject();  
    97.                 Class serviceClass = serviceRegistry.get(serviceName);  
    98.                 if (serviceClass == null) {  
    99.                     throw new ClassNotFoundException(serviceName + " not found");  
    100.                 }  
    101.                 Method method = serviceClass.getMethod(methodName, parameterTypes);  
    102.                 Object result = method.invoke(serviceClass.newInstance(), arguments);  
    103.   
    104.   
    105.                 // 3.将执行结果反序列化,通过socket发送给客户端  
    106.                 output = new ObjectOutputStream(clent.getOutputStream());  
    107.                 output.writeObject(result);  
    108.             } catch (Exception e) {  
    109.                 e.printStackTrace();  
    110.             } finally {  
    111.                 if (output != null) {  
    112.                     try {  
    113.                         output.close();  
    114.                     } catch (IOException e) {  
    115.                         e.printStackTrace();  
    116.                     }  
    117.                 }  
    118.                 if (input != null) {  
    119.                     try {  
    120.                         input.close();  
    121.                     } catch (IOException e) {  
    122.                         e.printStackTrace();  
    123.                     }  
    124.                 }  
    125.                 if (clent != null) {  
    126.                     try {  
    127.                         clent.close();  
    128.                     } catch (IOException e) {  
    129.                         e.printStackTrace();  
    130.                     }  
    131.                 }  
    132.             }  
    133.   
    134.   
    135.         }  
    136.     }  
    137.   
    138.   
    139.   
    140.   
    141.   
    142.   
    143. }  



    5)客户端的远程代理对象:

    [java] view plain copy
     
    1. package client;  
    2.   
    3.   
    4. import java.io.ObjectInputStream;  
    5. import java.io.ObjectOutputStream;  
    6. import java.lang.reflect.InvocationHandler;  
    7. import java.lang.reflect.Proxy;  
    8. import java.net.InetSocketAddress;  
    9. import java.net.Socket;  
    10. import java.lang.reflect.Method;  
    11.   
    12.   
    13. public class RPCClient<T> {  
    14.     @SuppressWarnings("unchecked")  
    15.     public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {  
    16.         // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用  
    17.         return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface },  
    18.                 new InvocationHandler() {  
    19.                     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
    20.                         Socket socket = null;  
    21.                         ObjectOutputStream output = null;  
    22.                         ObjectInputStream input = null;  
    23.                         try {  
    24.                             // 2.创建Socket客户端,根据指定地址连接远程服务提供者  
    25.                             socket = new Socket();  
    26.                             socket.connect(addr);  
    27.   
    28.   
    29.                             // 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者  
    30.                             output = new ObjectOutputStream(socket.getOutputStream());  
    31.                             output.writeUTF(serviceInterface.getName());  
    32.                             output.writeUTF(method.getName());  
    33.                             output.writeObject(method.getParameterTypes());  
    34.                             output.writeObject(args);  
    35.   
    36.   
    37.                             // 4.同步阻塞等待服务器返回应答,获取应答后返回  
    38.                             input = new ObjectInputStream(socket.getInputStream());  
    39.                             return input.readObject();  
    40.                         } finally {  
    41.                             if (socket != null)  
    42.                                 socket.close();  
    43.                             if (output != null)  
    44.                                 output.close();  
    45.                             if (input != null)  
    46.                                 input.close();  
    47.                         }  
    48.                     }  
    49.                 });  
    50.     }  
    51. }  



    6)最后为测试类:

    [java] view plain copy
     
    1. package client;  
    2.   
    3.   
    4. import java.io.IOException;  
    5. import java.net.InetSocketAddress;  
    6.   
    7.   
    8. import services.HelloService;  
    9. import services.Server;  
    10. import services.impl.HelloServiceImpl;  
    11. import services.impl.ServiceCenter;  
    12.   
    13.   
    14. public class RPCTest {  
    15.     public static void main(String[] args) throws IOException {  
    16.         new Thread(new Runnable() {  
    17.             public void run() {  
    18.                 try {  
    19.                     Server serviceServer = new ServiceCenter(8088);  
    20.                     serviceServer.register(HelloService.class, HelloServiceImpl.class);  
    21.                     serviceServer.start();  
    22.                 } catch (IOException e) {  
    23.                     e.printStackTrace();  
    24.                 }  
    25.             }  
    26.         }).start();  
    27.         HelloService service = RPCClient  
    28.                 .getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8088));  
    29.         System.out.println(service.sayHi("test"));  
    30.     }  
    31.   
    32.   
    33. }  



     

    运行结果:

    [java] view plain copy
     
    1. regeist service HelloService  
    2. start server  
    3. Hi, test  
  • 相关阅读:
    复制文件-用FileOutputStream和FileInputStream读写文件
    向属性文件中添加属性
    合并多个文件的内容
    EVA4400存储RAID信息丢失数据恢复过程
    DELL Eq PS4000数据恢复成功案例
    华为OceanStor S5600T服务器数据恢复
    IBM ds4700崩溃重组raid及修复数据库
    分析IBM AIX存储层结构 / 常用存储命令整理
    riad5阵列崩溃,恢复数据过程
    如何使用Handy Backup 6.2进行数据备份(步骤阅读)
  • 原文地址:https://www.cnblogs.com/hadoop-dev/p/7656976.html
Copyright © 2011-2022 走看看