消费端服务启动的过程与Dubbo服务发布时的启动过程类似,现在我们直接进入到服务调用的核心模块ReferenceBean进行分析:
首先是afterPropertiesSet()方法,它是IUserService对应的ReferenceBean在属性填充完成之后,由Spring触发的初始化回调:
// NOTE(review): this is an abbreviated excerpt; in Spring the snippet appears to come from
// AbstractAutowireCapableBeanFactory#doCreateBean rather than AbstractBeanFactory — confirm.
public abstract class AbstractBeanFactory
try {
populateBean(beanName, mbd, instanceWrapper);
// afterPropertiesSet() is invoked from inside initializeBean(...)
exposedObject = initializeBean(beanName, exposedObject, mbd);
}
ReferenceBean的afterPropertiesSet()方法主要进行了以下行为:设置consumerConfig、applicationConfig、moduleConfig、registryConfigs(注册中心配置)、monitorConfig,并且在最后根据init配置决定是否预初始化引用对象。
public class ReferenceBean<T> extends ReferenceConfig<T> implements InitializingBean
/**
 * Initialization callback: fills in any configuration this reference is still missing by
 * looking up beans in {@code applicationContext}, then optionally creates the reference
 * eagerly.
 *
 * <p>The sections run in a fixed order (consumer, application, module, registries, monitor)
 * and later sections consult values set by earlier ones; for example the application lookup
 * is skipped when the consumer already carries an application. A config bean is treated as
 * a default candidate when {@code isDefault()} is {@code null} or {@code true}.
 *
 * @throws IllegalStateException if more than one default consumer, application, module or
 *         monitor config bean is found
 * @throws Exception declared by the signature; {@code getObject()} is called when eager
 *         init is enabled
 */
public void afterPropertiesSet() throws Exception {
// 1. Consumer: only looked up when none has been set on this reference.
if (getConsumer() == null) {
// A null applicationContext yields a null map, which skips the whole lookup.
Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
ConsumerConfig consumerConfig = null;
for (ConsumerConfig config : consumerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
// A second default candidate is ambiguous, so fail fast.
if (consumerConfig != null) {
throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
}
consumerConfig = config;
}
}
if (consumerConfig != null) {
setConsumer(consumerConfig);
}
}
}
// 2. Application: skipped when set here or already provided through the consumer.
if (getApplication() == null
&& (getConsumer() == null || getConsumer().getApplication() == null)) {
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
ApplicationConfig applicationConfig = null;
for (ApplicationConfig config : applicationConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (applicationConfig != null) {
throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
}
applicationConfig = config;
}
}
if (applicationConfig != null) {
setApplication(applicationConfig);
}
}
}
// 3. Module: skipped when set here or already provided through the consumer.
if (getModule() == null
&& (getConsumer() == null || getConsumer().getModule() == null)) {
Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
ModuleConfig moduleConfig = null;
for (ModuleConfig config : moduleConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (moduleConfig != null) {
throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
}
moduleConfig = config;
}
}
if (moduleConfig != null) {
setModule(moduleConfig);
}
}
}
// 4. Registries: looked up only when neither this reference, the consumer nor the
// application carries any. Unlike the other sections, every default candidate is kept.
if ((getRegistries() == null || getRegistries().isEmpty())
&& (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().isEmpty())
&& (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {
Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
if (registryConfigMap != null && registryConfigMap.size() > 0) {
List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
for (RegistryConfig config : registryConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
registryConfigs.add(config);
}
}
// registryConfigs was created just above and is never null; only emptiness matters here.
if (registryConfigs != null && !registryConfigs.isEmpty()) {
super.setRegistries(registryConfigs);
}
}
}
// 5. Monitor: looked up only when neither this reference, the consumer nor the
// application provides one.
if (getMonitor() == null
&& (getConsumer() == null || getConsumer().getMonitor() == null)
&& (getApplication() == null || getApplication().getMonitor() == null)) {
Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
MonitorConfig monitorConfig = null;
for (MonitorConfig config : monitorConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (monitorConfig != null) {
throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
}
monitorConfig = config;
}
}
if (monitorConfig != null) {
setMonitor(monitorConfig);
}
}
}
// Eager initialization: this reference's own init flag wins; the consumer's flag is the
// fallback when the former is unset.
Boolean b = isInit();
if (b == null && getConsumer() != null) {
b = getConsumer().isInit();
}
if (b != null && b.booleanValue()) {
getObject();
}
}
}
setApplicationContext(ApplicationContext applicationContext)方法主要用来设置上下文对象,方便获取应用上下文的属性配置、同时SpringExtensionFactory添加了一个ApplicationContext,方便Dubbo在服务扩展点加载时,提供了另外一种加载的方式,从Spring上下文中获取对象。
/**
 * Excerpt of ReferenceBean showing only the {@code ApplicationContextAware} callback.
 */
