Execution steps
ReferenceBean.getObject()
-->ReferenceConfig.get()
-->init()
-->createProxy(map)
-->refprotocol.refer(interfaceClass, urls.get(0))
-->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry");
-->extension.refer(arg0, arg1);
-->ProtocolFilterWrapper.refer (three AOP-style wrapper classes)
-->RegistryProtocol.refer
-->registryFactory.getRegistry(url) // connect to zk, same as when the provider exports a service (code omitted)
-->doRefer(cluster, registry, type, url)
-->FailbackRegistry.register
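// create the zk node, same as when the provider exports a service (code omitted).
// node path: dubbo/per.qiao.service.TestService/consumers
-->registry.subscribe // subscribe to the zk nodes, same as when the provider exports a service (code omitted)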
//dubbo/per.qiao.service.TestService/providers,
//dubbo/per.qiao.service.TestService/configurators
//dubbo/per.qiao.service.TestService/routers
-->notify(url, listener, urls);
-->FailbackRegistry.notify
-->doNotify(url, listener, urls);
-->AbstractRegistry.notify
-->saveProperties(url);
// save the registry info to the cache file (same path rule as on export)
-->registryCacheExecutor.execute(new SaveProperties(...));
// handled on a thread pool
-->listener.notify(categoryList);
-->RegistryDirectory.notify
-->refreshInvoker(invokerUrls);
// convert each URL into an Invoker; the map key is the URL in string form
-->toInvokers(invokerUrls)
-->protocol.refer(serviceType, url), url, providerUrl);
-->Protocol$Adaptive.refer
-->ExtensionLoader.getExtensionLoader(class)
.getExtension("dubbo");
-->extension.refer(type, url);
-->QosProtocolWrapper.refer
// a filter chain is built here
// buildInvokerChain(invoker,
// "reference.filter", "consumer")
-->ProtocolFilterWrapper.refer
// return new ListenerInvokerWrapper
-->ProtocolListenerWrapper.refer
// return new DubboInvoker
-->DubboProtocol.refer
-->destroyUnusedInvokers(
oldUrlInvokerMap,newUrlInvokerMap);
// close the Invokers that are no longer used
// end result: refreshes the Map<String, Invoker<T>> urlInvokerMap object
// and the Map<String, List<Invoker<T>>> methodInvokerMap object
-->cluster.join(directory) // join the cluster
-->ExtensionLoader.getExtensionLoader(Cluster.class)
.getExtension("failover");
-->MockClusterWrapper.join
-->this.cluster.join(directory)
-->FailoverCluster.join
-->return new FailoverClusterInvoker<T>(directory)
-->new MockClusterInvoker // the invoker object that is returned
--------------------------------------------------------------------------------------------
-->proxyFactory.getProxy(invoker) // create the service proxy
-->ProxyFactory$Adaptive.getProxy
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
.getExtension("javassist");
-->StubProxyFactoryWrapper.getProxy(invoker) // post-processes (enhances) the returned proxy
-->AbstractProxyFactory.getProxy
-->getProxy(invoker, interfaces)
-->Proxy.getProxy(interfaces)
-->JavassistProxyFactory.getProxy
-->Proxy.getProxy(interfaces)
// interfaces proxied at this point: per.qiao.service.TestService
// and interface com.alibaba.dubbo.rpc.service.EchoService
-->newInstance(InvokerInvocationHandler(MockClusterInvoker))
// this MockClusterInvoker is the invoker object returned by refprotocol.refer above
// an InvokerInvocationHandler is created; it implements the JDK's built-in InvocationHandler.
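The wrapper chain in the steps above (QosProtocolWrapper, ProtocolFilterWrapper, ProtocolListenerWrapper, then DubboProtocol) reads like a decorator chain: each wrapper holds the next protocol and decorates what it returns. The sketch below only illustrates that shape. `SketchProtocol`, `CoreProtocol` and `NamedWrapper` are hypothetical stand-ins invented for this example, not Dubbo classes, and the "invoker" is reduced to a string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a protocol: refer() returns a textual
// description of the invoker chain instead of a real Invoker.
interface SketchProtocol {
    String refer(String serviceName, List<String> trace);
}

// Innermost implementation, playing the role that DubboProtocol plays in the trace.
class CoreProtocol implements SketchProtocol {
    @Override
    public String refer(String serviceName, List<String> trace) {
        trace.add("CoreProtocol");
        return "CoreInvoker(" + serviceName + ")";
    }
}

// A wrapper delegates to the protocol it holds and decorates the result.
class NamedWrapper implements SketchProtocol {
    private final String name;
    private final SketchProtocol delegate;

    NamedWrapper(String name, SketchProtocol delegate) {
        this.name = name;
        this.delegate = delegate;
    }

    @Override
    public String refer(String serviceName, List<String> trace) {
        trace.add(name);
        return name + "[" + delegate.refer(serviceName, trace) + "]";
    }
}

class WrapperChainSketch {
    // Wrap from the inside out, so the outermost wrapper is called first.
    static SketchProtocol build() {
        SketchProtocol protocol = new CoreProtocol();
        protocol = new NamedWrapper("ListenerWrapper", protocol);
        protocol = new NamedWrapper("FilterWrapper", protocol);
        protocol = new NamedWrapper("QosWrapper", protocol);
        return protocol;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        System.out.println(build().refer("TestService", trace));
        System.out.println(trace);
    }
}
```

The call order recorded in `trace` mirrors the order of the wrapper lines in the execution steps, while the returned value is nested the other way round, with the core invoker innermost.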
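The generated proxy class
Entry point:
ReferenceConfig#init,
ref = createProxy(map);
JavassistProxyFactory#getProxy generates a proxy class.
Strictly speaking it generates two classes rather than one (see com.alibaba.dubbo.common.bytecode.Proxy#getProxy):
clazz (built by ccp) and pc (built by ccm).
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
Proxy.getProxy generates the two classes and returns a Proxy0 instance. Calling newInstance on it
invokes Proxy0#newInstance(handler), which finally returns a proxy0 object.
In other words, the object returned by ReferenceBean.getObject (the one the caller uses) is this proxy0 object.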
public class Proxy0 extends Proxy {
@Override
public Object newInstance() {
return super.newInstance();
}
@Override
public Object newInstance(java.lang.reflect.InvocationHandler h) {
return new proxy0(h);
}
}
The class that does the actual work (note: the names of these two classes differ only in the case of the first letter)
import java.lang.reflect.InvocationHandler;
/**
* Create by IntelliJ Idea 2018.2
*
* @author: qyp
* Date: 2019-05-27 10:46
*/
public class proxy0 implements com.alibaba.dubbo.rpc.service.EchoService, per.qiao.service.TestService {
/**
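* Methods of the two implemented interfaces; here they are ($echo, getData, getList),
* indexed in this class as methods[0] = getData, methods[1] = getList, methods[2] = $echo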
*/
public static java.lang.reflect.Method[] methods;
/**
* This handler is the InvokerInvocationHandler that wraps the result returned by refprotocol.refer in the execution steps above (MockClusterInvoker)
*/
private java.lang.reflect.InvocationHandler handler;
public proxy0(InvocationHandler h) {
this.handler = h;
}
// --------- this method comes from EchoService -------------
@Override
public Object $echo(java.lang.Object arg0) {
Object[] args = new Object[1];
args[0] = arg0;
Object ret = null;
try {
ret = handler.invoke(this, methods[2], args);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return (java.lang.Object) ret;
}
// ---------- the two methods below come from the referenced service interface -----------
@Override
public java.lang.String getData(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = arg0;
Object ret = null;
try {
ret = handler.invoke(this, methods[0], args);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return (java.lang.String) ret;
}
@Override
public java.util.List getList() {
Object[] args = new Object[0];
Object ret = null;
try {
ret = handler.invoke(this, methods[1], args);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return (java.util.List) ret;
}
}
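The generated proxy0 class above forwards every interface method to its handler. The same delegation can be sketched with the JDK's own java.lang.reflect.Proxy, used here as a stand-in for the Javassist-generated class. `DemoService`, `DemoInvoker` and `ForwardingHandler` are hypothetical names invented for this sketch; `ForwardingHandler` plays the role of InvokerInvocationHandler, and `DemoInvoker` is a heavily simplified invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for the referenced service interface.
interface DemoService {
    String getData(String key);
}

// Hypothetical, heavily simplified stand-in for an invoker.
interface DemoInvoker {
    Object invoke(String methodName, Object[] args);
}

// Plays the role of InvokerInvocationHandler: interface calls go to the invoker.
class ForwardingHandler implements InvocationHandler {
    private final DemoInvoker invoker;

    ForwardingHandler(DemoInvoker invoker) {
        this.invoker = invoker;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // Methods declared on Object are answered locally, not sent to the invoker.
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString":
                    return "DemoProxy";
                case "hashCode":
                    return System.identityHashCode(proxy);
                default: // equals
                    return proxy == args[0];
            }
        }
        return invoker.invoke(method.getName(), args);
    }
}

class ProxySketch {
    static DemoService createProxy(DemoInvoker invoker) {
        return (DemoService) Proxy.newProxyInstance(
                DemoService.class.getClassLoader(),
                new Class<?>[] {DemoService.class},
                new ForwardingHandler(invoker));
    }

    public static void main(String[] args) {
        // A fake invoker that just echoes the method name and first argument.
        DemoInvoker fake = (name, a) -> name + ":" + a[0];
        System.out.println(createProxy(fake).getData("k1"));
    }
}
```

The caller only ever sees `DemoService`; which invoker sits behind the handler is decided when the proxy is created, which is the same relationship as between proxy0 and the MockClusterInvoker.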
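Next, a closer look at how a service reference interacts with zookeeper.
Question: if the provider has already started before the consumer starts, the provider nodes already exist on zookeeper, so no change event will be pushed to the consumer. How, then, does the consumer's local service list get refreshed?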
RegistryProtocol
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// obtain the Registry object for this url
Registry registry = registryFactory.getRegistry(url);
...
// type is the service interface
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// create a registry directory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
// register the consumer node
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
// subscribe to the three nodes: providers, configurators, routers
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
// save the subscription info to the local registry table
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
The cluster is not the focus of this analysis.
FailbackRegistry
public FailbackRegistry(URL url) {
super(url);
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// retry failed URLs: the first retry runs after 5 seconds by default, then every 5 seconds
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
retry();
} catch (Throwable t) {
...
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
public void register(URL url) {
super.register(url);
// remove this URL from the failed lists
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// send the registration request to the registry server
doRegister(url);
} catch (Exception e) {
// if check-on-startup is enabled (the configured check attribute), throw the exception directly
...
}
// record the failed registration request in the failed list and retry it periodically
failedRegistered.add(url);
}
}
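The failback idea in the two methods above (try once, remember what failed, retry later) can be sketched in isolation. This is a minimal sketch under stated assumptions: `FailbackSketch` and its `registryAvailable` flag are hypothetical, URLs are plain strings, and `retry()` is called by hand here, whereas the constructor shown above schedules it with `scheduleWithFixedDelay`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified registry that remembers failed registrations and retries them.
class FailbackSketch {
    final Set<String> registered = ConcurrentHashMap.newKeySet();
    final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();

    // Simulates whether the remote registry can currently be reached.
    volatile boolean registryAvailable = false;

    // Stand-in for doRegister: fails while the registry is unreachable.
    void doRegister(String url) {
        if (!registryAvailable) {
            throw new IllegalStateException("registry unavailable");
        }
        registered.add(url);
    }

    void register(String url) {
        // A fresh attempt supersedes any earlier failure record for this URL.
        failedRegistered.remove(url);
        try {
            doRegister(url);
        } catch (RuntimeException e) {
            // Record the failure so that a later retry pass can pick it up.
            failedRegistered.add(url);
        }
    }

    // One retry pass over everything that failed so far.
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // Still failing: keep the URL for the next pass.
            }
        }
    }

    public static void main(String[] args) {
        FailbackSketch registry = new FailbackSketch();
        registry.register("consumer://10.0.0.1/per.qiao.service.TestService");
        System.out.println("failed after first attempt: " + registry.failedRegistered);
        registry.registryAvailable = true;
        registry.retry();
        System.out.println("registered after retry: " + registry.registered);
    }
}
```

The sketch leaves out the check-on-startup branch that the original code elides with `...`; it only shows the bookkeeping around the failed list.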
ZookeeperRegistry
Used to create the consumer node
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
...
}
}
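To make the node path concrete, here is a small sketch of how a path such as dubbo/per.qiao.service.TestService/consumers could be assembled. It is an illustration only: `ZkPathSketch` and its methods are hypothetical, and the assumption that the leaf node name is the URL-encoded full URL string is mine, not something shown in the code above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper that builds registry node paths from plain strings.
class ZkPathSketch {
    // Prefix the group with "/" when it is missing, as the ZookeeperRegistry constructor does.
    static String root(String group) {
        return group.startsWith("/") ? group : "/" + group;
    }

    // <root>/<service interface>/<category>
    static String categoryPath(String group, String serviceInterface, String category) {
        return root(group) + "/" + serviceInterface + "/" + category;
    }

    // Assumption: the leaf node name is the URL-encoded full URL string.
    static String urlPath(String group, String serviceInterface, String category, String fullUrl) {
        try {
            return categoryPath(group, serviceInterface, category)
                    + "/" + URLEncoder.encode(fullUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform must support.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String service = "per.qiao.service.TestService";
        System.out.println(categoryPath("dubbo", service, "consumers"));
        System.out.println(urlPath("dubbo", service, "consumers",
                "consumer://10.0.0.1/" + service + "?side=consumer"));
    }
}
```

Encoding the URL keeps characters such as `/` and `?` out of the node name, so the whole consumer URL fits into a single path segment.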
Now for the consumer subscription
RegistryProtocol.doRefer
// subscribe to the three nodes: providers, configurators, routers
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
RegistryDirectory.subscribe
public void subscribe(URL url) {
// set the current subscription URL
setConsumerUrl(url);
registry.subscribe(url, this);
}
FailbackRegistry
public void subscribe(URL url, NotifyListener listener) {
// record the callback listener for this subscription
super.subscribe(url, listener);
// remove the URL from the failed-subscription list
removeFailedSubscribed(url, listener);
try {
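// register the consumer's listeners on zookeeper: create and watch the three category nodes, and refresh the local registry table along the way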
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
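// if subscribing fails, load the watched URLs from the local cache file and refresh the registry table with them
// note: the cache only contains data if notify was called earlier, either when the consumer subscribed or on a zookeeper notification
// and this fallback runs only when subscribing fails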
List<URL> urls = getCacheUrls(url);
if (urls != null && !urls.isEmpty()) {
notify(url, listener, urls);
} else {
...
}
// Record the failed subscription request in the failed list and retry it regularly
addFailedSubscribed(url, listener);
}
}
ZookeeperRegistry
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// connect to zookeeper
zkClient = zookeeperTransporter.connect(url);
// add a state listener
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
...
} else {
List<URL> urls = new ArrayList<URL>();
// iterate over the category paths to watch (three of them)
for (String path : toCategoriesPath(url)) {
// look up the listeners in the cache
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
// if there is none in the cache, create the listener
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// the listener callback is ZookeeperRegistry#notify
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
// create the watched category node (three in total across the loop)
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// after registering the zookeeper node listeners, refresh the local list right away
notify(url, listener, urls);
}
} catch (Throwable e) {
...
}
}
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
try {
doNotify(url, listener, urls);
} catch (Exception t) {
...
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
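The part of doSubscribe that matters for the question raised earlier is that `addChildListener` returns the children that already exist, and `notify` is then called with them immediately. The sketch below shows that pattern with an in-memory stand-in. `FakeZk` and `SubscribeSketch` are hypothetical classes written for this illustration; they are not the zkClient API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a client that supports child listeners.
class FakeZk {
    private final Map<String, List<String>> children = new HashMap<>();
    private final Map<String, List<Consumer<List<String>>>> listeners = new HashMap<>();

    // Adds a child and pushes the new child list to every listener on the parent path.
    void addChild(String parent, String child) {
        children.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
        List<String> snapshot = new ArrayList<>(children.get(parent));
        for (Consumer<List<String>> l : listeners.getOrDefault(parent, new ArrayList<>())) {
            l.accept(snapshot);
        }
    }

    // Registers a listener and returns the children that already exist.
    List<String> addChildListener(String parent, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(parent, k -> new ArrayList<>()).add(listener);
        return new ArrayList<>(children.getOrDefault(parent, new ArrayList<>()));
    }
}

class SubscribeSketch {
    // Every child list delivered to the consumer, in order of arrival.
    final List<List<String>> notifications = new ArrayList<>();

    void subscribe(FakeZk zk, String path) {
        List<String> current = zk.addChildListener(path, notifications::add);
        // Explicit first notification: without it, providers that registered
        // before the consumer started would never reach the consumer.
        notifications.add(current);
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        zk.addChild("/dubbo/per.qiao.service.TestService/providers", "provider-1");

        SubscribeSketch consumer = new SubscribeSketch();
        consumer.subscribe(zk, "/dubbo/per.qiao.service.TestService/providers");
        zk.addChild("/dubbo/per.qiao.service.TestService/providers", "provider-2");

        System.out.println(consumer.notifications);
    }
}
```

The first entry in `notifications` comes from the explicit call inside `subscribe`, the second from the listener callback, which corresponds to the two ways `notify` is reached in the code above.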
AbstractRegistry
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// iterate over the notified URLs (from the three watched categories) and add them to result, grouped by category
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// save the notified URLs to the local cache file
saveProperties(url);
// refresh the local registry table
listener.notify(categoryList);
}
}
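The first loop of AbstractRegistry.notify above groups the notified URLs by their category parameter before each group is handed to the listener. A minimal sketch of that grouping, using plain strings instead of Dubbo's URL class: `CategoryGroupingSketch` and its hand-rolled query parsing are hypothetical, and the default category "providers" is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that groups URL strings by their "category" query parameter.
class CategoryGroupingSketch {
    // Assumption for this sketch: URLs without a category count as "providers".
    static final String DEFAULT_CATEGORY = "providers";

    static String categoryOf(String url) {
        int queryStart = url.indexOf('?');
        if (queryStart >= 0) {
            for (String pair : url.substring(queryStart + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return DEFAULT_CATEGORY;
    }

    // Same shape as the 'result' map in the code above: category -> URLs of that category.
    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String url : urls) {
            result.computeIfAbsent(categoryOf(url), k -> new ArrayList<>()).add(url);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "dubbo://10.0.0.1:20880/per.qiao.service.TestService?category=providers",
                "dubbo://10.0.0.2:20880/per.qiao.service.TestService",
                "empty://10.0.0.9/per.qiao.service.TestService?category=routers");
        for (Map.Entry<String, List<String>> entry : groupByCategory(urls).entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " url(s)");
        }
    }
}
```

Each group would then be passed to the listener separately, which is why RegistryDirectory.notify can be called once per category.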
RegistryDirectory
public synchronized void notify(List<URL> urls) {
// correspond to the providers, routers and configurators nodes respectively
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
refreshInvoker(invokerUrls);
}
// refresh the local registry table
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
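// convert URLs to Invoker objects; only URLs from the providers node can produce Invokers
// what is returned here is an invoker filter chain whose end point is DubboInvoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
// convert to a map from method name to Invokers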
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
// destroy the Invoker objects that are no longer used
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
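The last step of refreshInvoker compares the old and new URL-to-invoker maps and closes whatever is no longer referenced. The following is a minimal sketch of that comparison only. `RefreshableInvoker` and `InvokerRefreshSketch` are hypothetical names, and the body is my reading of what "destroy unused" means here, not the actual destroyUnusedInvokers implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical invoker that only records whether it has been destroyed.
class RefreshableInvoker {
    final String url;
    boolean destroyed;

    RefreshableInvoker(String url) {
        this.url = url;
    }

    void destroy() {
        destroyed = true;
    }
}

class InvokerRefreshSketch {
    // Destroys every old invoker whose URL key no longer appears in the new map.
    static void destroyUnused(Map<String, RefreshableInvoker> oldMap,
                              Map<String, RefreshableInvoker> newMap) {
        if (oldMap == null) {
            return; // first refresh: nothing to clean up
        }
        for (Map.Entry<String, RefreshableInvoker> entry : oldMap.entrySet()) {
            if (newMap == null || !newMap.containsKey(entry.getKey())) {
                entry.getValue().destroy();
            }
        }
    }

    public static void main(String[] args) {
        RefreshableInvoker a = new RefreshableInvoker("dubbo://10.0.0.1:20880");
        RefreshableInvoker b = new RefreshableInvoker("dubbo://10.0.0.2:20880");

        Map<String, RefreshableInvoker> oldMap = new HashMap<>();
        oldMap.put(a.url, a);
        oldMap.put(b.url, b);

        // The provider behind 'b' went away, so the new map only contains 'a'.
        Map<String, RefreshableInvoker> newMap = new HashMap<>();
        newMap.put(a.url, a);

        destroyUnused(oldMap, newMap);
        System.out.println("a destroyed: " + a.destroyed + ", b destroyed: " + b.destroyed);
    }
}
```

Keying both maps by the URL string is what makes the comparison cheap: a provider that is still present keeps its key, so its invoker is left alone.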
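When referencing a service, the client registers its consumer info on a zookeeper node (another registry can be used as well), subscribes to the providers, configurators and routers nodes along the way, and then calls RegistryDirectory.notify to refresh the local registry table. The returned result (the reference object) is a MockClusterInvoker that holds the RegistryDirectory object. This also answers the earlier question: doSubscribe calls notify with the children returned by addChildListener, so providers that already exist are picked up without waiting for a change event.
Summary:
1. Register with zookeeper and subscribe to the providers, configurators and routers nodes
2. The invoker object obtained from refprotocol.refer is a MockClusterInvoker (which by default wraps a FailoverClusterInvoker)
3. The object obtained from ReferenceBean#getObject is the proxy0 object above, which depends on the MockClusterInvoker from (2)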