RpcFramework

1 /* 2 * Copyright 2011 Alibaba.com All right reserved. This software is the 3 * confidential and proprietary information of Alibaba.com ("Confidential 4 * Information"). You shall not disclose such Confidential Information and shall 5 * use it only in accordance with the terms of the license agreement you entered 6 * into with Alibaba.com. 7 */ 8 import java.io.ObjectInputStream; 9 import java.io.ObjectOutputStream; 10 import java.lang.reflect.InvocationHandler; 11 import java.lang.reflect.Method; 12 import java.lang.reflect.Proxy; 13 import java.net.ServerSocket; 14 import java.net.Socket; 15 16 /** 17 * RpcFramework 18 * 19 * @author william.liangf 20 */ 21 public class RpcFramework { 22 23 /** 24 * 暴露服务 25 * 26 * @param service 服务实现 27 * @param port 服务端口 28 * @throws Exception 29 */ 30 public static void export(final Object service, int port) throws Exception { 31 if (service == null) 32 throw new IllegalArgumentException("service instance == null"); 33 if (port <= 0 || port > 65535) 34 throw new IllegalArgumentException("Invalid port " + port); 35 System.out.println("Export service " + service.getClass().getName() + " on port " + port); 36 ServerSocket server = new ServerSocket(port); 37 for(;;) { 38 try { 39 final Socket socket = server.accept(); 40 new Thread(new Runnable() { 41 @Override 42 public void run() { 43 try { 44 try { 45 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); 46 try { 47 String methodName = input.readUTF(); 48 Class<?>[] parameterTypes = (Class<?>[])input.readObject(); 49 Object[] arguments = (Object[])input.readObject(); 50 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); 51 try { 52 Method method = service.getClass().getMethod(methodName, parameterTypes); 53 Object result = method.invoke(service, arguments); 54 output.writeObject(result); 55 } catch (Throwable t) { 56 output.writeObject(t); 57 } finally { 58 output.close(); 59 } 60 } finally { 61 input.close(); 62 } 63 } finally { 64 socket.close(); 65 } 66 } catch (Exception e) { 67 e.printStackTrace(); 68 } 69 } 70 }).start(); 71 } catch (Exception e) { 72 e.printStackTrace(); 73 } 74 } 75 } 76 77 /** 78 * 引用服务 79 * 80 * @param <T> 接口泛型 81 * @param interfaceClass 接口类型 82 * @param host 服务器主机名 83 * @param port 服务器端口 84 * @return 远程服务 85 * @throws Exception 86 */ 87 @SuppressWarnings("unchecked") 88 public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { 89 if (interfaceClass == null) 90 throw new IllegalArgumentException("Interface class == null"); 91 if (! interfaceClass.isInterface()) 92 throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!"); 93 if (host == null || host.length() == 0) 94 throw new IllegalArgumentException("Host == null!"); 95 if (port <= 0 || port > 65535) 96 throw new IllegalArgumentException("Invalid port " + port); 97 System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port); 98 return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() { 99 public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { 100 Socket socket = new Socket(host, port); 101 try { 102 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); 103 try { 104 output.writeUTF(method.getName()); 105 output.writeObject(method.getParameterTypes()); 106 output.writeObject(arguments); 107 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); 108 try { 109 Object result = input.readObject(); 110 if (result instanceof Throwable) { 111 throw (Throwable) result; 112 } 113 return result; 114 } finally { 115 input.close(); 116 } 117 } finally { 118 output.close(); 119 } 120 } finally { 121 socket.close(); 122 } 123 } 124 }); 125 } 126 127 }
RpcProvider

1 public class RpcProvider { 2 public static void main(String[] args) throws Exception { 3 HelloService service = new HelloServiceImpl(); 4 RpcFramework.export(service, 1234); 5 } 6 }
HelloService

1 public interface HelloService { 2 String hello(String name); 3 }
HelloServiceImpl

1 /* 2 * Copyright 2011 Alibaba.com All right reserved. This software is the 3 * confidential and proprietary information of Alibaba.com ("Confidential 4 * Information"). You shall not disclose such Confidential Information and shall 5 * use it only in accordance with the terms of the license agreement you entered 6 * into with Alibaba.com. 7 */ 8 /** 9 * HelloServiceImpl 10 * 11 * @author william.liangf 12 */ 13 public class HelloServiceImpl implements HelloService { 14 15 public String hello(String name) { 16 return "Hello " + name; 17 } 18 19 }
RpcConsumer

1 /* 2 * Copyright 2011 Alibaba.com All right reserved. This software is the 3 * confidential and proprietary information of Alibaba.com ("Confidential 4 * Information"). You shall not disclose such Confidential Information and shall 5 * use it only in accordance with the terms of the license agreement you entered 6 * into with Alibaba.com. 7 */ 8 9 /** 10 * RpcConsumer 11 * 12 * @author william.liangf 13 */ 14 public class RpcConsumer { 15 16 public static void main(String[] args) throws Exception { 17 HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234); 18 for (int i = 0; i < Integer.MAX_VALUE; i ++) { 19 String hello = service.hello("World" + i); 20 System.out.println(hello); 21 Thread.sleep(1000); 22 } 23 } 24 25 }
实际上,RPC的原理其实很简单:
服务器启动了一个线程监听 Socket 端口,
有Socket访问了, 反序列化解析出
调用哪个Service 哪个 方法, 以及传入的 参数,
再用Socket 写回去.
客户端 利用 Jdk 的Proxy 生成了一个代理类,
在创建 Proxy 时建立与服务器的Socket连接.
调用 Proxy 的方法时, 向服务器发送数据, 等待结果返回.