public class ReferenceBean<T> extends ReferenceConfig<T> implements ApplicationContextAware{
/**
 * Stores the application context on this bean and also hands it to
 * {@code SpringExtensionFactory.addApplicationContext}.
 *
 * @param applicationContext the context this bean runs in
 */
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
SpringExtensionFactory.addApplicationContext(applicationContext);
}
}
getObject()方法主要用来在获取bean对象时,提供自定义实例bean的方法,方便获取代理对象,调用的是父类ReferenceConfig中的get()方法,然后调用ReferenceConfig中的init()方法
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean
/**
 * {@code FactoryBean} accessor: returns whatever {@code get()} produces.
 *
 * @return the object returned by {@code get()}
 * @throws Exception declared by the signature
 */
@Override
public Object getObject() throws Exception {
// get() here is the method inherited from the parent class ReferenceConfig; per the
// author's note it is declared synchronized (not visible in this excerpt).
return get();
}
}
在init()方法中,进行了以下操作:首先调用checkDefault()方法,设置consumerConfig默认配置、设置属性值;然后根据接口名从本地缓存中获取接口对应的解析值,如果不为空,则赋值给URL;接着通过consumer对成员变量设置属性。
然后通过Wrapper获取接口的一个包装类和方法,并设置参数map.然后获取dubbo的注册地址,然后根据参数map调用createProxy方法创建一个代理对象。
public class ReferenceConfig<T> extends AbstractReferenceConfig {
......
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
其中map参数值如下:
"side" -> "consumer"
"application" -> "user-consumer"
"register.ip" -> "10.9.233.26"
"methods" -> "getUserById,queryList"
"dubbo" -> "2.6.2"
"pid" -> "7036"
"interface" -> "com.bail.user.service.IUserService"
"version" -> "1.0.0"
"timestamp" -> "1638179225870"
"revision" -> "1.0.0"
createProxy方法中,调用InjvmProtocol中的isInjvmRefer判断是否是本地jvm引用,在isInjvmRefer方法中调用getExporter()方法来获取暴露者,返回为空,判断isInjvmRefer=false,成员变量url为空,调用loadRegistries(false)加载URLS,从RegistryConfig中获得注册中心地址,然后根据urls从refprotocol中获取调用者invoker,通过调用doRefer方法后,返回一个invoker对象,主要包括注册表信息、接口名称、路径监听等信息,然后继续调用proxyFactory的getProxy(invoker)返回一个代理对象
/**
 * Builds the {@code Invoker} for this reference and returns a proxy created from it.
 *
 * <p>Steps, in order: decide whether to reference an in-JVM service; otherwise collect
 * target URLs either from the user-specified {@code url} string or from the registries;
 * obtain one invoker per URL (joined through {@code cluster} when there are several);
 * run the availability check; finally call {@code proxyFactory.getProxy(invoker)}.
 *
 * <p>NOTE(review): {@code url}, {@code urls}, {@code invoker}, {@code check} and
 * {@code consumer} are members declared outside this excerpt.
 *
 * @param map reference parameters; used to build the temporary URL and appended to or
 *        merged into every target URL
 * @return the proxy returned by {@code proxyFactory.getProxy(invoker)}
 * @throws IllegalStateException if no registry URL could be assembled, or if the check is
 *         enabled and the invoker reports it is not available
 */
private T createProxy(Map<String, String> map) {
// Example value observed while debugging:
// tmpUrl = temp://localhost?application=user-consumer&dubbo=2.6.2
// &interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=7036
// &register.ip=10.9.233.26&revision=1.0.0&side=consumer&timestamp=1638179225870&version=1.0.0
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
// An explicit injvm setting wins; otherwise the decision is derived below.
if (isInjvm() == null) {
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
if (isJvmRefer) {
// This local URL shadows the 'url' field tested above.
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
// Fall back to the interface name when the address carries no path.
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
// Registry addresses get the reference parameters encoded under REFER_KEY;
// any other address has the parameters merged into the URL itself.
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
// The monitor URL is stored in the map before the map is serialized below.
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// A single URL is referenced directly; several URLs are referenced one by one and
// then joined into one invoker through the cluster.
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
// Resolve the availability check flag: own setting, then the consumer's, then true.
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
然后在创建Protocol扩展类的时候,对应的包装类有三个:
0 = {Class@5213} "class com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper"
1 = {Class@5226} "class com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper"
2 = {Class@5319} "class com.alibaba.dubbo.qos.protocol.QosProtocolWrapper"
接下来分析refprotocol.refer(interfaceClass, urls.get(0))的过程,refprotocol本质上是一个Protocol的实现类,此处调用的是一个Protocol的扩展点实现类,扩展点实现类实际上调用的是RegistryProtocol,而在实例化RegistryProtocol的过程中,RegistryProtocol包含有多个扩展点的成员变量,如下:
// Member fields of RegistryProtocol listed by the author as extension points; how they
// are populated is not shown in this excerpt.
private Cluster cluster;
private Protocol protocol;
private RegistryFactory registryFactory;
private ProxyFactory proxyFactory;
在调用refer的方法中,调用了registryFactory的getRegistry(url)方法,经过扩展点的获取,实际上调用的是ZookeeperRegistryFactory的父类AbstractRegistryFactory的getRegistry方法
此时
url = zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user-consumer&dubbo=2.6.2&pid=5752&refer=application%3Duser-consumer%26dubbo%3D2.6.2%26interface%3Dcom.bail.user.service.IUserService%26methods%3DgetUserById%2CqueryList%26pid%3D5752%26register.ip%3D192.168.31.199%26revision%3D1.0.0%26side%3Dconsumer%26timestamp%3D1638194873753%26version%3D1.0.0&timestamp=1638195625410
处理后的
url = zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user-consumer&dubbo=2.6.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=5752&timestamp=1638195625410
然后,根据处理后的url获取一个注册服务,调用方法createRegistry(url),实际上调用的是实现类的createRegistry方法,此处调用的是ZookeeperRegistryFactory的createRegistry方法,
ZookeeperRegistryFactory类中有一个ZookeeperTransporter成员变量,也是一个扩展点。根据传入的url和ZookeeperTransporter,构建一个zkClient,同时添加一个状态监听器,如果状态是RECONNECTED,则调用recover方法:
/**
 * Excerpt of the ZooKeeper-backed registry: the constructor that connects the client, and
 * {@code doSubscribe}, which wires child listeners onto ZooKeeper paths.
 */
public class ZookeeperRegistry extends FailbackRegistry {
/**
 * Connects to ZooKeeper through the given transporter and installs a state listener that
 * calls {@code recover()} whenever the client reports {@code RECONNECTED}.
 *
 * @param url registry URL; its group parameter (default {@code DEFAULT_ROOT}) becomes the
 *        root path, prefixed with the path separator when missing
 * @param zookeeperTransporter transporter used to create {@code zkClient}
 * @throws IllegalStateException if {@code url.isAnyHost()} is true
 */
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
// A failed recovery is only logged; it does not propagate to the client.
logger.error(e.getMessage(), e);
}
}
}
});
}
// Per subscribed URL: the ChildListener registered for each NotifyListener.
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
/**
 * Subscribes {@code listener} to changes for {@code url}.
 *
 * <p>When the service interface is {@code ANY_VALUE}, a listener is placed on the root
 * path and every service found there is subscribed individually. Otherwise a listener is
 * placed on each path from {@code toCategoriesPath(url)}, the current children are
 * collected, and {@code notify} is called once with the combined list.
 *
 * @param url the URL being subscribed
 * @param listener receiver of the resulting notifications
 * @throws RpcException wrapping any {@code Throwable} raised while subscribing
 */
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// get / putIfAbsent / get: obtain the per-URL listener map, creating it when missing.
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
// Only services not seen before are subscribed.
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
// The consumer subscription traced in these notes takes this branch.
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
// Values of toCategoriesPath(url) observed while debugging:
// 0 = "/dubbo/com.bail.user.service.IUserService/providers"
// 1 = "/dubbo/com.bail.user.service.IUserService/configurators"
// 2 = "/dubbo/com.bail.user.service.IUserService/routers"
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// No ChildListener yet for this NotifyListener: register an anonymous one that
// forwards child changes to notify(...).
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
// listener is the NotifyListener passed in (a RegistryDirectory instance in the
// traced call, per the author's debug notes).
// zkListener is the anonymous ChildListener stored just above, not a RegistryDirectory.
zkListener = listeners.get(listener);
}
// ephemeral=false is passed, so the category node is created as a persistent node.
zkClient.create(path, false);
// Resolves to addChildListener of AbstractZookeeperClient (per the author's trace).
List<String> children = zkClient.addChildListener(path, zkListener);
// Value observed while debugging:
// children[0]= dubbo://192.168.137.210:20880/com.bail.user.service.IUserService?anyhost=true&application=user-provider&dubbo=2.6.2&generic=false&getUserById.retries=3&getUserById.timeout=3000&interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=12664&retries=2&revision=1.0.0&side=provider&timeout=8000&timestamp=1638340894565&version=1.0.0
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// One notification carrying the URLs gathered from all category paths.
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
AbstractZookeeperClient
public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
    /**
     * Registers {@code listener} on {@code path} and returns whatever
     * {@code addTargetChildListener} returns for the client-specific listener.
     *
     * <p>The client-specific listener is created at most once per (path, listener) pair and
     * cached in {@code childListeners}. In the author's debug session the ChildListener was an
     * anonymous inner class of ZookeeperRegistry and the TargetChildListener was a
     * CuratorZookeeperClient$CuratorWatcherImpl.
     *
     * @param path ZooKeeper path to watch
     * @param listener callback to attach
     * @return the result of {@code addTargetChildListener(path, targetListener)}
     */
    public List<String> addChildListener(String path, final ChildListener listener) {
        ConcurrentMap<ChildListener, TargetChildListener> registered = listenersFor(path);
        TargetChildListener nativeListener = registered.get(listener);
        if (nativeListener == null) {
            registered.putIfAbsent(listener, createTargetChildListener(path, listener));
            nativeListener = registered.get(listener);
        }
        return addTargetChildListener(path, nativeListener);
    }

    /** Returns the listener map cached for {@code path}, creating it when absent. */
    private ConcurrentMap<ChildListener, TargetChildListener> listenersFor(String path) {
        ConcurrentMap<ChildListener, TargetChildListener> cached = childListeners.get(path);
        if (cached != null) {
            return cached;
        }
        childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
        return childListeners.get(path);
    }
}
recover方法是父类FailbackRegistry中定义的方法。然后将得到的registry放入到AbstractRegistryFactory的Map<String, Registry> REGISTRIES容器中。在得到注册中心后,继续调用refer方法中的后续方法doRefer:doRefer(cluster, registry, type, url),其中的cluster是一个扩展点对象,此方法返回一个invoker对象,接下来看doRefer方法:
new RegistryDirectory(注册表目录)方法简单的返回了一个directory 对象,简单看一下RegistryDirectory结构,再继续往下走:
// Registry directory (excerpt showing only the static extension-point fields).
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
// Three adaptive extensions obtained through ExtensionLoader: cluster, router factory
// and configurator factory.
private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();
}
然后将注册中心赋值给对象,然后构建了一个subscribeUrl订阅URL,继续调用subscribe(URL url, NotifyListener listener)订阅方法,其中订阅的监听者就是我们传进去的参数NotifyListener。然后调用doSubscribe方法发送一个订阅请求。调用toCategoriesPath(url)方法,根据url得到三个不同的订阅路径,分别是
0 = "/dubbo/com.bail.user.service.IUserService/providers"
1 = "/dubbo/com.bail.user.service.IUserService/configurators"
2 = "/dubbo/com.bail.user.service.IUserService/routers"
订阅这三个不同的路径之后,调用notify方法进行通知。
consumer://192.168.31.199/com.bail.user.service.IUserService?application=user-consumer&
dubbo=2.6.2&interface=com.bail.user.service.IUserService&methods=getUserById,queryList
&pid=5752&revision=1.0.0&side=consumer&timestamp=1638194873753&version=1.0.0
然后调用FailbackRegistry的register方法,然后调用子类(即ZookeeperRegistry )的doRegister方法,发送一个注册请求给服务端。
看一下doRefer中属性值的变化, registry.register注册了一个consumer节点到zookeeper;
cluster.join(directory)方法,根据注册表返回一个接口的invoker,此处默认加载的cluster为FailoverCluster,并且有一个包装类MockClusterWrapper。
调用提供消费者注册表注册消费者方法ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory)
至此doRefer执行完毕,返回一个invoker对象
/**
 * Excerpt of RegistryProtocol showing the consumer-side {@code doRefer} step.
 */
