在上一节中,由于篇幅原因,对invoker的介绍会在本章节详细阐述。
生成消费者调用invoker
将代码点到ReferenceConfig#createProxy -> invoker = refprotocol.refer(interfaceClass, url);
这边还不得不提一下refprotocol到底取到的是哪个扩展点呢?
根据ExtensionLoader和wrapper的关系,加载到的结果是这样的:
实际上调用了 ProtocolListenerWrapper.refer -> ProtocolFilterWrapper.refer -> registerProtocal#refer -> RegistryProtocol#doRefer
ProtocolListenerWrapper.refer 执行一些监听器的referred动作 ,传入的参数是 registerProtocal#refer 获得的invoker。返回的是ListenerInvokerWrapper其实现了Invoker<T>接口。
ProtocolFilterWrapper.refer
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); }
buildInvokerChain这个方法的作用就是 处理一系列的过滤器,每一个过滤器都有一个invoker处理,而ProtocolListenerWrapper.refer 得到的invoker,作为最后一个要处理的invoker。关于过滤器将在后续介绍。
RegistryProtocol#doRefer
/** * * @param cluster FailoverCluster * @param registry zookeeperRegister * @param type DemoService.class * @param url zookeeper://localhost:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-consumer&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=35673×tamp=1640055558556 * @param <T> * @return */ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); // 去注册消费者 registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); /** * 为消息消费者添加category=providers,configurators,routers属性后,然后向注册中心订阅该URL, * 关注该服务下的providers,configurators,routers发生变化时通知RegistryDirectory, * 以便及时发现服务提供者、配置、路由规则的变化 */ directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); // 根据Directory,利用集群策略返回集群Invoker。cluster$Adaptive -> FailoverClusterInvoker Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
服务的注册和发现跟RegistryDirectory相关联,包括获取对应的invoker。下图是RegisterDirectory的结构,当然还有StaticDirectory,在上面的代码中设置了Register的具体扩展,protocal,RegisteredConsumerUrl,RouterChain
然后调用Invoker invoker = cluster.join(directory);来创建invoker。
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } }
从代码上可以看到 实际上返回的是FailoverClusterInvoker。在这里 不得不简单描述一下Cluster集群容错与invoker的关系
最后我们拿到的实际上是FailoverClusterInvoker,当执行方法调用的时候,实际上就是执行其doInvoke方法。
这边先说个结论;: 此时的invoker为 MockClusterInvoker
createProxy
现在回到上一节的代理部分
ReferenceConfig#createProxy
/** * 根据invoker获取代理类,其实现逻辑如下: * 从消费者URL中获取interfaces的值,用,分隔出单个服务应用接口。 * 增加默认接口EchoService接口。 * 根据需要实现的接口,使用jdk或Javassist创建代理类。 */ return (T) proxyFactory.getProxy(invoker);
proxyFactory.getProxy
-> AbstractProxyFactory#getProxy(Invoker<T> invoker)
-> AbstractProxyFactory#getProxy(Invoker<T> invoker)#getProxy(Invoker<T> invoker, boolean generic)
-> JavassistProxyFactory#getProxy
1) Proxy.getProxy(interfaces) 返回的是一个 Proxy0 对象,注意是大写的P
2)最后调用 Proxy0 的 newInstance 方法,实例化一个包装类型的 proxy0,注意是小写的
3)dubbo 里面的 EchoService 或者 泛化调用,都是通过javassist 字节码工具,以实现方法形式实现,所以才可以在业务层面强转成功。
4)对于返回的代理类,也实现对应 refer 的接口,例如上述实现的 DemoService。
5)DC 是dynamic code 缩写,只是一个标识,说明是动态生成的类。
6)对于手动执行方法,都是调用 InvokerInvocationHandler 的 invoke 方法
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
这边的proxy是使用的org.apache.dubbo.common.bytecode.Proxy;的proxy工具类,InvokerInvocationHandler的invoke方法返回的数据类型就是远程服务的类型。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(createInvocation(method, args)).recreate(); }
也就是说 当执行了远程服务接口方法的时候,实际上就是执行是proxy0.sayhello,然后AbstractClusterInvoker#invoke ,上面的代码做了如下事情:
1) 该方法对应的类如果是Object 类型,则直接执行该方法。
2 ) 判断是否为toString、hashCode、equals,是则直接执行invoker 的对应方法
3) 构造一个 RpcInvocation,用于执行invoker 的 invoker 方法,而RpcInvocation 的生层过程就是填充一些比如methodName,parameterTypes, arguments, attachments, invoker 等信息。此invoker 为 MockClusterInvoker。
接下来就是服务调用过程了,invoker.invoke将在后续讲解。其中也涉及了负载均衡 集群容错和服务的注册和发现,其中包含了动态配置和路由配置