Overview
Configuring Dubbo
To use Dubbo, first create dubbo-consumer.xml and dubbo-provider.xml under resources. For example, here is dubbo-provider.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://code.alibabatech.com/schema/dubbo
       http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <dubbo:provider protocol="dubbo" port="-1"/>
    <!--<dubbo:service interface="cn.healthmall.order.service.AppMsgPushUpdateService" ref="appMsgPushUpdateService" version="1.0.0" retries="0"/>-->
</beans>
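The provider's service interface is defined as follows:
import java.util.concurrent.CompletableFuture;

public interface DemoService {

    String sayHello(String name);

    // Async variant: by default it wraps the synchronous result in a completed future
    default CompletableFuture<String> sayHelloAsync(String name) {
        return CompletableFuture.completedFuture(sayHello(name));
    }
}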
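To make the interface concrete, here is a minimal sketch of an implementation and a direct, in-process call. DemoServiceImpl and the greeting text are assumptions for illustration; they do not come from the original project, and no Dubbo classes are involved.

```java
import java.util.concurrent.CompletableFuture;

class DemoServiceExample {

    // Same contract as the DemoService interface shown above.
    interface DemoService {
        String sayHello(String name);

        default CompletableFuture<String> sayHelloAsync(String name) {
            return CompletableFuture.completedFuture(sayHello(name));
        }
    }

    // Hypothetical implementation; the greeting format is an assumption.
    static class DemoServiceImpl implements DemoService {
        @Override
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    public static void main(String[] args) {
        DemoService service = new DemoServiceImpl();
        System.out.println(service.sayHello("dubbo"));
        // The default method wraps the synchronous result in an already completed future.
        System.out.println(service.sayHelloAsync("dubbo").join());
    }
}
```

An object like this is what the provider configuration refers to through its ref attribute.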
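This section looks at how Dubbo exports a bean.
Source code analysis
How is DubboBootstrap hooked into Spring Boot startup?
DubboBootstrap is the startup class; as its name suggests, it drives Dubbo's startup. So how does Dubbo get started inside Spring Boot? From the Spring Boot startup flow we know that users can plug in their own Listener to react to Spring Boot events.
The key class is DubboBootstrapApplicationListener. As covered in the discussion of the Spring Boot startup flow, Spring Boot broadcasts events such as the context refresh, and Dubbo registers its own listener to kick off its startup logic. The code: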
public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
        implements Ordered {

    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        dubboBootstrap.start();
    }
}
Next comes the start method of DubboBootstrap:
/**
 * Start the bootstrap
 */
public DubboBootstrap start() {
    if (started.compareAndSet(false, true)) {
        initialize();
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " is starting...");
        }
        // Export the services
        // 1. export Dubbo Services
        exportServices();

        // Not only provider register
        if (!isOnlyRegisterProvider() || hasExportedServices()) {
            // 2. export MetadataService
            exportMetadataService();
            //3. Register the local ServiceInstance if required
            registerServiceInstance();
        }
        //
        referServices();

        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has started.");
        }
    }
    return this;
}
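The started.compareAndSet(false, true) guard is what makes start() safe to call more than once. A minimal sketch of that idiom in plain Java follows; OneTimeStart and its counter are hypothetical stand-ins, not Dubbo classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class OneTimeStart {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger startCount = new AtomicInteger();

    // Only the caller that flips the flag from false to true runs the body.
    OneTimeStart start() {
        if (started.compareAndSet(false, true)) {
            // Stands in for initialize(), exportServices() and the rest.
            startCount.incrementAndGet();
        }
        return this;
    }

    int startCount() {
        return startCount.get();
    }

    public static void main(String[] args) {
        OneTimeStart bootstrap = new OneTimeStart();
        bootstrap.start().start().start();
        System.out.println("start body ran " + bootstrap.startCount() + " time(s)");
    }
}
```

Because the check and the update happen in one atomic step, two threads racing into start() cannot both run the body.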
Next, look at exportServices. It first fetches the ServiceBean instances held by configManager (how they get registered there is a separate question) and then exports each one:
private void exportServices() {
    configManager.getServices().forEach(sc -> {
        // TODO, compatible with ServiceConfig.export()
        ServiceConfig serviceConfig = (ServiceConfig) sc;
        serviceConfig.setBootstrap(this);

        if (exportAsync) {
            ExecutorService executor = executorRepository.getServiceExporterExecutor();
            Future<?> future = executor.submit(() -> {
                sc.export();
            });
            asyncExportingFutures.add(future);
        } else {
            sc.export();
            exportedServices.add(sc);
        }
    });
}
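The branch on exportAsync is a plain "run inline or submit to an executor" decision. The sketch below reproduces only that shape with JDK classes. FakeService is a hypothetical stand-in for ServiceConfig, and the thread pool size is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExportDispatchSketch {

    // Hypothetical stand-in for ServiceConfig: export() only flips a flag.
    static class FakeService {
        final String name;
        volatile boolean exported;

        FakeService(String name) {
            this.name = name;
        }

        void export() {
            exported = true;
        }
    }

    // Exports each service inline, or hands it to the executor when async is set.
    // The returned futures let the caller wait for the background exports.
    static List<Future<?>> exportAll(List<FakeService> services, boolean async, ExecutorService executor) {
        List<Future<?>> futures = new ArrayList<>();
        for (FakeService service : services) {
            if (async) {
                futures.add(executor.submit(() -> {
                    service.export();
                }));
            } else {
                service.export();
            }
        }
        return futures;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<FakeService> services = Arrays.asList(new FakeService("a"), new FakeService("b"));
            for (Future<?> future : exportAll(services, true, executor)) {
                future.get(); // wait until the background export finishes
            }
            services.forEach(s -> System.out.println(s.name + " exported=" + s.exported));
        } finally {
            executor.shutdown();
        }
    }
}
```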
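ServiceBean is the abstraction of a service bean; its class hierarchy is as follows.
ServiceBean extends ServiceConfig. ServiceConfig uses a ScheduledExecutorService to export a bean after a delay. One executor per service bean would be expensive, since an application usually exposes many services, but the code below references it as DELAY_EXPORT_EXECUTOR, and that constant-style name suggests a static field shared by all instances. The implementation behind sc.export() is: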
// Method of ServiceConfig
public synchronized void export() {
    if (!shouldExport()) {
        return;
    }

    if (bootstrap == null) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }

    checkAndUpdateSubConfigs();

    //init serviceMetadata
    serviceMetadata.setVersion(version);
    serviceMetadata.setGroup(group);
    serviceMetadata.setDefaultGroup(group);
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setServiceInterfaceName(getInterface());
    serviceMetadata.setTarget(getRef());

    // Delayed export
    if (shouldDelay()) {
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        // This is the method that ends up being called
        doExport();
    }
}
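The delayed branch can be sketched with the JDK scheduler alone. The example below assumes a single scheduler shared by all instances (matching the constant-style name DELAY_EXPORT_EXECUTOR). The class, the daemon thread name and the latch are illustrative choices, not Dubbo code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DelayedExportSketch {
    // One scheduler shared by every instance (an assumption, see the lead-in).
    private static final ScheduledExecutorService DELAY_EXECUTOR =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delay-exporter");
                t.setDaemon(true); // do not keep the JVM alive for pending exports
                return t;
            });

    private final long delayMillis;
    private final CountDownLatch exported = new CountDownLatch(1);

    DelayedExportSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Export now, or schedule the export when a positive delay is configured.
    void export() {
        if (delayMillis > 0) {
            DELAY_EXECUTOR.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    private void doExport() {
        exported.countDown();
    }

    boolean isExported() {
        return exported.getCount() == 0;
    }

    // Blocks until the export has happened or the timeout elapses.
    boolean awaitExported(long timeoutMillis) {
        try {
            return exported.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        DelayedExportSketch service = new DelayedExportSketch(100);
        service.export();
        System.out.println("exported immediately? " + service.isExported());
        System.out.println("exported after waiting? " + service.awaitExported(2000));
    }
}
```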
// Methods of ServiceConfig
protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;

    if (StringUtils.isEmpty(path)) {
        path = interfaceName;
    }
    //
    doExportUrls();

    // dispatch a ServiceConfigExportedEvent since 2.7.4
    dispatch(new ServiceConfigExportedEvent(this));
}

private void doExportUrls() {
    ServiceRepository repository = ApplicationModel.getServiceRepository();
    ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
    repository.registerProvider(
            getUniqueServiceName(),
            ref,
            serviceDescriptor,
            this,
            serviceMetadata
    );

    List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig)
                .map(p -> p + "/" + path)
                .orElse(path), group, version);
        // In case user specified path, register service one more time to map it to path.
        repository.registerService(pathKey, interfaceClass);
        // TODO, uncomment this line once service key is unified
        serviceMetadata.setServiceKey(pathKey);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
In the debugger, a sample value of registryURLs looks like this:
registryURLs : registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&pid=5300&qos.port=22222&registry=zookeeper&timestamp=1586333795545
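To see what such a registry URL carries, it can be taken apart with java.net.URI. This is only a reading aid: Dubbo has its own URL class, which the sketch does not use, and queryParameters is a hypothetical helper that does not percent-decode values.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

class RegistryUrlSketch {

    // Splits the raw query string into key/value pairs, keeping their order.
    // Values are not percent-decoded; that is enough for this sample.
    static Map<String, String> queryParameters(URI uri) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = uri.getRawQuery();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        URI uri = URI.create("registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService"
                + "?application=demo-provider&dubbo=2.0.2&pid=5300&qos.port=22222"
                + "&registry=zookeeper&timestamp=1586333795545");
        System.out.println("scheme = " + uri.getScheme());
        System.out.println("host   = " + uri.getHost());
        System.out.println("port   = " + uri.getPort());
        System.out.println("path   = " + uri.getPath());
        queryParameters(uri).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In the sample, the scheme is registry while the concrete registry type (zookeeper) travels as the registry parameter.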
So the preceding logic collects the protocol information, that is, which protocol each service is transported over. For each protocol it then calls this method:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (StringUtils.isEmpty(name)) {
        name = DUBBO;
    }

    Map<String, String> map = new HashMap<String, String>();
    // This part puts a batch of settings into map (elided)
    // .....

    //init serviceMetadata attachments
    serviceMetadata.getAttachments().putAll(map);

    // The export itself starts here: this is the core logic
    // export service
    String host = findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = findConfigedPorts(protocolConfig, name, map);
    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

    // You can customize Configurator to append extra parameters
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                for (URL registryURL : registryURLs) {
                    // ....
                    // Note this step: ProxyFactory creates an Invoker (comparable to AOP in Spring)
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                    // Build the wrapper
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // Hand over to the export method of the protocol
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
            /**
             * @since 2.7.0
             * ServiceData Store
             */
            WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
            if (metadataService != null) {
                metadataService.publishServiceDefinition(url);
            }
        }
    }
    this.urls.add(url);
}
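It is long, but the core logic boils down to:
- Collect the parameters into map
- Use PROXY_FACTORY to create an Invoker
- Wrap the Invoker in DelegateProviderMetaDataInvoker
- Call the export method of protocol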
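The four steps above can be sketched in plain Java. Everything here is a simplified, hypothetical model: Invoker, MetadataInvoker and the EXPORTED map only mimic the roles of Dubbo's Invoker, DelegateProviderMetaDataInvoker and Protocol.export, and none of Dubbo's real signatures are used.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class ExportPipelineSketch {

    interface Greeter {
        String greet(String name);
    }

    // Simplified Invoker: one generic entry point for any method call.
    interface Invoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args) throws Exception;
    }

    // Step 2: turn a plain object into an Invoker by reflection.
    static Invoker toInvoker(Object target, Class<?> type) {
        return (methodName, parameterTypes, args) -> {
            Method method = type.getMethod(methodName, parameterTypes);
            return method.invoke(target, args);
        };
    }

    // Step 3: wrapper that carries metadata next to the delegate.
    static class MetadataInvoker implements Invoker {
        final Invoker delegate;
        final Map<String, String> metadata;

        MetadataInvoker(Invoker delegate, Map<String, String> metadata) {
            this.delegate = delegate;
            this.metadata = metadata;
        }

        @Override
        public Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args) throws Exception {
            return delegate.invoke(methodName, parameterTypes, args);
        }
    }

    // Step 4: a toy "protocol" that exports by storing the invoker under a service key.
    static final Map<String, Invoker> EXPORTED = new HashMap<>();

    static void export(String serviceKey, Invoker invoker) {
        EXPORTED.put(serviceKey, invoker);
    }

    // What an incoming request would do: look up the invoker and call it.
    static Object callExported(String serviceKey, String methodName, String arg) {
        try {
            return EXPORTED.get(serviceKey)
                    .invoke(methodName, new Class<?>[]{String.class}, new Object[]{arg});
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>(); // step 1: collect parameters
        map.put("side", "provider");
        Greeter ref = name -> "Hello " + name;
        Invoker invoker = toInvoker(ref, Greeter.class);
        export("demo.Greeter", new MetadataInvoker(invoker, map));
        System.out.println(callExported("demo.Greeter", "greet", "dubbo"));
    }
}
```

The point of the Invoker indirection is that the protocol layer never needs to know the service interface; it only sees one uniform invoke call.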
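That brings us to the export method of protocol. Protocol is an interface, and stepping through in the debugger shows that the call ends with the service being registered in ZooKeeper. The protocol object is in fact wrapped in several layers; the classes involved are listed below.
From top to bottom: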
- ProtocolListenerWrapper
- ProtocolFilterWrapper
- QosProtocolWrapper
- RegistryProtocol
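The layering is the classic decorator pattern: each wrapper implements the same interface, does its own work, and delegates to the next layer. The sketch below only records the order in which the layers are entered. Its Protocol interface and both classes are hypothetical, and the real wrappers' behavior is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

class ProtocolWrapperSketch {

    // Simplified stand-in for the Protocol interface.
    interface Protocol {
        void export(String service, List<String> trace);
    }

    // Innermost layer: just records that it was reached.
    static class CoreProtocol implements Protocol {
        private final String name;

        CoreProtocol(String name) {
            this.name = name;
        }

        @Override
        public void export(String service, List<String> trace) {
            trace.add(name);
        }
    }

    // A wrapper records its own name, then delegates to the wrapped protocol.
    static class NamedWrapper implements Protocol {
        private final String name;
        private final Protocol next;

        NamedWrapper(String name, Protocol next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void export(String service, List<String> trace) {
            trace.add(name);
            next.export(service, trace);
        }
    }

    // Builds the chain inside-out so the outermost layer matches the top of the list above.
    static Protocol buildChain() {
        Protocol protocol = new CoreProtocol("RegistryProtocol");
        protocol = new NamedWrapper("QosProtocolWrapper", protocol);
        protocol = new NamedWrapper("ProtocolFilterWrapper", protocol);
        protocol = new NamedWrapper("ProtocolListenerWrapper", protocol);
        return protocol;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        buildChain().export("demo.Greeter", trace);
        System.out.println(trace);
    }
}
```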
The class names hint at what each layer does. Let's look at the export method of RegistryProtocol; as the name suggests, this class registers the provider's service with the registry.
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

    // decide if we need to delay publish
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        // The registration logic is here
        register(registryUrl, registeredProviderUrl);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

public void register(URL registryUrl, URL registeredProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registeredProviderUrl);

    ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
    model.addStatedUrl(new ProviderModel.RegisterStatedURL(
            registeredProviderUrl,
            registryUrl,
            true
    ));
}
Registry is an interface and, like Protocol, it is wrapped in several layers, as the debugger also shows. The call passes through the register method of FailbackRegistry:
@Override
public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        // This is the registration call
        doRegister(url);
    } catch (Exception e) {
        // ...
    }
    // ...
}
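FailbackRegistry follows the template-method pattern: the base class handles bookkeeping and calls an abstract doRegister that the concrete registry implements. The excerpt elides what the catch block does, so the sketch below assumes, going by the class name, that failed URLs are recorded and retried later. All classes and the retry trigger are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;

class FailbackSketch {

    static abstract class FailbackStyleRegistry {
        private final Set<String> failedRegistered = new LinkedHashSet<>();

        // Template method: bookkeeping here, the actual write in doRegister.
        final void register(String url) {
            failedRegistered.remove(url); // clear a stale failure record first
            try {
                doRegister(url);
            } catch (RuntimeException e) {
                failedRegistered.add(url); // remember it so retry() can try again
            }
        }

        // Assumed retry hook; a real registry would trigger this from a timer.
        final void retry() {
            for (String url : new ArrayList<>(failedRegistered)) {
                register(url);
            }
        }

        Set<String> failed() {
            return failedRegistered;
        }

        protected abstract void doRegister(String url);
    }

    // Fails on the first attempt to simulate a registry that is briefly unavailable.
    static class FlakyRegistry extends FailbackStyleRegistry {
        int attempts;
        final Set<String> nodes = new LinkedHashSet<>();

        @Override
        protected void doRegister(String url) {
            if (attempts++ == 0) {
                throw new IllegalStateException("registry unavailable");
            }
            nodes.add(url);
        }
    }

    public static void main(String[] args) {
        FlakyRegistry registry = new FlakyRegistry();
        registry.register("dubbo://10.0.0.1:20880/demo.Greeter");
        System.out.println("failed after first try: " + registry.failed());
        registry.retry();
        System.out.println("registered after retry: " + registry.nodes);
    }
}
```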
Finally we reach the registration method of ZookeeperRegistry, which is easy to follow: it uses zkClient to create the node.
@Override
public void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
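The excerpt does not show toUrlPath. As a rough illustration, the sketch below assumes the commonly described node layout /dubbo/<interface>/providers/<URL-encoded provider URL>. The helper, its parameters and the sample address are hypothetical, and the exact layout should be checked against the Dubbo version in use.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ZkPathSketch {

    // Builds a node path under the assumed layout; the provider URL is
    // percent-encoded so that it fits into a single path segment.
    static String toUrlPath(String root, String serviceInterface, String providerUrl) {
        try {
            return "/" + root + "/" + serviceInterface + "/providers/"
                    + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath("dubbo", "org.demo.DemoService",
                "dubbo://10.0.0.1:20880/org.demo.DemoService?version=1.0.0"));
    }
}
```

Encoding the whole provider URL into one segment is what lets a consumer recover every provider's address and parameters just by listing the children of the providers node.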
That completes the whole service export flow.
Supplement
What is ProxyFactory
As an aside, let's see what ProxyFactory actually is:
/**
 * ProxyFactory. (API/SPI, Singleton, ThreadSafe) singleton, thread-safe
 */
@SPI("javassist")
public interface ProxyFactory {

    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    @Adaptive({PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}

// One implementation, for example, which relies on JDK proxies and reflection
public class JdkProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }
}
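The two directions of JdkProxyFactory can be tried out with the JDK alone. In this sketch getInvoker wraps a real object so that calls go through reflection (the provider side), and getProxy builds a dynamic proxy that forwards every call to an invoker (the consumer side). The Invoker interface here is a hypothetical simplification, not Dubbo's.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class JdkProxySketch {

    interface Greeter {
        String greet(String name);
    }

    static class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "Hello " + name;
        }
    }

    // Simplified invoker: receives the method being called and its arguments.
    interface Invoker {
        Object invoke(Method method, Object[] args) throws Throwable;
    }

    // Provider side: look the method up on the target's class and call it reflectively.
    static Invoker getInvoker(Object target) {
        return (method, args) -> {
            Method actual = target.getClass().getMethod(method.getName(), method.getParameterTypes());
            return actual.invoke(target, args);
        };
    }

    // Consumer side: a JDK dynamic proxy that forwards every call to the invoker.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> type, Invoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> invoker.invoke(method, args);
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    public static void main(String[] args) {
        Greeter real = new GreeterImpl();
        Greeter proxy = getProxy(Greeter.class, getInvoker(real));
        // The call travels proxy -> handler -> invoker -> reflection -> GreeterImpl.
        System.out.println(proxy.greet("dubbo"));
    }
}
```

In a real RPC setup the consumer-side invoker would send the call over the network instead of reaching the target in the same JVM.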
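How is the Protocol interface wrapped?
As seen above, the protocol object on which export is called is actually an adaptive wrapper class, Protocol$Adaptive. How is this wrapper generated? The previous article on the SPI mechanism covers it.
References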
- http://dubbo.apache.org/zh-cn/docs/source_code_guide/dubbo-spi.html