public class RegistryProtocol implements Protocol {
/**
 * Creates a {@code RegistryDirectory} for the service, registers the consumer URL with the
 * registry (unless disabled), subscribes the directory to the providers, configurators and
 * routers categories, and returns the invoker produced by {@code cluster.join(directory)}.
 *
 * @param cluster cluster used to join the directory into one invoker
 * @param registry registry the directory registers with and subscribes through
 * @param type service interface class
 * @param url registry URL carrying the reference parameters
 * @return the invoker returned by {@code cluster.join(directory)}
 */
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// Value observed while debugging:
// url = zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user-consumer&dubbo=2.6.2&pid=11332&refer=application%3Duser-consumer%26dubbo%3D2.6.2%26interface%3Dcom.bail.user.service.IUserService%26methods%3DgetUserById%2CqueryList%26pid%3D11332%26register.ip%3D192.168.137.210%26revision%3D1.0.0%26side%3Dconsumer%26timestamp%3D1638323468558%26version%3D1.0.0&timestamp=1638323742757
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// Build the consumer-protocol subscribe URL; the REGISTER_IP_KEY entry is removed from
// the parameters and used as the URL host.
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// Register the consumer under the consumers category with check=false; per the
// author's trace this resolves to FailbackRegistry.register.
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// The directory subscribes to three categories: providers, configurators and routers
// (category=providers,configurators,routers).
// The directory itself performs the subscription and receives the notifications.
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
}
调用了FailbackRegistry的register,内部调用了 doRegister(url)方法,实际上调用的是AbstractZookeeperClient的create方法
/**
 * Excerpt of FailbackRegistry: register, subscribe and notify wrappers that delegate to the
 * subclass hooks and record failures in the failed* collections.
 */
