Parts of this article are adapted from the official documentation and reworked by me, so it is semi-original.
Cluster overview
First, let's look at what the cluster does:
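A service usually has several providers, so a consumer has to decide which provider to call and how to handle a failed call. To deal with these problems, Dubbo defines the Cluster interface and the Cluster Invoker. A Cluster merges multiple service providers into a single Cluster Invoker and exposes that Invoker to the consumer. The consumer then makes remote calls only through this Invoker; which provider is actually called, and what happens when a call fails, are left to the cluster module. The cluster module is a middle layer between providers and consumers that hides the providers from the consumer, so the consumer can concentrate on the remote call itself, such as sending requests and receiving the data the provider returns. That is the role of the cluster.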
Cluster fault tolerance involves several components, including Cluster, Cluster Invoker, Directory, Router and LoadBalance. They work together as follows:
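The process has two phases:
- During consumer initialization, the Cluster implementation creates a Cluster Invoker instance for the consumer; this is the merge step in the figure above.
- When the consumer makes a remote call, the routing and load-balancing components pick the final Invoker, and its invoke method is called.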
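The two phases can be sketched in plain Java. This is a minimal illustration under simplifying assumptions, not Dubbo's API: SimpleInvoker, SimpleLoadBalance and ClusterSketch are hypothetical names, and the router is modeled as a plain Predicate.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for a provider invoker (not Dubbo's Invoker interface).
interface SimpleInvoker {
    String invoke(String arg);
}

// Hypothetical stand-in for a load-balancing strategy.
interface SimpleLoadBalance {
    SimpleInvoker select(List<SimpleInvoker> candidates);
}

final class ClusterSketch {
    private ClusterSketch() {}

    // Phase 1 ("merge"): wrap a list of providers into one virtual invoker.
    static SimpleInvoker join(List<SimpleInvoker> directory,
                              Predicate<SimpleInvoker> router,
                              SimpleLoadBalance loadBalance) {
        // Phase 2: every call routes, load-balances, then invokes the chosen provider.
        return arg -> {
            List<SimpleInvoker> routed = directory.stream()
                    .filter(router)
                    .collect(Collectors.toList());
            if (routed.isEmpty()) {
                throw new IllegalStateException("No provider available");
            }
            return loadBalance.select(routed).invoke(arg);
        };
    }
}
```

The consumer only ever holds the invoker returned by join; which provider serves a given call is decided inside it, which is exactly what the cluster layer is for.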
Dubbo offers the following strategies for handling failures:
Source code analysis
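Before the analysis, we need to distinguish the Cluster Invoker from the Cluster: Cluster is an interface, while a Cluster Invoker is just one kind of Invoker implementation. The Cluster interface has only one method, and its purpose is clear: to create an invoker.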
@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers to a virtual invoker.
     *
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
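@SPI(FailoverCluster.NAME) makes failover the default extension, and @Adaptive means the concrete Cluster is chosen at runtime from the URL's cluster parameter (which is where the cluster="failback" attribute in the example later ends up). The lookup can be sketched with a plain map; AdaptiveClusterSketch and the string "implementations" are hypothetical and stand in for Dubbo's ExtensionLoader machinery.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical mini extension registry imitating @SPI + @Adaptive (not Dubbo's ExtensionLoader).
final class AdaptiveClusterSketch {
    static final String DEFAULT_NAME = "failover"; // plays the role of @SPI(FailoverCluster.NAME)

    private static final Map<String, Supplier<String>> EXTENSIONS = new HashMap<>();

    static {
        // Each supplier stands in for creating a Cluster implementation.
        EXTENSIONS.put("failover", () -> "FailoverCluster");
        EXTENSIONS.put("failback", () -> "FailbackCluster");
    }

    private AdaptiveClusterSketch() {}

    // Like an adaptive extension: read "cluster" from the URL parameters, default to failover.
    static String resolve(Map<String, String> urlParameters) {
        String name = urlParameters.getOrDefault("cluster", DEFAULT_NAME);
        Supplier<String> factory = EXTENSIONS.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such cluster extension: " + name);
        }
        return factory.get();
    }
}
```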
Next, let's look at a Cluster implementation.
public class FailoverCluster extends AbstractCluster {

    public final static String NAME = "failover";

    @Override
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<>(directory);
    }
}

public abstract class AbstractCluster implements Cluster {

    // Wraps the cluster invoker in a chain of ClusterInterceptors. When interceptors are active,
    // the returned object is an InterceptorInvokerNode (an inner class defined below)
    private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
        AbstractClusterInvoker<T> last = clusterInvoker;
        List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class)
                .getActivateExtension(clusterInvoker.getUrl(), key);

        if (!interceptors.isEmpty()) {
            for (int i = interceptors.size() - 1; i >= 0; i--) {
                final ClusterInterceptor interceptor = interceptors.get(i);
                final AbstractClusterInvoker<T> next = last;
                last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
            }
        }
        return last;
    }

    // Directory is the service directory from the previous article (it holds the providers' information)
    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
    }

    // Implemented by subclasses
    protected abstract <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException;

    // Inner class
    protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {

        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            try {
                // before hook
                interceptor.before(next, invocation);
                // proceed with the call
                asyncResult = interceptor.intercept(next, invocation);
            } catch (Exception e) {
                // onError callback
                if (interceptor instanceof ClusterInterceptor.Listener) {
                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                    listener.onError(e, clusterInvoker, invocation);
                }
                throw e;
            } finally {
                // the after hook always runs
                interceptor.after(next, invocation);
            }
            return asyncResult.whenCompleteWithContext((r, t) -> {
                // onResponse callback
                if (interceptor instanceof ClusterInterceptor.Listener) {
                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                    if (t == null) {
                        listener.onMessage(r, clusterInvoker, invocation);
                    } else {
                        listener.onError(t, clusterInvoker, invocation);
                    }
                }
            });
        }
        ...
    }
}
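AbstractCluster neatly leaves creating the actual invoker to its subclasses (doJoin) and then calls buildClusterInterceptors, which wraps that invoker in interceptors. Because the loop runs from the last interceptor to the first, the first interceptor ends up as the outermost wrapper. The invoke method of the wrapper, InterceptorInvokerNode, calls the before and after hooks around the real call, much like aspect-oriented programming in Spring, and that is how the interceptor feature is implemented.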
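The wrapping order can be checked with a small stand-alone sketch. ChainCall, ChainInterceptor and InterceptorChain are hypothetical simplified types, not Dubbo's ClusterInterceptor API; the sketch only reproduces the reverse loop and the before/after hooks described above.

```java
import java.util.List;

// Hypothetical simplified types; not Dubbo's ClusterInterceptor API.
interface ChainCall {
    String invoke(String arg);
}

interface ChainInterceptor {
    void before(String arg);

    void after(String arg);
}

final class InterceptorChain {
    private InterceptorChain() {}

    // Wrap `target` so interceptors.get(0) ends up outermost, mirroring the
    // reverse loop in buildClusterInterceptors.
    static ChainCall build(ChainCall target, List<ChainInterceptor> interceptors) {
        ChainCall last = target;
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            final ChainInterceptor interceptor = interceptors.get(i);
            final ChainCall next = last;
            last = arg -> {
                try {
                    interceptor.before(arg);
                    return next.invoke(arg);
                } finally {
                    // Like InterceptorInvokerNode, after() runs even if the call throws.
                    interceptor.after(arg);
                }
            };
        }
        return last;
    }
}
```

With interceptors A and B (in that order), a call runs A.before, B.before, the target, B.after, A.after.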
OK, now let's look at the Cluster Invoker implementations.
Cluster Invoker source code analysis
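As mentioned earlier, the cluster works in two phases. The first phase happens during consumer initialization; it was analyzed in the article on service references, so it is not repeated here. The second phase happens when the consumer makes a remote call: AbstractClusterInvoker's invoke method is called, and listing the invokers, load balancing and similar operations all run in this phase. So let's start with the logic of the invoke method.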
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        // binding attachments into invocation.
        Map<String, Object> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }

        // List the available invokers from the Directory
        List<Invoker<T>> invokers = list(invocation);
        // Load the LoadBalance extension; the actual selection happens later, in select
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // Implemented by subclasses
        return doInvoke(invocation, invokers, loadbalance);
    }

    ...

    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;
}

// ================================================================
// doInvoke implementation in FailoverClusterInvoker

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // total attempts = the first call + the configured retries
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        // Choose the final invoker; select is defined in the parent class
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // The actual call
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyInvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            // Record the provider address so the final exception message helps troubleshooting
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
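This gives us the general idea: although FailoverClusterInvoker is also called an invoker, the invokers that actually make remote calls come from the Directory (the service directory); FailoverClusterInvoker only wraps them and adds the retry logic.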
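The core of the failover loop can be reduced to a few lines. This is a simplified sketch, not Dubbo's code: providers are modeled as Function objects, FailoverSketch and its BizException are hypothetical, and pick() is a crude stand-in for select() that prefers providers not yet tried in this call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified failover loop modeled on FailoverClusterInvoker.doInvoke (illustrative only).
final class FailoverSketch {
    private FailoverSketch() {}

    // Hypothetical marker for business exceptions, which are never retried (cf. e.isBiz()).
    static final class BizException extends RuntimeException {
        BizException(String message) {
            super(message);
        }
    }

    static <R> R invoke(List<Function<String, R>> providers, String arg, int retries) {
        int len = Math.max(retries + 1, 1); // first call + retries, at least one attempt
        List<Function<String, R>> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < len; i++) {
            Function<String, R> provider = pick(providers, invoked);
            invoked.add(provider);
            try {
                return provider.apply(arg);
            } catch (BizException e) {
                throw e; // business errors go straight back to the caller
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException("Tried " + len + " times, last error: " + last.getMessage(), last);
    }

    // Stand-in for select(): prefer a provider not yet invoked in this call.
    private static <R> Function<String, R> pick(List<Function<String, R>> providers,
                                                List<Function<String, R>> invoked) {
        for (Function<String, R> p : providers) {
            if (!invoked.contains(p)) {
                return p;
            }
        }
        return providers.get(invoked.size() % providers.size());
    }
}
```

With retries = 2 the loop makes at most three attempts, matching len = retries + 1 in doInvoke.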
Let's see how the parent class's select method obtains the final invoker.
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();

    boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }
    //ignore concurrency problem
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
        if (availablecheck && stickyInvoker.isAvailable()) {
            return stickyInvoker;
        }
    }

    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    // Delegate to the LoadBalance's select method
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            // Reselection logic
            Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rInvoker != null) {
                invoker = rInvoker;
            } else {
                //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}
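The main logic of select is its support for sticky connections. It first reads the sticky setting, then checks whether the invokers list contains stickyInvoker. The invokers list can be viewed as the list of live providers, so if it no longer contains stickyInvoker, that provider is assumed to be down and stickyInvoker is set to null. If stickyInvoker is still in the list, the next check is whether selected contains it. selected holds the invokers already tried during the current call, so if stickyInvoker is in it, it has already failed to serve this call (although it is still alive). It is then considered unreliable and is not returned again during the retries. If selected does not contain stickyInvoker, an availability check (for example, network connectivity to the provider) is performed: only when availablecheck is enabled and the check passes is stickyInvoker returned. Otherwise doSelect is called to choose an invoker, and if sticky is true, that invoker is saved as the new stickyInvoker.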
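The sticky checks can be isolated in a small sketch. StickySelector is a hypothetical class, invokers are plain objects, availability is a Predicate and doSelect is passed in as a Function; only the decision order follows the select method above.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Simplified model of the sticky checks in select(); names are illustrative, not Dubbo's API.
final class StickySelector<I> {
    private final boolean sticky;
    private final boolean availableCheck;
    private final Predicate<I> isAvailable;
    private I stickyInvoker;

    StickySelector(boolean sticky, boolean availableCheck, Predicate<I> isAvailable) {
        this.sticky = sticky;
        this.availableCheck = availableCheck;
        this.isAvailable = isAvailable;
    }

    I select(List<I> invokers, List<I> selected, Function<List<I>, I> doSelect) {
        if (invokers.isEmpty()) {
            return null;
        }
        // The sticky provider disappeared from the live list: forget it.
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        // Reuse it only if it has not failed in this call and passes the availability check.
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
            if (availableCheck && isAvailable.test(stickyInvoker)) {
                return stickyInvoker;
            }
        }
        I invoker = doSelect.apply(invokers);
        if (sticky) {
            stickyInvoker = invoker; // remember the choice for the next call
        }
        return invoker;
    }
}
```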
Next comes doSelect. First, here is its Javadoc:
Select a invoker using loadbalance policy.
a) Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or, if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker
b) Reselection, the validation rule for reselection: selected > available. This rule guarantees that the selected invoker has the minimum chance to be one in the previously selected list, and also guarantees this invoker is available.
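The Javadoc already describes the logic. In short, doSelect first asks the LoadBalance for an invoker. If that invoker was already tried during this call (it is in selected), or it fails the availability check while availablecheck is enabled, reselect is called to choose another one, avoiding the nodes that have already failed. If reselect finds nothing, the invoker at the next index (index + 1, wrapping around) is used.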
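A simplified sketch of doSelect plus reselect, assuming availablecheck = true. DoSelectSketch is hypothetical and the load balancer is modeled as a Function; Dubbo's real reselect also has a second pass over previously selected invokers that are still available, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Simplified doSelect + reselect, assuming availablecheck = true (hypothetical helper, not Dubbo's API).
final class DoSelectSketch {
    private DoSelectSketch() {}

    static <I> I doSelect(List<I> invokers, List<I> selected,
                          Predicate<I> isAvailable, Function<List<I>, I> loadBalance) {
        if (invokers.isEmpty()) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        I invoker = loadBalance.apply(invokers);
        boolean alreadyTried = selected != null && selected.contains(invoker);
        if (alreadyTried || !isAvailable.test(invoker)) {
            I reselected = reselect(invokers, selected, isAvailable, loadBalance);
            if (reselected != null) {
                invoker = reselected;
            } else {
                // Nothing better found: take the invoker at index + 1 (wrapping around).
                int index = invokers.indexOf(invoker);
                invoker = invokers.get((index + 1) % invokers.size());
            }
        }
        return invoker;
    }

    // Load-balance again, but only among available invokers that were not tried yet.
    private static <I> I reselect(List<I> invokers, List<I> selected,
                                  Predicate<I> isAvailable, Function<List<I>, I> loadBalance) {
        List<I> candidates = new ArrayList<>();
        for (I candidate : invokers) {
            boolean tried = selected != null && selected.contains(candidate);
            if (!tried && isAvailable.test(candidate)) {
                candidates.add(candidate);
            }
        }
        return candidates.isEmpty() ? null : loadBalance.apply(candidates);
    }
}
```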
FailbackClusterInvoker
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    Invoker<T> invoker = null;
    try {
        checkInvokers(invokers, invocation);
        invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                + e.getMessage() + ", ", e);
        // On failure, register a timer task that retries the call later
        addFailed(loadbalance, invocation, invokers, invoker);
        return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
    }
}

private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
    // Lazily create the timer (double-checked locking)
    if (failTimer == null) {
        synchronized (this) {
            if (failTimer == null) {
                failTimer = new HashedWheelTimer(
                        new NamedThreadFactory("failback-cluster-timer", true),
                        1,
                        TimeUnit.SECONDS, 32, failbackTasks);
            }
        }
    }
    RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
    try {
        // Run the retry after RETRY_FAILED_PERIOD (5 seconds)
        failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
    } catch (Throwable e) {
        logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
    }
}

/**
 * RetryTimerTask (inner class)
 */
private class RetryTimerTask implements TimerTask {
    private final Invocation invocation;
    private final LoadBalance loadbalance;
    private final List<Invoker<T>> invokers;
    private final int retries;
    private final long tick;
    private Invoker<T> lastInvoker;
    private int retryTimes = 0;

    RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
        this.loadbalance = loadbalance;
        this.invocation = invocation;
        this.invokers = invokers;
        this.retries = retries;
        this.tick = tick;
        this.lastInvoker = lastInvoker;
    }

    @Override
    public void run(Timeout timeout) {
        try {
            // Reselect (treating the last failed invoker as already tried) and invoke again
            Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
            lastInvoker = retryInvoker;
            retryInvoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
            if ((++retryTimes) >= retries) {
                logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
            } else {
                // Schedule this task again
                rePut(timeout);
            }
        }
    }

    private void rePut(Timeout timeout) {
        if (timeout == null) {
            return;
        }

        Timer timer = timeout.timer();
        if (timer.isStop() || timeout.isCancelled()) {
            return;
        }

        timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
    }
}
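FailbackClusterInvoker uses a timer (a HashedWheelTimer) to perform the retries, and the logic is straightforward: when a call fails, doInvoke immediately returns an empty result, and a RetryTimerTask retries the call in the background until it succeeds or retryTimes reaches retries, at which point the call is abandoned. The other cluster invokers are not analyzed here.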
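The same retry scheduling can be mimicked with the JDK's ScheduledExecutorService, swapped in here for Dubbo's HashedWheelTimer. FailbackSketch is a hypothetical class and uses milliseconds so it is quick to run; it keeps the two behaviors described above: return right away on failure, then retry in the background and give up after `retries` background attempts.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Failback sketch using a ScheduledExecutorService instead of Dubbo's HashedWheelTimer.
final class FailbackSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final int retries;
    private final long periodMillis;

    FailbackSketch(int retries, long periodMillis) {
        this.retries = retries;
        this.periodMillis = periodMillis;
    }

    // Like doInvoke: on failure, return null right away and retry in the background.
    <R> R invoke(Supplier<R> call) {
        try {
            return call.get();
        } catch (RuntimeException e) {
            scheduleRetry(call, 0);
            return null;
        }
    }

    // Like RetryTimerTask.run + rePut: retry after a delay, give up after `retries` attempts.
    private void scheduleRetry(Supplier<?> call, int retryTimes) {
        timer.schedule(() -> {
            try {
                call.get();
            } catch (RuntimeException e) {
                if (retryTimes + 1 < retries) {
                    scheduleRetry(call, retryTimes + 1);
                }
                // otherwise abandon, as in "Failed retry times exceed threshold"
            }
        }, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        timer.shutdown();
    }
}
```

With retries = 3, a call that keeps failing runs once in the foreground and three more times in the background, the same count as the provider log in the example.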
Usage example
The example below demonstrates the failback cluster strategy. Provider side:
public interface DemoService {
    String sayHello(String name) throws Exception;
}

public class DemoServiceImpl implements DemoService {
    private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);

    int count = 0;

    @Override
    public String sayHello(String name) throws Exception {
        logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
        // "我被调用啦" means "I was called"; count shows how many times the method has run
        logger.info("我被调用啦 + " + count++);
        try {
            // Sleep 8 s, far longer than Dubbo's default 1000 ms timeout, so each call fails on the consumer side
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
    }

    ...
}
Provider configuration:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <dubbo:application name="demo-provider"/>

    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <dubbo:protocol name="dubbo"/>

    <bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>

    <!-- cluster="failback" selects the failback fault-tolerance strategy for this service -->
    <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failback" />

</beans>
Consumer side:
public class Application {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
        context.start();
        try {
            DemoService demoService = context.getBean("demoService", DemoService.class);
            // With failback, a failed call returns an empty (null) result instead of throwing
            String s = demoService.sayHello("aaa");
            System.out.println("s : " + s);
            System.out.println("Waiting : ");
            // Keep the JVM alive so the background retries have time to run
            Thread.sleep(1000 * 50);
        } catch (Exception e) {
            Thread.sleep(1000 * 50);
        }
    }
}
Now look at the provider's log:
[13/04/20 16:38:30:713 CST] DubboServerHandler-192.168.10.27:20880-thread-2  INFO provider.DemoServiceImpl: Hello aaa, request from consumer: /192.168.10.27:8693
[13/04/20 16:38:30:713 CST] DubboServerHandler-192.168.10.27:20880-thread-2  INFO provider.DemoServiceImpl: 我被调用啦 + 0
[13/04/20 16:38:37:544 CST] DubboServerHandler-192.168.10.27:20880-thread-3  INFO provider.DemoServiceImpl: Hello aaa, request from consumer: /192.168.10.27:8693
[13/04/20 16:38:37:544 CST] DubboServerHandler-192.168.10.27:20880-thread-3  INFO provider.DemoServiceImpl: 我被调用啦 + 1
[13/04/20 16:38:44:544 CST] DubboServerHandler-192.168.10.27:20880-thread-4  INFO provider.DemoServiceImpl: Hello aaa, request from consumer: /192.168.10.27:8693
[13/04/20 16:38:44:544 CST] DubboServerHandler-192.168.10.27:20880-thread-4  INFO provider.DemoServiceImpl: 我被调用啦 + 2
[13/04/20 16:38:51:545 CST] DubboServerHandler-192.168.10.27:20880-thread-5  INFO provider.DemoServiceImpl: Hello aaa, request from consumer: /192.168.10.27:8693
[13/04/20 16:38:51:545 CST] DubboServerHandler-192.168.10.27:20880-thread-5  INFO provider.DemoServiceImpl: 我被调用啦 + 3
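The log shows the method was called four times: the original call (count 0) followed by 3 retries, about 7 seconds apart. After that the consumer abandons the call and does not try again. This strategy suits calls such as asynchronous message notifications.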
Summary
Dubbo's strategies for handling failed calls within a cluster:
References
- https://dubbo.apache.org/zh-cn/blog/dubbo-cluster-error-handling.html