代码版本2.5.x
java spi
dubbo扩展机制spi
SPI的全名为Service Provider Interface,面向对象的设计里面,模块之间推荐基于接口编程,而不是对实现类进行硬编码,这样做也是为了模块设计的可拔插原则。为了在模块装配的时候不在程序里指明是哪个实现,就需要一种服务发现的机制,jdk的spi就是为某个接口寻找服务实现。
比如dubbo的支持的协议默认是dubbo协议,还支持http、hessian、webservic等,spi就是找通过接口找具体实现类的过程。
直接看一个使用例子来理解spi。
LoadBalance接口有多个实现类,RandomLoadBalance是默认实现。
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
- LoadBalance只有一个方法,就是选择一个远程客户端
- @SPI(RandomLoadBalance.NAME) 表明LoadBalance是一个扩展点,如果url中没有显示指明实现方式,则RandomLoadBalance为默认实现方式
- LoadBalance的所有实现类通过key value形势配置在配置文件中
- 在初始化阶段会将所有的实现类加载到内存map中,同时会生成一个代理类,当运行阶段需要获取实现类的时候会通过该代理类来实现
- 代理类通过@Adaptive指定,如果@Adaptive放在类上表明该类为代理类,如果@Adaptive放在方法上,则会动态生成一个代理类
- 上面@Adaptive声明在select方法上,则会动态声明一个代理类。当运行阶段调用select方法后,实际调用的是代理类的select方法,代理类的select方法会解析url,从url中提取出"loadbalance"的值(根据 @Adaptive("loadbalance")),该值有可能是random、roundrobin、leastactive、consistenthash中的一个。最后根据这个值选择具体调用哪个实现类的select方法。
注册中心
- dubbo内部支持的四种注册中心实现方式,分别是dubbo、multicast、zookeeper、redis
RegistryService
该接口定义了注册、取消注册、订阅、取消订阅以及查询符合条件的已注册数据接口。
public interface RegistryService { // Registry extends RegistryService
/**
* 注册服务.
*
* 注册需处理契约:<br>
* 1. 当URL设置了check=false时,注册失败后不报错,在后台定时重试,否则抛出异常。<br>
* 2. 当URL设置了dynamic=false参数,则需持久存储,否则,当注册者出现断电等情况异常退出时,需自动删除。<br>
* 3. 当URL设置了category=overrides时,表示分类存储,缺省类别为providers,可按分类部分通知数据。<br>
* 4. 当注册中心重启,网络抖动,不能丢失数据,包括断线自动删除数据。<br>
* 5. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
*
* @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void register(URL url);
/**
* 取消注册服务.
*
* 取消注册需处理契约:<br>
* 1. 如果是dynamic=false的持久存储数据,找不到注册数据,则抛IllegalStateException,否则忽略。<br>
* 2. 按全URL匹配取消注册。<br>
*
* @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void unregister(URL url);
/**
* 订阅服务.
*
* 订阅需处理契约:<br>
* 1. 当URL设置了check=false时,订阅失败后不报错,在后台定时重试。<br>
* 2. 当URL设置了category=overrides,只通知指定分类的数据,多个分类用逗号分隔,并允许星号通配,表示订阅所有分类数据。<br>
* 3. 允许以interface,group,version,classifier作为条件查询,如:interface=com.alibaba.foo.BarService&version=1.0.0<br>
* 4. 并且查询条件允许星号通配,订阅所有接口的所有分组的所有版本,或:interface=*&group=*&version=*&classifier=*<br>
* 5. 当注册中心重启,网络抖动,需自动恢复订阅请求。<br>
* 6. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
* 7. 必须阻塞订阅过程,等第一次通知完后再返回。<br>
*
* @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @param listener 变更事件监听器,不允许为空
*/
void subscribe(URL url, NotifyListener listener);
/**
* 取消订阅服务.
*
* 取消订阅需处理契约:<br>
* 1. 如果没有订阅,直接忽略。<br>
* 2. 按全URL匹配取消订阅。<br>
*
* @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @param listener 变更事件监听器,不允许为空
*/
void unsubscribe(URL url, NotifyListener listener);
/**
* 查询注册列表,与订阅的推模式相对应,这里为拉模式,只返回一次结果。
*
* @see org.apache.dubbo.registry.NotifyListener#notify(List)
* @param url 查询条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @return 已注册信息列表,可能为空,含义同{@link org.apache.dubbo.registry.NotifyListener#notify(List<URL>)}的参数。
*/
List<URL> lookup(URL url);
}
Registry
继承RegistryService和Node,没有自己的实现
public interface Registry extends Node, RegistryService {
}
public interface Node {
//获得节点地址
URL getUrl();
//判断节点是否可用
boolean isAvailable();
//销毁节点
void destroy();
}
RegistryFactory
@SPI("dubbo")
public interface RegistryFactory {
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
- 用来获取注册中心Registry
- 是一个spi扩展接口,默认实现是DubboRegistryFactory
NotifyListener
public interface NotifyListener {
/**
* 当收到服务变更通知时触发。
* <p>
* 通知需处理契约:<br>
* 1. 总是以服务接口和数据类型为维度全量通知,即不会通知一个服务的同类型的部分数据,用户不需要对比上一次通知结果。<br>
* 2. 订阅时的第一次通知,必须是一个服务的所有类型数据的全量通知。<br>
* 3. 中途变更时,允许不同类型的数据分开通知,比如:providers, consumers, routers, overrides,允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。<br>
* 4. 如果一种类型的数据为空,需通知一个empty协议并带category参数的标识性URL数据。<br>
* 5. 通知者(即注册中心实现)需保证通知的顺序,比如:单线程推送,队列串行化,带版本对比。<br>
*
* @param urls 已注册信息列表,总不为空,含义同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
*/
void notify(List<URL> urls);
}
AbstractRegistry
他是Registry的一个子类, 帮助Registry分担了部分功能,在该类中实现了把本地URL缓存到property文件中的机制,并且实现了注册中心的注册、订阅等方法。
FailbackRegistry
public void register(URL url) {
super.register(url);
//首先从失败的缓存中删除该url
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 向注册中心发送一个注册请求
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
// 如果开启了启动时检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
// 把这个注册失败的url放入缓存,并且定时重试。
failedRegistered.add(url);
}
}
- FailbackRegistry继承了AbstractRegistry,AbstractRegistry中的注册订阅等方法,实际上就是一些内存缓存的变化,而真正的注册订阅的实现逻辑在FailbackRegistry实现,并且FailbackRegistry提供了失败重试的机制(如果失败抛出异常,则加入到失败的缓存中进行重试)。
- 代码中doRegister是一个模板方法,真正的注册逻辑由具体的子类来实现(比如redis注册中心和zk注册中心的实现是不一样的),同理doUnregister、doSubscribe、doUnsubscribe三个方法也是这样的。
zookeeper注册中心
- 服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址
- 服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址
- 监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。
ZookeeperRegistry
该类继承了FailbackRegistry类,该类就是针对注册中心核心的功能注册、订阅、取消注册、取消订阅,查询注册列表进行展开,基于zookeeper来实现
接下来分别看一下注册、取消注册、订阅等方法
@Override
protected void doRegister(URL url) {
try {
// 创建URL节点,也就是URL层的节点
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
protected void doUnregister(URL url) {
try {
// 删除节点
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
- 这两个方法分别是注册和取消注册,也很简单,调用都是客户端create和delete方法,一个是创建一个节点,另一个是删除节点,该操作都在URL层
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 处理所有Service层发起的订阅,例如监控中心的订阅
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
// 获得根目录
String root = toRootPath();
// 获得url对应的监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
// 不存在就创建监听器集合
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
// 获得节点监听器
ChildListener zkListener = listeners.get(listener);
// 如果该节点监听器为空,则创建
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// 遍历现有的节点,如果现有的服务集合中没有该节点,则加入该节点,然后订阅该节点
for (String child : currentChilds) {
// 解码
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
// 重新获取,为了保证一致性
zkListener = listeners.get(listener);
}
// 创建service节点,该节点为持久节点
zkClient.create(root, false);
// 向zookeeper的service节点发起订阅,获得Service接口全名数组
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
// 遍历Service接口全名数组
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
// 发起该service层的订阅
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
// 处理指定 Service 层的发起订阅,例如服务消费者的订阅
List<URL> urls = new ArrayList<URL>();
// 遍历分类数组
for (String path : toCategoriesPath(url)) {
// 获得监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
// 如果没有则创建
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
// 获得节点监听器
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// 通知服务变化 回调NotifyListener
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
// 重新获取节点监听器,保证一致性
zkListener = listeners.get(listener);
}
// 创建type节点,该节点为持久节点
zkClient.create(path, false);
// 向zookeeper的type节点发起订阅
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// 加入到自子节点数据数组
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 通知数据变化
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
// 获得监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
// 获得子节点的监听器
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
// 如果为全部的服务接口,例如监控中心
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
// 获得根目录
String root = toRootPath();
// 移除监听器
zkClient.removeChildListener(root, zkListener);
} else {
// 遍历分类数组进行移除监听器
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
@Override
public List<URL> lookup(URL url) {
if (url == null) {
throw new IllegalArgumentException("lookup url == null");
}
try {
List<String> providers = new ArrayList<String>();
// 遍历分组类别
for (String path : toCategoriesPath(url)) {
// 获得子节点
List<String> children = zkClient.getChildren(path);
if (children != null) {
providers.addAll(children);
}
}
// 获得 providers 中,和 consumer 匹配的 URL 数组
return toUrlsWithoutEmpty(url, providers);
} catch (Throwable e) {
throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
远程通信
- dubbo支持的远程通讯框架包括mina、netty、grizzly,其中netty是默认实现。
- 有一些概念要认识一下
- Endpoint:client与server都是Endpoint
- Channel:client与server之间通过channnel连接。一个client对应一个channel,一个server可以对应多个channel
- Transporter:网络传输层
Transporter
- Transporters:Transporter的装饰类
- AbstractPeer:该类实现了Endpoint和ChannelHandler两个接口
- AbstractEndpoint:
- AbstractServer:实现了服务器的公共逻辑,比如发送消息,关闭通道,连接通道,断开连接等。并且抽象了打开和关闭服务器两个方法
- AbstractClient:该类中也是做了客户端公用的重连逻辑,抽象了打开客户端、关闭客户端、连接服务器、断开服务器连接以及获得通道方法,让子类去重点关注这几个方法
- AbstractChannel:具体的发送消息逻辑在它的子类中实现
- ChannelHandlerDelegate:该类继承了ChannelHandler,从它的名字可以看出是ChannelHandler的代表
- AbstractChannelHandlerDelegate:该类实现了ChannelHandlerDelegate接口,并且有一个属性是ChannelHandler,上述已经说到这是装饰模式中的装饰角色,其中的所有实现方法都直接调用被装饰的handler属性的方法。
- DecodeHandler:该类为解码处理器,继承了AbstractChannelHandlerDelegate,对接收到的消息进行解码,在父类处理接收消息的功能上叠加了解码功能
可以看到做了三次判断,根据消息的不同会对消息的不同数据做解码。可以看到,这里用到装饰模式后,在处理消息的前面做了解码的处理,并且还能继续委托给handler来处理消息,通过组合做到了功能的叠加 - MultiMessageHandler:该类是多消息处理器的抽象类。同样继承了AbstractChannelHandlerDelegate类
逻辑很简单,当消息是多消息类型时,也就是一次性接收到多条消息的情况,循环去处理消息,当消息是单消息时候,直接交给handler去处理。 - WrappedChannelHandler
该类跟AbstractChannelHandlerDelegate的作用类似,都是装饰模式中的装饰角色,其中的所有实现方法都直接调用被装饰的handler属性的方法,该类是为了添加线程池的功能,它的子类都是去关心哪些消息是需要分发到线程池的,哪些消息直接由I / O线程执行,现在版本有四种场景,也就是它的四个子类,下面我一一描述。
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 创建线程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
// 设置组件的key
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
// 获得dataStore实例
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
// 把线程池放到dataStore中缓存
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
可以看到构造方法除了属性的填充以外,线程池是基于dubbo 的SPI Adaptive机制创建的,在dataStore中把线程池加进去, 该线程池就是AbstractClient 或 AbstractServer 从 DataStore 获得的线程池。
public ExecutorService getExecutorService() {
// 首先返回的不是共享线程池,是该类的线程池
ExecutorService cexecutor = executor;
// 如果该类的线程池关闭或者为空,则返回的是共享线程池
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
return cexecutor;
}
该方法是获得线程池的实例,不过该类里面有两个线程池,还加入了一个共享线程池,共享线程池优先级较低。
ExecutionChannelHandler
@Override
public void received(Channel channel, Object message) throws RemotingException {
// 获得线程池实例
ExecutorService cexecutor = getExecutorService();
// 如果消息是request类型,才会分发到线程池,其他消息,如响应,连接,断开连接,心跳将由I / O线程直接执行。
if (message instanceof Request) {
try {
// 把请求消息分发到线程池
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
// FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
// therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
// this scenario from happening, but a better solution should be considered later.
// 当线程池满了,SERVER_THREADPOOL_EXHAUSTED_ERROR错误无法正常返回
// 因此消费者方必须等到超时。这是一种预防的临时解决方案,所以这里直接返回该错误
if (t instanceof RejectedExecutionException) {
Request request = (Request) message;
if (request.isTwoWay()) {
String msg = "Server side(" + url.getIp() + "," + url.getPort()
+ ") thread pool is exhausted, detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
}
} else {
// 如果消息不是request类型,则直接处理
handler.received(channel, message);
}
}
该类继承了WrappedChannelHandler,也是增强了功能,处理的是接收请求消息时,把请求消息分发到线程池,而除了请求消息以外,其他消息类型都直接通过I / O线程直接执行。
上述就可以都看到对于请求消息的处理,其中有个打补丁的方式是当线程池满了的时候,消费者只能等待请求超时,所以这里直接返回线程池满的错误。
AllChannelHandler
该类也继承了WrappedChannelHandler,也是为了增强功能,处理的是连接、断开连接、捕获异常以及接收到的所有消息都分发到线程池。
以看到,所有操作以及消息都分到到线程池中。并且注意操作不同,传入的状态也不同。
ConnectionOrderedChannelHandler
该类也是继承了WrappedChannelHandler,增强功能,该类是把连接、取消连接以及接收到的消息都分发到线程池,但是不同的是,该类自己创建了一个跟连接相关的线程池,把连接操作和断开连接操分发到该线程池,而接收到的消息则分发到WrappedChannelHandler的线程池中。来看看具体的实现。
可以属性中有一个连接线程池,看到在构造函数里创建了该线程池,而queuewarninglimit是用来限制连接线程池的工作队列长度,比较简单。来看看连接和断开连接到逻辑。
可以看到,这两个操作都是分发到连接线程池connectionExecutor中,和AllChannelHandle类r中的分发的线程池不是同一个。而ConnectionOrderedChannelHandler的received方法跟AllChannelHandle一样,我就不贴出来。
MessageOnlyChannelHandler
该类也是继承了WrappedChannelHandler,是WrappedChannelHandler的最后一个子类,也是增强功能,不过该类只是处理了所有的消息分发到线程池。可以看到源码,比较简单:
接口Dispatcher
- 该接口是调度器接口,dispatch是线程池的调度方法,五种实现,分别是AllDispatcher、DirectDispatcher、MessageOnlyDispatcher、ExecutionDispatcher、ConnectionOrderedDispatcher
- Dispatcher是和上面的WrappedChannelHandler配合使用
AllDispatcher
对照着上述讲到的AllChannelHandler,是不是很清晰这种线程池的调度方法。并且该调度方法是默认的调度方法。
ConnectionOrderedDispatcher
对照上述讲到的ConnectionOrderedChannelHandler,也很清晰该线程池调度方法。
DirectDispatcher
该线程池调度方法是不调度线程池,直接执行。
ExecutionDispatcher
对照着上述的ExecutionChannelHandler讲解,也可以很清晰的看出该线程池调度策略。
MessageOnlyDispatcher
对照着上述讲到的MessageOnlyChannelHandler,可以很清晰该线程池调度策略。
ChannelHandlers
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// 调用了多消息处理器,对心跳消息进行了功能加强
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
该类是通道处理器工厂,会对传入的handler进行一次包装,无论是Client还是Server都会做这样的处理,也就是做了一些功能上的增强,就像上述我说到的装饰模式中的那些功能。
最关键的是这两个方法,看第二个方法,其实就是包装了MultiMessageHandler功能,增加了多消息处理的功能,以及对心跳消息做了功能增强。
信息交换层Exchange
- 它是Transport的上一层
- 官方文档对这一层的解释是封装请求响应模式,同步转异步,以 Request, Response为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer。
- 这一层的设计意图是什么?它应该算是在信息传输层上又做了部分装饰,为了适应rpc调用的一些需求,比如rpc调用中一次请求只关心它所对应的响应,这个时候只是一个message消息传输过来,是无法区分这是新的请求还是上一个请求的响应,这种类似于幂等性的问题以及rpc异步处理返回结果、内置事件等特性都是在Transport层无法解决满足的,所有在Exchange层讲message分成了request和response两种类型,并且在这两个模型上增加一些系统字段来处理问题。具体我会在下面讲到。而dubbo把一条消息分为了协议头和内容两部分:协议头包括系统字段,例如编号等,内容包括具体请求的参数和响应的结果等。在exchange层中大量逻辑都是基于协议头的
ExchangeChannel
该接口是信息交换通道接口,有四个方法,前两个是发送请求消息,区别就是第二个发送请求有超时的参数,getExchangeHandler方法就是返回一个信息交换处理器,第四个是需要覆写父类的方法。
HeaderExchangeChannel
该类实现了ExchangeChannel,是基于协议头的信息交换通道。
上述属性比较简单,还是放一下这个类的属性是因为该类中有channel属性,也就是说HeaderExchangeChannel是Channel的装饰器,每个实现方法都会调用channel的方法。
该方法是在channel的send方法上加上了request和response模型,最后再调用channel.send,起到了装饰器的作用。
该方法是请求方法,用Request模型把请求内容装饰起来,然后发送一个Request类型的消息,并且返回DefaultFuture实例,DefaultFuture我会在后面讲到。
ExchangeClient
HeaderExchangeClient
该类实现了ExchangeClient接口,是基于协议头的信息交互客户端类,同样它是Client、Channel的适配器。在该类的源码中可以看到所有的实现方法都是调用了client和channel属性的方法。该类主要的作用就是增加了心跳功能,为什么要增加心跳功能呢,对于长连接,一些拔网线等物理层的断开,会导致TCP的FIN消息来不及发送,对方收不到断开事件,那么就需要用到发送心跳包来检测连接是否断开。consumer和provider断开,处理措施不一样,会分别做出重连和关闭通道的操作。
构造函数就是对一些属性初始化设置,优先从url中获取。心跳超时时间小于心跳周期的两倍就抛出异常,意思就是至少重试两次心跳检测。
该方法就是开启心跳。利用心跳定时器来做到定时检测心跳。因为这是信息交换客户端类,所有这里的只是返回包含HeaderExchangeClient对象的不可变列表,因为客户端跟channel是一一对应的,只有这一个该客户端本身的channel需要心跳。
该方法是停止现有心跳,也就是停止定时器,释放空间。
其他方法都是调用channel和client属性的方法。
HeartBeatTask
该类实现了Runnable接口,实现的是心跳任务,里面包含了核心的心跳策略。
后两个属性跟HeaderExchangeClient中的属性含义一样,第一个是该类自己内部的一个接口:
该接口就定义了一个方法,获得需要心跳的通道集合。可想而知,会对集合内的通道都做心跳检测。
@Override
public void run() {
try {
long now = System.currentTimeMillis();
// 遍历所有通道
for (Channel channel : channelProvider.getChannels()) {
// 如果通道关闭了,则跳过
if (channel.isClosed()) {
continue;
}
try {
// 最后一次接收到消息的时间戳
Long lastRead = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
// 最后一次发送消息的时间戳
Long lastWrite = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
// 如果最后一次接收或者发送消息到时间到现在的时间间隔超过了心跳间隔时间
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
// 创建一个request
Request req = new Request();
// 设置版本号
req.setVersion(Version.getProtocolVersion());
// 设置需要得到响应
req.setTwoWay(true);
// 设置事件类型,为心跳事件
req.setEvent(Request.HEARTBEAT_EVENT);
// 发送心跳请求
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
// 如果最后一次接收消息的时间到现在已经超过了超时时间
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
// 如果该通道是客户端,也就是请求的服务器挂掉了,客户端尝试重连服务器
if (channel instanceof Client) {
try {
// 重新连接服务器
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
} else {
// 如果不是客户端,也就是是服务端返回响应给客户端,但是客户端挂掉了,则服务端关闭客户端连接
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
该方法中是心跳机制的核心逻辑。注意以下几个点:
如果需要心跳的通道本身如果关闭了,那么跳过,不添加心跳机制。
无论是接收消息还是发送消息,只要超过了设置的心跳间隔,就发送心跳消息来测试是否断开
如果最后一次接收到消息到到现在已经超过了心跳超时时间,那就认定对方的确断开,分两种情况来处理对方断开的情况。分别是服务端断开,客户端重连以及客户端断开,服务端断开这个客户端的连接。,这里要好好品味一下谁是发送方,谁在等谁的响应,苦苦没有等到。
ResponseFuture
该接口是响应future接口,该接口的设计意图跟java.util.concurrent.Future很类似。发送出去的消息,泼出去的水,只有等到对方主动响应才能得到结果,但是请求方需要去主动回去该请求的结果,就显得有些艰难,所有产生了这样一个接口,它能够获取任务执行结果、可以核对请求消息是否被响应,还能设置回调来支持异步。
DefaultFuture
该类实现了ResponseFuture接口,其中封装了处理响应的逻辑。你可以把DefaultFuture看成是一个中介,买房和卖房都通过这个中介进行沟通,中介拥有着买房者的信息request和卖房者的信息response,并且促成他们之间的买卖。
可以看到,该类的属性包含了request、response、channel三个实例,在该类中,把请求和响应通过唯一的id一一对应起来。做到异步处理返回结果时能给准确的返回给对应的请求。可以看到属性中有两个集合,分别是通道集合和future集合,也就是该类本身也是所有 DefaultFuture 的管理容器
构造函数比较简单,每一个DefaultFuture实例都跟每一个请求一一对应,被存入到集合中管理起来。
该方法是接收响应,也就是某个请求得到了响应,那么代表这次请求任务完成,所有需要把future从集合中移除。具体的接收响应结果在doReceived方法中实现。
可以看到,当接收到响应后,会把等待的线程唤醒,然后执行回调来处理该响应结果。
该方法是执行回调来处理响应结果。分为了三种情况:
响应成功,那么执行完成后的逻辑。
超时,会按照超时异常来处理
其他,按照RuntimeException异常来处理
具体的处理都在ResponseCallback接口的实现类里执行,后面我会讲到。
该方法是实现了ResponseFuture定义的方法,是获得该future对应的请求对应的响应结果,其实future、请求、响应都是一一对应的。其中如果还没得到响应,则会线程阻塞等待,等到有响应结果或者超时,才返回。返回的逻辑在returnFromResponse中实现。
该方法是取消一个请求,可以直接关闭一个请求,也就是值创建一个响应来回应该请求,把response值设置到该请求对于到future中,做到了中断请求的作用。该方法跟closeChannel的区别是closeChannel中对response的状态设置了CHANNEL_INACTIVE,而cancel方法是中途被主动取消的,虽然有response值,但是并没有一个响应状态。
ExchangeHandler
该接口继承了ChannelHandler, TelnetHandler接口,是信息交换处理器接口。