public abstract class FailbackRegistry extends AbstractRegistry {
/**
 * Registers {@code url}: clears earlier failure records for it, then calls
 * {@code doRegister(url)}.
 *
 * <p>On failure the exception is rethrown when the check is enabled on both the registry
 * URL and {@code url} and {@code url} is not a consumer-protocol URL, or when the cause is
 * a {@code SkipFailbackWrapperException}. Otherwise the error is logged and the URL is
 * added to {@code failedRegistered}.
 *
 * @param url the URL to register
 * @throws IllegalStateException on a failure that must not be deferred
 */
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
// Report the wrapped cause rather than the wrapper itself.
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
failedRegistered.add(url);
}
}
/**
 * Subscribes {@code listener} to {@code url}: clears earlier failure records for the pair,
 * then calls {@code doSubscribe(url, listener)}.
 *
 * <p>On failure, a non-empty result of {@code getCacheUrls(url)} is pushed to the listener
 * and the error is logged. With no cached URLs the exception is rethrown when the check is
 * enabled or the cause is a {@code SkipFailbackWrapperException}; otherwise it is logged.
 * Whenever no exception is rethrown, the subscription is recorded through
 * {@code addFailedSubscribed}.
 *
 * @param url the URL to subscribe to
 * @param listener receiver of notifications
 * @throws IllegalStateException on a failure that must not be deferred
 */
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
// In the traced call this resolves to ZookeeperRegistry.doSubscribe().
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (urls != null && !urls.isEmpty()) {
// Fall back to the cached URL list.
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record the failed subscription request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}
/**
 * Delivers {@code urls} to {@code listener} through {@code doNotify}; a failure is logged
 * and the notification is stored in {@code failedNotified}.
 *
 * @param url the subscribed URL; must not be null
 * @param listener the listener to notify; must not be null
 * @param urls the URLs to deliver
 * @throws IllegalArgumentException if {@code url} or {@code listener} is null
 */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record the failed notification to a failed list, retry regularly
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
// A later failure for the same listener replaces the earlier pending list.
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
}
/**
 * Excerpt of AbstractRegistry: in-memory bookkeeping of subscriptions and the notification
 * fan-out grouped by category.
 */
