本地调用与透明远程调用的比较:
当需要调用另外一台机器上的接口时,期望与本地调用类似。而将远程协议封装到底层去。
本地调用 Wrok work = applicationContext.getBean(Work.class)
work.exec();
远程调用
Wrok work =ServiceFactory.createService(Work.class, String toAddr)
work.exec();
因为远程调用需要涉及到给出的地址,故需要增加一些有关寻址的参数。但原始业务类与方法不需要增加任何注解或者配置。只要业务方法的参数保证能够进行序列化即可。
远程调用的实现:
public <T> T createService(Class<T> type, String toAddr){ // 创建一个回调对象,远程调用逻辑在内实现 RpcInvocationHandler rpcInvocationHandler = new RpcInvocationHandler(); // 将远程调用参数,传递到远程代理实现类中 rpcInvocationHandler.setAddr(toAddr); // 使用java动态代理,实现业务对象的代理 return (T) Proxy.newProxyInstance(ServiceFactory.class.getClassLoader(), new Class[]{type}, rpcInvocationHandler); }
动态代理的实现:
public class RpcInvocationHandler implements InvocationHandler { // 内置消息中心,用于真正发送消息 private MessageCenter messageCenter; // 目标地址 private String toAddr; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 建议进行一次包装,而不要直接将参数发送 RpcRequestMessage rpcRequestMessage = new RpcRequestMessage(); RpcRequest request = new RpcRequest(); request.setId(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getName());
// 这里用asm import org.objectweb.asm.Type获得方法信息 request.setMethodDesc(method.getName() + Type.getMethodDescriptor(method)); request.setArgs(args); rpcRequestMessage.setRequest(request); // 如果是同步消息,则要建立等待池 rpcMessagePairPool.addRequest(request); // 消息的具体发送交给消息中心,可考虑各种序列化 messageCenter.send(rpcRequestMessage); // 根据消息ID进行响应。 RpcResponse response = rpcMessagePairPool.loadResponse(request); if (response.getException() != null) { throw response.getException(); } return response.getResult(); } }
消息中心可使用各种序列化将消息发送到对端,例如使用kafka消息封装,http-json等。然后再进行反序列化通知到对端的应用中。
public void onMessage(Message message) { try { RpcRequestMessage requestMessage = (RpcRequestMessage) message; RpcRequest request = requestMessage.getRequest(); // 根据请求信息,还原出类型 Class<?> type = Class.forName(request.getClassName()); // 根据类型获取到实例 Object target = applicationContext.getBean(type); // 获取需要调用的方法 Method currentMethod = findMethod(type, request.getMethodDesc()); // 进行方法调用,并获得返回值 if (currentMethod != null) { try { Object result = currentMethod.invoke(target, request.getArgs()); rpcResponse.setResult(result); } catch (Exception e) { rpcResponse.setException(e); } } else { rpcResponse.setException(new RuntimeException("no such method")); } RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setId(request.getId()); RpcResponseMessage rpcResponseMessage = new RpcResponseMessage(); rpcResponseMessage.setToAddr(requestMessage.getFrom()); rpcResponseMessage.setResponse(rpcResponse); // 将结果重新发送回去 messageCenter.send(rpcResponseMessage); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }
因方法存在重名的情况,所以要获取方法的详细描述,才能识别出目标对象的正确方法
protected Method findMethod(Class<?> type, String methodDesc) throws Exception { String key = type.getName() + "#" + methodDesc; Method method = methodMap.get(key); if (method == null) { synchronized (methodMap) { method = methodMap.get(key); if (method == null) { Method[] declaredMethods = type.getDeclaredMethods(); for (Method method2 : declaredMethods) { methodMap.put(type.getName() + "#" + method2.getName() + Type.getMethodDescriptor(method2), method2); } } method = methodMap.get(key); } } return method; }