Dubbo provider启动原理:
当我们的dubbo启动我们的spring容器时spring 初始化容器的时候会查找META-INF/spring.handles文件查找对应的NamespaceHandle,dubbo在其jar包下配置了DubboNamespaceHandle,该类下有以下配置项:
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true)); registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
意思就是当spring 在解析容器的时候遇到指定配置会使用对应的Parser去解析配置项。
provider
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware
该类主要实现了上面几个接口,我们来看其中最主要的InitializingBean,该类会在类实例化后调用其中的afterPropertiesSet方法 ,所以我们来看下:
public void afterPropertiesSet() throws Exception { ... //上面一大堆代码都是判空去重新赋值的代码,我们不关注他们,最主要是下面这个export方法 if (!supportedApplicationListener) { export(); } }
public synchronized void export() { checkAndUpdateSubConfigs(); if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } //当设置了延时发布时用定时器延时发布 if (delay != null && delay > 0) { delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS); } else { //否则的话直接发布 doExport(); } }
export方法最主要是判断是有配置了延时发布,是的话就用schedule去延时发布,否的话doExport发布,在spring中真正干活的都是do开头的方法,我们再继续查看doExport方法
1 protected synchronized void doExport() { 2 if (unexported) { 3 throw new IllegalStateException("Already unexported!"); 4 } 5 if (exported) { 6 return; 7 } 8 exported = true; 9 10 if (path == null || path.length() == 0) { 11 path = interfaceName; 12 } 13 //生成唯一serviceName group/interfaceClass 如group/com.xx.xxx 14 //ref 接口实现bean 服务的真正提供者 15 //interfaceClass 需要发布的接口服务 16 ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass); 17 ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); 18 //发布url 19 doExportUrls(); 20 }
这里主要查看doExportUrls()方法,上面的是把服务信息存到本地map里:
@SuppressWarnings({"unchecked", "rawtypes"}) private void doExportUrls() { //这里会获取到注册中心的列表如有配置多个的话, // 格式:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=user-service&dubbo=2.0.2&pid=16232®istry=zookeeper&release=2.7.0×tamp=1553883173387 List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
所以这里说明dubbo是支持多协议的多注册中心的,提前预告,下面这个doExportUrlsFor1Protocol方法会很长很复杂:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { //获取协议的名称如dubbo String name = protocolConfig.getName(); if (name == null || name.length() == 0) { //默认是dubbo name = Constants.DUBBO; } Map<String, String> map = new HashMap<String, String>(); //组装map 再填充到url上 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); appendRuntimeParameters(map); appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); //这里省略了一堆设置url的代码,主要是把接口的配置方法加到参数列表里如果method 重试次数 .... if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } } //是否injvm也就是本地发布不上注册中心 if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // export service String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } //获取绑定的有效IP地址 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); //获得一个绑定端口 Integer port = this.findConfigedPorts(protocolConfig, name, map); //创建一个url //dubbo://192.168.1.2:20882/com.lin.service.UserService?anyhost=true&application=user-service // &bean.name=com.lin.service.UserService&bind.ip=192.168.1.2&bind.port=20882&dubbo=2.0.2 // &generic=false&group=userGroup&interface=com.lin.service.UserService&methods=add,findUserByName,findUserById // &pid=16232&release=2.7.0&side=provider×tamp=1553884198155 URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured //当scope为none的时候不发布 if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) { // export to local if the config is not remote (export to remote only when config is remote) //当scope不是remote的时候发布本地服务 if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) //当scope不是local的时候发布远程服务 if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && !registryURLs.isEmpty()) { //当注册中心有多个的时候会发布到多个注册中心 for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); //监控url URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } // For providers, this is used to enable custom proxy to generate invoker String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } //proxyFactory是一个自适应的扩展点,所以是一个proxyFactory$Adaptive //默认会有这几个 //stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper //jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory //javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory //当url里有proxy=xxx的时候就取xxxProxyFactory,如果没有的话默认就是JavassistProxyFactory Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //创建一个Invoker的包装类DelegateProviderMetaDataInvoker DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); //protocol是一个自动扩展点,所以会返回一个ivoker.getURL().protocol/Protocol的一个对象 如RegistryProtocol、DubboProtocol Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { //上报元数据中心 metadataReportService.publishProvider(url); } } } this.urls.add(url); }
这里涉及到一个自适应扩展点的概念,具体什么是自适应扩展点可以到dubbo官网上看,那里介绍的很详细
因为这里要注册的URL是registry://192.168.xxxx这样格式的,又因为protocol又是一个Protocol自适应扩展点
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); Exporter<?> exporter = protocol.export(wrapperInvoker);
所以我们能得到protocol.export这里里面是调用的RegistryProtocol里面的export方法,所以我们再来看下这个方法:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //获得注册地址 //zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=user-service... URL registryUrl = getRegistryUrl(originInvoker); // url to export locally //获取提供者发布url //dubbo://192.168.1.2:20882/com.lin.service.UserService?anyhost=true&application=user-service&bean.name=... URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker //本地发布服务 也就是服务发布到netty容器里 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry //获得注册中心地址 final Registry registry = getRegistry(originInvoker); //获得要注册的提供者URL final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { //注册到注册中心 register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } // Deprecated! Subscribe to override rules in 2.6.x or before. //订阅url registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); }
我们一步一步来分析上面的代码,首先我这边的注册中心用的是zookeeper所以拿到的注册地址是zookeeper协议的,服务提供者也是用的默认的dubbo协议,那我们下一步来看下服务真正发布的方法doLocalExport方法:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl); //这里protocol又是一个自适应扩展点,所以里面会调用invoker.getUrl.getProtocol+"Protocol"的export()方法 //如 DubboProtocol.export(); exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }
一开始就是一个双重检查锁,我们不管他,直接关注我们的protocol.export方法,跟上面的一样,protocol也是一个自适应扩展点,所以里面实际用的是我们invoker.getUrl.getProtocol+"Protocol"的export()方法,我们invoker的url呢又是我们上面包装进去的providerUrl也就是dubbo://xxxx这个url,所以最终调用的就是DubboProtocol的export方法:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } openServer(url); optimizeSerialization(url); return exporter; }
上面那些代码我们不关心先不看了,我们直接可以看到有一个openServer(url)的方法,根据名字我们就可以猜到这里就是开启我们服务的地方了,我们来看下:
private void openServer(URL url) { // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { //创建服务 serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override server.reset(url); } } }
private ExchangeServer createServer(URL url) { ... url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } ... return server; }
我们可以看到这里Exchangers.bind(url,requestHandler),这里呢最终会调到:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //这里getTransporter也是获得一个自适应的扩展点,如果没有配置的话默认就是用的NettyTransporter return getTransporter().bind(url, handler); }
再里面就是发布到Netty容器了,有兴趣的可以自己去看下,现在这里我们的服务就已经发布了,下面还有注册到注册中心,我们再看下我们上面的RegistryProtocol的export方法里面的注册服务的代码:
// url to registry //获得注册中心地址 final Registry registry = getRegistry(originInvoker); //获得要注册的提供者URL final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { //注册到注册中心 register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); }
主要还是register这个方法:
public void register(URL registryUrl, URL registeredProviderUrl) { //因为registryFacotry是一个自适应的扩展点,所以会返回一个zookeeperRegistry,如果是redis://的话就返回一个RedisRegistry Registry registry = registryFactory.getRegistry(registryUrl); //注册到注册中心 registry.register(registeredProviderUrl); }
根据环境我们会获得一个ZookeeperRegistry所以我们再看下zookeeperRegistry的register方法:
因为register这个方法zookeeperRegistry并没有去实现它,所以一定是在父类的register我们继续看他父类FailbackRegistry的register方法:
public void register(URL url) { super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // Sending a registration request to the server side //注册服务 是模版模式,所以会在子类实现 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly addFailedRegistered(url); } }
这方法主要注册的就是doRegister方法了,因为是模版模式,所以这个方法zookeeperRegistry自己实现了这个方法:
@Override public void doRegister(URL url) { try { //创建一个临时节点 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
所以这里就是最终注册的地方了,这里会根据url去创建一个服务的临时节点,到这服务的发布和注册就已经完成了,其他地方有兴趣的可以自己去看下源码,dubbo里很多地方都用到了自适应扩展点这个概念,所以如果要看源码就要先去理解什么是自适应扩展点。