各层说明
- 「config 配置层」:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
- 「proxy 服务代理层」:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
- 「registry 注册中心层」:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
- 「cluster 路由层」:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
- 「monitor 监控层」:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
- 「protocol 远程调用层」:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
- 「exchange 信息交换层」:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
- 「transport 网络传输层」:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
- 「serialize 数据序列化层」:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
模块分包说明
- 「dubbo-common 公共逻辑模块」:包括 Util 类和通用模型。
- 「dubbo-remoting 远程通讯模块」:相当于 Dubbo 协议的实现,如果 RPC 用 RMI协议则不需要使用此包。此模块包含了10层中的transport层和exchange层为rpc调用的基础模块。
- 「dubbo-rpc 远程调用模块」:抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。此模块包含了10层中的protocol和proxy层。接口暴露和代理生成都在这一层。
- 「dubbo-cluster 集群模块」:将多个服务提供方伪装为一个提供方,包括:负载均衡, 容错,路由等,集群的地址列表可以是静态配置的,也可以是由注册中心下发。此模块为10层中的cluster层
- 「dubbo-registry 注册中心模块」:基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。
- 「dubbo-monitor 监控模块」:统计服务调用次数,调用时间的,调用链跟踪的服务。此层为10层中的monitor层
- 「dubbo-config 配置模块」:是 Dubbo 对外的 API,用户通过 Config 使用Dubbo,隐藏 Dubbo 所有细节。此层包括了10层中的config层
- 「dubbo-container 容器模块」:是一个 Standlone 的容器,以简单的 Main 加载 Spring 启动,因为服务通常不需要 Tomcat/JBoss 等 Web 容器的特性,没必要用 Web 容器去加载服务。
- 「dubbo-serialization」:rpc调用的序列化方式(目前有5种fastjson、fst、hessian2、jdk、kryo)默认为hessian2,此模块为10层中的serialize层
项目启动初始化过程细节(生产者和消费者)
解析服务
基于 dubbo.jar 内的 META-INF/spring.handlers 配置,Spring 在遇到 dubbo 名称空间时,会回调 DubboNamespaceHandler。所有 dubbo 的标签,都统一用 DubboBeanDefinitionParser 进行解析,基于一对一属性映射,将 XML 标签解析为 Bean 对象。在 ServiceConfig.export() 或 ReferenceConfig.get() 初始化时,将 Bean 对象转换 URL 格式,所有 Bean 属性转成 URL 的参数。然后将 URL 传给 协议扩展点,基于扩展点的 扩展点自适应机制,根据 URL 的协议头,进行不同协议的服务暴露或引用。
暴露服务
-
在没有注册中心,直接暴露提供者的情况下 1.ServiceConfig 解析出的 URL 的格式为:dubbo://service-host/com.foo.FooService?version=1.0.0。基于扩展点自适应机制,通过 URL 的 dubbo:// 协议头识别,直接调用 DubboProtocol的 export() 方法,打开服务端口。
-
在有注册中心,需要注册提供者地址的情况下 2,ServiceConfig 解析出的 URL 的格式为: registry://registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/com.foo.FooService?version=1.0.0"),基于扩展点自适应机制,通过 URL 的 registry:// 协议头识别,就会调用 RegistryProtocol 的 export() 方法,将 export 参数中的提供者 URL,先注册到注册中心。再重新传给 Protocol 扩展点进行暴露:dubbo://service-host/com.foo.FooService?version=1.0.0,然后基于扩展点自适应机制,通过提供者 URL 的 dubbo:// 协议头识别,就会调用 DubboProtocol 的 export() 方法,打开服务端口。
引用服务
-
在没有注册中心,直连提供者的情况下 3,ReferenceConfig 解析出的 URL 的格式为:dubbo://service-host/com.foo.FooService?version=1.0.0。基于扩展点自适应机制,通过 URL 的 dubbo:// 协议头识别,直接调用 DubboProtocol 的 refer() 方法,返回提供者引用。
- 从注册中心发现引用服务:在有注册中心,通过注册中心发现提供者地址的情况下 4,ReferenceConfig 解析出的 URL 的格式为:registry://registry-host/org.apache.dubbo.registry.RegistryService?refer=URL.encode("consumer://consumer-host/com.foo.FooService?version=1.0.0")。基于扩展点自适应机制,通过 URL 的 registry:// 协议头识别,就会调用 RegistryProtocol 的 refer() 方法,基于 refer 参数中的条件,查询提供者 URL,如:dubbo://service-host/com.foo.FooService?version=1.0.0。基于扩展点自适应机制,通过提供者 URL 的 dubbo:// 协议头识别,就会调用 DubboProtocol 的 refer() 方法,得到提供者引用。然后 RegistryProtocol 将多个提供者引用,通过 Cluster 扩展点,伪装成单个提供者引用返回。
项目启动初始化细节总结
不管生产者还是消费者,不管是否有注册中心,终究离不开Protocol这个接口。也就是说从生产者和消费者启动,他都会有Protocol产生,而当没有注册中心的时候,那么按上面所说的可以剔除上层并且复用,就会使用DubboProtocol实现类进行服务的暴露和引用。当有注册中心的时候根据Dubbo的自适应机制就会变成RegistryProtocol作为实现类,进行服务的暴露和引用。所以说Protocol的职责是对服务进行暴露和引用,而他的具体实现就要看使用Dubbo时的环境。
生产者暴露服务的过程
消费者引用服务的过程
服务者源码解析
项目初始化阶段
Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑。服务导出的入口方法是 ServiceBean 的 onApplicationEvent。onApplicationEvent 是一个事件响应方法,该方法会在收到 Spring 上下文刷新事件后执行服务导出操作。
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
ApplicationEventPublisherAware {
private transient volatile boolean exported;
private transient volatile boolean unexported;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 是否有延迟导出(延迟false) && 是否已导出 && 是不是已被取消导出
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
delay = provider.getDelay();
}
return supportedApplicationListener && (delay == null || delay == -1);
}
public synchronized void export() {
// 当前类继承ServiceConfig.java所以会看当前
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
/**
* 真正的出操作
*/
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
// 检测 interfaceName 是否合法
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
// 检测 provider 是否为空,为空则新建一个,并通过系统变量为其初始化
checkDefault();
if (provider != null) {
...
}
if (module != null) {
...
}
if (application != null) {
...
}
// 检测 ref 是否为泛化服务类型
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
}
// ref 非 GenericService 类型
else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 对 interfaceClass,以及 <dubbo:method> 标签中的必要字段进行检查
checkInterfaceAndMethods(interfaceClass, methods);
// 对 ref 合法性进行检测
checkRef();
generic = Boolean.FALSE.toString();
}
// local 和 stub 在功能应该是一致的,用于配置本地存根
if (local != null) {
if ("true".equals(local)) {
local = interfaceName + "Local";
}
Class<?> localClass;
try {
// 获取本地存根类
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 检测本地存根类是否可赋值给接口类,若不可赋值则会抛出异常,提醒使用者本地存根类类型不合法
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if (stub != null) {
...
}
// 检测各种对象是否为空,为空则新建,或者抛出异常
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStub(interfaceClass);
checkMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
// 导出服务
doExportUrls();
CodecSupport.addProviderSupportedSerialization(getUniqueServiceName(), getExportedUrls());
// ProviderModel 表示服务提供者模型,此对象中存储了与服务提供者相关的信息。
// 比如服务的配置信息,服务实例等。每个被导出的服务对应一个 ProviderModel。
// ApplicationModel 持有所有的 ProviderModel。
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
/**
* 多协议多注册中心导出服务
* 把当前对象转换成URL.java
*/
private void doExportUrls() {
// 加载注册中心链接
List<URL> registryURLs = loadRegistries(true);
// 遍历 protocols,并在每个协议下导出服务
for (ProtocolConfig protocolConfig : protocols) {
// 组装URL
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
/**
* 把URL转成Invoker伪代码
*/
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
...
//methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
//执行method的所有操作
...
} // end of methods for
}
// 类似刚刚的泛型判断
if (ProtocolUtils.isGeneric(generic)) {
...
} else {
...
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// 暴露服务也就是URL转换成Invoker
...
String scope = url.getParameter(Constants.SCOPE_KEY);
// 如果 scope = none,则什么都不做
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// scope != remote,导出到本地
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// scope != local,导出到远程
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
// 有注册中心
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
...
// 为服务提供类(ref)生成 Invoker
// 刚刚说了生产者的ProxyFactory获取Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 拿到Invoker后暴露出去默认DubboProtocol
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
// 不存在注册中心,仅导出服务
else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
/**
* DubboProtocol.java的export方法
*/
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
}
消费者源码解析
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
// 检测 ref 是否为空,为空则通过 init 方法创建
if (ref == null) {
// init 方法主要用于处理配置,以及调用 createProxy 生成代理类
init();
}
return ref;
}
private void init() {
// 避免重复初始化
if (initialized) {
return;
}
// ...省略大量代码粗略总结下
// 1.主要用于检测 ConsumerConfig 实例是否存在
// 2.这段逻辑用于从系统属性或配置文件中加载与接口名相对应的配置,并将解析结果赋值给 url 字段。url 字段的作用一般是用于点对点调用。
String resolve = System.getProperty(interfaceName);
// 3.用于从系统属性或配置文件中加载与接口名相对应的配置,并将解析结果赋值给 url 字段。url 字段的作用一般是用于点对点调用。
// 4.用于检测几个核心配置类是否为空,为空则尝试从其他配置类中获取。
// 5.主要用于收集各种配置,并将配置存储到 map 中。
// 6.用于处理 MethodConfig 实例。该实例包含了事件通知配置,比如 onreturn、onthrow、oninvoke 等。
// 获取服务消费者 ip 地址
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
//attributes are stored by system context.
StaticContext.getSystemContext().putAll(attributes);
// 创建代理(重点)
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
// 这一段代码是判断是否当前类在本地,也就是所谓的本地调用
if (isInjvm() == null) {
// url 配置被指定,则不做本地引用
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
}
// 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用
// 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true
else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
// 获取 injvm 配置值
isJvmRefer = isInjvm().booleanValue();
}
// 本地引用
if (isJvmRefer) {
// 生成本地引用 URL,协议为 injvm
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 调用 refer 方法构建 InjvmInvoker 实例
// 本地引用的时候refprotocol为DubboProtocol
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
}
// 远程调用
else {
// url不为空说明消费者想点对点调用,此url可以再DubboReference的url属性设置
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
// 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
}
// url为空说明用户连接的是注册中心
else { // assemble URL from register center's configuration
// 加载注册中心 url
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 添加 refer 参数到 url 中,并将 url 添加到 urls 中
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 未配置注册中心,抛出异常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// 只有一个注册中心
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
//多个注册中心
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
// 最后会把多个invokers合并成一个,也就是通过cluster层把多个invokers合并成一个,然后具体走哪个就看消费者设置的策略
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
// 生产者检查是否可用 不可用直接报错程序启动不了使用dubbo.consumer.check=false去除
if (c && !invoker.isAvailable()) {
...
// create service proxy
// 创建代理
return (T) proxyFactory.getProxy(invoker);
}
消费者创建Invoker
public class DubboProtocol extends AbstractProtocol {
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
/**
* getClients。这个方法用于获取客户端实例
*/
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
}
RegistryProtocol
public class RegistryProtocol implements Protocol {
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 取 registry 参数值,并将其设置为协议头
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//获取注册中心实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// url转成Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// group的分组也就是@DubboReference(group=)
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
// 合并两个group也就是走到了cluster路由层让这一层选择分发哪个group
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//创建实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
//设置注册中心实例
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服务消费者链接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注册服务消费者,在 consumers 目录下新节点
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
//当前消费者发送订阅的消息给注册中心,注册中心会把关注的生产者动态notify到消费者
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
}