zoukankan      html  css  js  c++  java
  • RPC笔记之初探RPC:DIY简单RPC框架

    一、什么是RPC

    RPC(Remote Procedure Call)即远程过程调用,简单的说就是在A机器上去调用B机器上的某个方法,在分布式系统中极其常用。

    rpc原理其实很简单,比较容易理解,在rpc中有两个角色,rpc server和rpc client,先从client开始讨论,因为client要像调用普通类的方法那样调用server端的方法,比如client要调用server端的Foo#bar()方法,所以它必须首先获取到Foo的实例,但是又不能直接new,因为直接new的话还是相当于在本地调用,所以这个时候就必须有个什么机制能够把Foo包一下,使得表面上看起来和Foo完全一样但是调用它的bar方法时底层替换为去调用server的bar,这个机制就是代理,代理提供了一种类似于拦截的机制,可以把调用bar方法替换成为自己的实现,比如本地调用bar方法大致执行过程(粗糙概括):

    execute bar()
    return result

    代理替换之后的bar方法(粗糙概括):

    call rpc server execute bar()
    get result from server
    return result

    第一步的call rpc server execute bar(),client如何告诉server自己要调用哪个方法呢,这个方法就比较多了,比较常见的是约定一种协议,比如第几个字节是表示的嘛意思,然后server接收后解析按照指令执行就可以了,这样网络传输的数据比较少,或者不太讲究的直接将现成的协议拿过来用,比如通过socket直接传json、传xml、传对象流等等,再或者甚至用http请求的,反正能够把自己要调用哪个方法告诉server,同时还有调用方法时需要传递的参数,然后等待server执行完获取到其结果就可以了。

    然后就是server端的处理,如果是使用socket传输数据的话,server应该启动一个服务监听在约定的端口(不约定好的话客户端不知道去连谁啊),一个while循环不断地等待客户端的连接,每来一个客户端就启动一个新的线程去处理(此处没有考虑高并发情况下的负载和优化,只是基本的实现),在新线程中读取socket流看客户端要调用哪个方法,然后调用本地的此方法,调用的时候将client传过来的参数传入进去,待方法执行完再传回给client,传回的方法和client传数据过来相同,无非是走socket自定义协议、xml、json、http等等,再然后client读取到结果返回,一次rpc调用就完成了。至此,一个简单的rpc框架的雏形已经完成。

    二、DIY简单RPC框架

    这一章节基于上面讨论的rpc调用的过程,实现一个简单的rpc框架,其中代理使用JDK提供的代理实现,传输层使用Java的ObjectInputStream和ObjectOutputStream实现。

    定义一个工具类,提供两个方法,分别用于服务端启动rpc server和客户端获取相关serviceProvider的代理对象。

    RpcServiceProviderUtil.java:

    package cc11001100.diySimpleRpcFramework.util;
    
    import java.io.Closeable;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    import static cc11001100.diySimpleRpcFramework.util.RpcLogUtil.info;
    
    /**
     * @author CC11001100
     */
    public class RpcServiceProviderUtil {
    
    	/**
    	 * 在指定的端口上启动rpc服务,用于server端启动rpc服务
    	 *
    	 * @param port
    	 * @throws IOException
    	 */
    	public static <T> void startService(T object, int port) throws IOException {
    		ServerSocket serverSocket = new ServerSocket(port);
    		while (true) {
    			final Socket socket = serverSocket.accept();
    			info(socket, "start");
    			new Thread(() -> {
    				ObjectInputStream ois = null;
    				ObjectOutputStream oos = null;
    				try {
    					// 从输入流中读取要调用的方法名和调用时传入的参数
    					ois = new ObjectInputStream(socket.getInputStream());
    					String methodName = ois.readUTF();
    					Class[] parameterTypes = (Class[]) ois.readObject();
    					Object[] parameterValues = (Object[]) ois.readObject();
    					Method method = object.getClass().getMethod(methodName, parameterTypes);
    
    					// 调用方法执行
    					info(socket, "begin invoke method ", methodName);
    					long start = System.currentTimeMillis();
    					Object invoke = method.invoke(object, parameterValues);
    					long cost = System.currentTimeMillis() - start;
    					info(socket, "exec method ", methodName, " done, cost=", cost, "ms");
    
    					// 将执行结果传回调用端
    					oos = new ObjectOutputStream(socket.getOutputStream());
    					oos.writeObject(invoke);
    				} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
    					// 如果发生了异常,反馈给调用端
    					try {
    						info(socket, "exec exception, e=", e.getClass(), ", cause=", e.getCause());
    						oos = new ObjectOutputStream(socket.getOutputStream());
    						oos.writeObject(e);
    					} catch (IOException e1) {
    						e1.printStackTrace();
    					}
    				} catch (IOException e) {
    					e.printStackTrace();
    				} finally {
    					close(ois);
    					close(oos);
    					close(socket);
    				}
    				info(socket, "end");
    			}).start();
    		}
    	}
    
    	private static void close(Closeable closeable) {
    		if (closeable != null) {
    			try {
    				closeable.close();
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    	/**
    	 * 用于客户端获取rpc的endpoint
    	 *
    	 * @param clazz
    	 * @param remoteHost
    	 * @param remotePort
    	 * @param <T>
    	 * @return
    	 */
    	public static <T> T wrap(Class<T> clazz, String remoteHost, int remotePort) {
    		return (T) Proxy.newProxyInstance(clazz.getClassLoader(), clazz.getInterfaces(), (proxy, method, args) -> {
    			// 获取要调用的方法的名字
    			String methodName = method.getName();
    			Class<?>[] parameterTypes = method.getParameterTypes();
    
    			// 调用rpc server端去执行
    			Socket socket = new Socket(remoteHost, remotePort);
    			ObjectOutputStream oos = null;
    			ObjectInputStream ois = null;
    			Object result = null;
    			try {
    				oos = new ObjectOutputStream(socket.getOutputStream());
    				oos.writeUTF(methodName);
    				oos.writeObject(parameterTypes);
    				oos.writeObject(args);
    
    				// 读取rpc server端执行结果
    				ois = new ObjectInputStream(socket.getInputStream());
    				result = ois.readObject();
    
    				// 此处不catch,执行时出了异常尽管抛出
    			} finally {
    				close(ois);
    				close(oos);
    				close(socket);
    			}
    
    			// 检测server端执行是否抛了异常
    			if (result != null && result instanceof Throwable) {
    				throw (Throwable) result;
    			}
    
    			return result;
    		});
    	}
    
    }
    

    RpcLogUtil.java:

    package cc11001100.diySimpleRpcFramework.util;
    
    import java.net.Socket;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    /**
     * @author CC11001100
     */
    public class RpcLogUtil {
    
    	private static String now() {
    		return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
    	}
    
    	public static void info(Socket socket, Object... messages) {
    		String remoteAddress = socket.getRemoteSocketAddress().toString();
    		StringBuilder sb = new StringBuilder();
    		sb.append("[").append(now()).append("]")
    				.append(" - ").append(remoteAddress).append(":").append(" - ");
    		for (Object msg : messages) {
    			sb.append(msg.toString());
    		}
    		System.out.println(sb.toString());
    	}
    
    }
    

    因为代理是使用JDK提供的代理机制实现的,这种代理方式要求必须要定义一个接口然后实现它,所以首先定义一个接口:

    package cc11001100.diySimpleRpcFramework.rpcServiceProvider;
    
    public interface FooRpcServiceProvider {
    
    	int add(int a, int b);
    
    }
    

    然后实现它:

    package cc11001100.diySimpleRpcFramework.rpcServiceProvider;
    
    /**
     * @author CC11001100
     */
    public class FooRpcServiceProviderImpl implements FooRpcServiceProvider {
    
    	@Override
    	public int add(int a, int b) {
    		return a + b;
    	}
    
    }

    测试一下此RPC server是否可用,先启动一个rpc server:

    package cc11001100.diySimpleRpcFramework.test;
    
    import cc11001100.diySimpleRpcFramework.rpcServiceProvider.FooRpcServiceProviderImpl;
    import cc11001100.diySimpleRpcFramework.util.RpcServiceProviderUtil;
    
    import java.io.IOException;
    
    /**
     * 启动rpc server端
     *
     * @author CC11001100
     */
    public class FooRpcServiceProviderServerTest {
    
    	public static void main(String[] args) throws IOException {
    
    		RpcServiceProviderUtil.startService(new FooRpcServiceProviderImpl(), 10086);
    
    	}
    
    }
    

    然后启动client去调用server:

    package cc11001100.diySimpleRpcFramework.test;
    
    import cc11001100.diySimpleRpcFramework.rpcServiceProvider.FooRpcServiceProvider;
    import cc11001100.diySimpleRpcFramework.rpcServiceProvider.FooRpcServiceProviderImpl;
    import cc11001100.diySimpleRpcFramework.util.RpcServiceProviderUtil;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    /**
     * rpc客户端调用
     *
     * @author CC11001100
     */
    public class FooRpcServiceProviderClientTest {
    
    	public static void main(String[] args) throws InterruptedException {
    
    		FooRpcServiceProvider foo = RpcServiceProviderUtil.wrap(FooRpcServiceProviderImpl.class, "localhost", 10086);
    		Random random = new Random();
    		while (true) {
    			int a = random.nextInt(10);
    			int b = random.nextInt(10);
    			int result = foo.add(a, b);
    			System.out.printf("%d + %d = %d
    ", a, b, result);
    			TimeUnit.MILLISECONDS.sleep(random.nextInt(900) + 100);
    		}
    
    	}
    
    }
    

    控制台输出:

    image image

    我写的rpc server精通10以内加法,这点client可以作证。

    .

  • 相关阅读:
    数据类型(列类型-小数型)
    Python全栈开发之2、数据类型-数值、字符串、列表、字典、元组和文件处理
    Python全栈开发之3、深浅拷贝、变量和函数、递归、函数式编程、内置函数
    Python全栈开发之1、python基础初识介绍以及安装
    数据类型(列类型)
    中文数据问题
    SQL--数据--基本操作
    SQL--数据表--基本操作
    SQL--数据库--基本操作
    Mysql数据库
  • 原文地址:https://www.cnblogs.com/cc11001100/p/10273535.html
Copyright © 2011-2022 走看看