public abstract class AbstractRegistry implements Registry {
// Listeners registered for each subscribed URL.
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
/**
 * Records {@code listener} as a subscriber of {@code url} in {@code subscribed}.
 *
 * @param url the URL being subscribed; must not be null
 * @param listener the subscriber; must not be null
 * @throws IllegalArgumentException if either argument is null
 */
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// In the traced first subscription no listener set exists yet for this URL.
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
// Create the listener set keyed by this URL; the listener (the registry directory in
// the traced call) is added to it below.
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
listeners.add(listener);
}
/**
 * Groups the URLs that match {@code url} by category, stores each group in
 * {@code notified}, and calls {@code listener.notify} once per category.
 *
 * @param url the subscribed URL; must not be null
 * @param listener the listener to notify; must not be null
 * @param urls candidate URLs; an empty or null list is ignored unless the subscribed
 *        service interface is {@code ANY_VALUE}
 * @throws IllegalArgumentException if {@code url} or {@code listener} is null
 */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// NOTE(review): a null urls passes the guard above when the interface is ANY_VALUE, and
// the loop below would then throw a NullPointerException — confirm callers never pass null.
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
// Nothing matched the subscription: no state change and no callback.
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// For each category: store the list, call saveProperties, then notify the listener.
categoryNotified.put(category, categoryList);
saveProperties(url);
listener.notify(categoryList);
}
}
}
public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
    /**
     * Creates the node at {@code path}, first creating any missing parent as a persistent
     * node.
     *
     * <p>Value observed while debugging the consumer registration:
     * path = /dubbo/com.bail.user.service.IUserService/consumers/consumer://192.168.137.210/com.bail.user.service.IUserService?application=user-consumer&category=consumers&check=false&dubbo=2.6.2&interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=11332&revision=1.0.0&side=consumer&timestamp=1638323468558&version=1.0.0
     *
     * @param path full node path
     * @param ephemeral {@code true} to create an ephemeral node, {@code false} for a
     *        persistent one; per the author's notes it was {@code true} in that trace
     */
    public void create(String path, boolean ephemeral) {
        ensureParentExists(path);
        if (!ephemeral) {
            createPersistent(path);
            return;
        }
        createEphemeral(path);
    }

    /** Recursively creates the parent of {@code path} as a persistent node when it is missing. */
    private void ensureParentExists(String path) {
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash <= 0) {
            // No parent segment to create.
            return;
        }
        // In the traced call: parent = /dubbo/com.bail.user.service.IUserService/consumers
        String parent = path.substring(0, lastSlash);
        if (!checkExists(parent)) {
            create(parent, false);
        }
    }
}
doRefer方法中,构建了一个RegistryDirectory对象,此类中也包含多个扩展点对象,不过都是static final类型的:
/**
 * Excerpt of RegistryDirectory showing its static extension points and its constructor.
 */
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
// Adaptive extensions obtained through ExtensionLoader.
private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();
/**
 * Derives the directory state from the registry URL.
 *
 * @param serviceType service interface class; must not be null
 * @param url registry URL; must have a non-empty service key, and its REFER_KEY parameter
 *        is decoded into {@code queryMap}
 * @throws IllegalArgumentException if {@code serviceType} is null or the service key is
 *         null or empty
 */
