问题
- 为什么返回的对象不直接是 Invoker 而是代理类呢
回答 :
Dubbo服务暴露的主要目的是让本地的服务bean能够让其它进程通过网络调用。在暴露服务前,dubbo需要根据配置信息收集服务相关信息,服务的配置信息都配置在ServiceConfig中。Dubbo接收到Spring触发的ContextRefreshedEvent事件后,dubbo进行真正的服务暴露过程。Dubbo服务暴露目的是让消费者调用, Dubbo会将注册的服务bean包装为可执行对象Invoker,但是暴露的服务bean不能任意被消费者调用,所以消费者对服务bean的访问需要受到控制,这些控制的逻辑会放在代理逻辑中实现,而ProxyFactory用于生成代理对象。除此之外,当前进程的服务bean能够被其它进程调用的前提是能够被其它进程发现,所以这会涉及到服务注册的问题,Registry抽象了注册中心。服务调用又会涉及到网络协议,比如长连接还是短连接协议,关于暴露协议的封装由Protocol接口负责。
概述
服务引用的时机
Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 dubbo:reference 的 init 属性开启。
服务引用概述过程
外界看到的某个接口的服务引用,而实际是dubbo 经过包装配置的invoker对象。在 Dubbo 中,我们可以通过两种方式引用远程服务。第一种是使用服务直连的方式引用服务,第二种方式是基于注册中心进行引用。服务直连的方式仅适合在调试或测试服务的场景下使用,不适合在线上环境使用。
当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject 方法,并由该方法执行服务引用逻辑。按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,有三种,第一种是引用本地 (JVM) 服务,第二是通过直连方式引用远程服务,第三是通过注册中心引用远程服务。不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。避免了 Dubbo 框架代码对业务代码的侵入,同时也让框架更容易使用。
源码分析
服务引用的分析从 ReferenceBean 开始,看一下类结构
其中 FactoryBean 和 ApplicationContextAware, InitializingBean 都是继承自 spring 用于在 springboot 引用,
//这个方法重写自 InitializingBean,该方法用于当bean set完属性后,用户进行自定义的逻辑
@Override
@SuppressWarnings({"unchecked"})
public void afterPropertiesSet() throws Exception {
// 初始化 dubbo 配置 bean
// Initializes Dubbo's Config Beans before @Reference bean autowiring
prepareDubboConfigBeans();
// lazy init by default.
if (init == null) {
init = false;
}
// eager init if necessary.
if (shouldInit()) {
getObject();
}
}
通过 createProxy 的调用栈看到调用会经过 RegistryProtocol 的 doRefer 方法,从这里我们可以大概知道生成 invoker 的过程。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//这个 RegistryDirectory 存放注册信息的资料,当注册信息发生变化的时候通过 subscribe 方法分发,内部存在一些监听器处理信息时间,同时内部
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
//订阅 providers、configurators、routers 等节点数据
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
//这才是真正生成 invoker 的地方
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
Invoker invoker = cluster.join(directory);
return invoker;
}
我们发现在返回 inovker 之前的 ,会调用 RegistryDirectory 的 subscribe 方法,这个方法正是就是同个服务提供者生成多个 invoker 的地方,调用栈如下,可以看见该例中调用的是 DubboProtocol 。
我们稍微讲解一下,然后再分析 DubboProtocol 生成 invoker 的过程。 首先,多个提供者生成 invoker 的步骤在 AbstractRegistry 的 notify 方法中(可以从前面的调用栈找到)
/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
...
// keep every provider's category. 找到提供者的目录(提供者信息)
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
//更新操作!!!!要是第一次,必然多个提供者创建多个invoker,从调用栈可以看出下面的 notity 方法最终也调用到了 DubboProtocol 生成 DubboInvoker 的过程。
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}
}
最后 AbstractProtocol 的 refer 被调用
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
DubboProtocol 的 protocolBindingRefer 被调用 :
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
到了这里我们,逻辑不难懂,就是创建一个 DubboInvoker 然后返回。其中有个 getClients 的方法,这个方法用于获取客户端实例,实例类型为 ExchangeClient。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。这里做一下分析,DubboProtocol 的 buildReferenceCountExchangeClient 方法
/**
* Build a single client
*
* @param url
* @return
*/
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
/**
* Create new connection
*
* @param url
*/
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
//假如有指定懒加载
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
//使用 SPI 机制进行调用
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
最后会经过
/**
* DefaultMessenger
*
*
*/
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 这里包含了多个调用,分别如下:
// 1. 创建 HeaderExchangeHandler 对象
// 2. 创建 DecodeHandler 对象
// 3. 通过 Transporters 构建 Client 实例
// 4. 创建 HeaderExchangeClient 对象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
...
}
而这个 Transporters中的 connect 方法 也是同样的套路,也是使用 SPI 机制,
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
//SPI 机制
return getTransporter().connect(url, handler);
}
默认是 netty
public class NettyTransporter implements Transporter {
...
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
总结
我们可以说服务引用就是Dubbo进行 :
- 消费者注册
- 封装 invoker 返回给消费者,供消费者透明使用 的过程。
参考资料
- dubbo 官方文档