public RegistryDirectory(Class<T> serviceType, URL url) {
super(url);
if (serviceType == null)
throw new IllegalArgumentException("service type is null.");
if (url.getServiceKey() == null || url.getServiceKey().length() == 0)
throw new IllegalArgumentException("registry serviceKey is null.");
this.serviceType = serviceType;
this.serviceKey = url.getServiceKey();
this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// Directory URL: path set to the service interface, original parameters replaced by the
// decoded reference parameters, monitor parameter removed.
this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
// Multi-group when the group is "*" or contains a comma.
this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
String methods = queryMap.get(Constants.METHODS_KEY);
this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
}
}
/**
 * Stores {@code url} as the consumer URL and subscribes this directory to it on the
 * registry, passing itself as the listener.
 *
 * @param url the consumer subscribe URL
 */
public void subscribe(URL url) {
// Value observed while debugging:
// url = consumer://192.168.137.210/com.bail.user.service.IUserService?application=user-consumer&category=providers,configurators,routers&dubbo=2.6.2&interface=com.bail.user.service.IUserService&methods=getUserById,queryList&pid=11332&revision=1.0.0&side=consumer&timestamp=1638323468558&version=1.0.0
setConsumerUrl(url);
// Calls the registry's subscribe method, i.e. FailbackRegistry.subscribe in the traced call.
registry.subscribe(url, this);
}
zkClient = zookeeperTransporter.connect(url);
// Value observed while debugging:
// url=zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=user-consumer&dubbo=2.6.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=11332&timestamp=1638323742757
cluster.join(directory)
cluster.join(directory)方法,根据注册表返回一个接口的invoker,此处默认加载的cluster为FailoverCluster,并且有一个包装类MockClusterWrapper。返回doRefer方法
/**
 * Cluster wrapper that decorates the invoker produced by the wrapped cluster with a
 * {@code MockClusterInvoker}.
 */
public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    /**
     * @param cluster the cluster whose {@code join} result gets wrapped
     */
    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    /**
     * Joins the directory through the wrapped cluster and wraps the resulting invoker.
     *
     * @param directory directory to join
     * @return a {@code MockClusterInvoker} built from the directory and the wrapped invoker
     * @throws RpcException declared by the signature
     */
    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        Invoker<T> joined = this.cluster.join(directory);
        return new MockClusterInvoker<T>(directory, joined);
    }
}
ProviderConsumerRegTable
调用ProviderConsumerRegTable
/**
 * Excerpt of ProviderConsumerRegTable showing its two static lookup tables.
 */
public class ProviderConsumerRegTable {
// Two static maps from a String key to a set of wrappers: one for provider invokers and
// one for consumer invokers.
public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
}
/**
 * Wraps the consumer invoker and stores it in {@code consumerInvokers} under the service key
 * of {@code consumerUrl}.
 *
 * @param invoker the consumer-side invoker
 * @param registryUrl URL of the registry the invoker was referred through
 * @param consumerUrl the consumer URL; its service key is the map key
 * @param registryDirectory directory backing the invoker
 */
public static void registerConsumer(Invoker invoker, URL registryUrl, URL consumerUrl, RegistryDirectory registryDirectory) {
    // Build the consumer invoker wrapper first, then resolve its bucket.
    ConsumerInvokerWrapper wrapped = new ConsumerInvokerWrapper(invoker, registryUrl, consumerUrl, registryDirectory);
    consumerBucket(consumerUrl.getServiceKey()).add(wrapped);
}

/** Returns the wrapper set stored under {@code serviceKey}, creating it when absent. */
private static Set<ConsumerInvokerWrapper> consumerBucket(String serviceKey) {
    Set<ConsumerInvokerWrapper> bucket = consumerInvokers.get(serviceKey);
    if (bucket != null) {
        return bucket;
    }
    consumerInvokers.putIfAbsent(serviceKey, new ConcurrentHashSet<ConsumerInvokerWrapper>());
    return consumerInvokers.get(serviceKey);
}
ConsumerInvokerWrapper
/**
 * Excerpt of ConsumerInvokerWrapper: holds a consumer invoker together with its URLs and
 * its registry directory. Only the fields and the constructor are shown here.
 */
public class ConsumerInvokerWrapper<T> implements Invoker {
private Invoker<T> invoker;
private URL originUrl;
private URL registryUrl;
private URL consumerUrl;
private RegistryDirectory registryDirectory;
/**
 * @param invoker the wrapped invoker; its URL is re-parsed into {@code originUrl}
 * @param registryUrl registry URL; re-parsed from its full string form
 * @param consumerUrl consumer URL; stored as given
 * @param registryDirectory directory backing the invoker
 */
public ConsumerInvokerWrapper(Invoker<T> invoker, URL registryUrl, URL consumerUrl, RegistryDirectory registryDirectory) {
this.invoker = invoker;
// originUrl and registryUrl are rebuilt through URL.valueOf(...toFullString()), so they
// are new URL objects rather than the instances passed in.
this.originUrl = URL.valueOf(invoker.getUrl().toFullString());
this.registryUrl = URL.valueOf(registryUrl.toFullString());
this.consumerUrl = consumerUrl;
this.registryDirectory = registryDirectory;
}
}