zoukankan      html  css  js  c++  java
  • 13.1 dubbo服务降级源码解析

    从 9.1 客户端发起请求源码 的客户端请求总体流程图中,截取部分如下:

    //代理发出请求
    proxy0.sayHello(String paramString)
    -->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
      -->new RpcInvocation(method, args)
      -->MockClusterInvoker.invoke(Invocation invocation)//服务降级的地方

    dubbo就是通过MockClusterInvoker来实现服务降级的。

    一、示例

    1 public interface DemoService {
    2 //    String sayHello(String name);
    3     Car sayHello(String name);
    4 }

    将dubbo-demo中的服务接口定义一个返回模型Car。提供者实现如下:

    1 public class DemoServiceImpl implements DemoService {
    2     public Car sayHello(String name) {
    3         Car car = new Car();
    4         car.setCarNum("浙A10000");
    5         car.setGoMile(100);
    6         return car;
    7     }
    8 }

    消费者使用如下:

     1 public class Consumer {
     2     public static void main(String[] args) {
     3         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
     4         context.start();
     5         DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
     6 
     7         while (true) {
     8             try {
     9                 Thread.sleep(1000);
    10                 Car hello = demoService.sayHello("world"); // call remote method
    11                 System.out.println(hello.getCarNum() + "-" + hello.getGoMile()); // get result
    12             } catch (Throwable throwable) {
    13                 throwable.printStackTrace();
    14             }
    15         }
    16     }
    17 }

    二、使用方式 

    实际使用中,会通过直接在dubbo-admin中设置服务降级策略,这里使用dubbo用户手册中的方式来更清晰的看一下服务降级的配置(实际上就是进行配置覆盖)

    配置规则

    1、使用自定义mock类(接口名+Mock)

    • mock = default => DemoServiceMock
    • mock = true => DemoServiceMock
    • mock = fail => DemoServiceMock
    • mock = force => DemoServiceMock

    2、先普通执行,执行失败之后再执行相应的mock逻辑

    • mock = fail:throw => throw new RpcException(" mocked exception for Service degradation. ");
    • mock = fail:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
    • mock = fail:return => return null
    • mock = fail:return xxx => return xxx
    • mock = fail:return empty => return new Car()

    3、直接执行相应的mock逻辑

    • mock = force:throw => throw new RpcException(" mocked exception for Service degradation. ");
    • mock = force:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
    • mock = force:return => return null
    • mock = force:return xxx => return xxx
    • mock = force:return empty => return new Car()

    进行配置:

     1 public class DegradeTest {
     2     public static void main(String[] args) {
     3         RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
     4         Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.211.55.5:2181"));
     5         // return null;
     6         registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return"));
     7         registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return+null"));
     8         // return 空对象;
     9         registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return+empty"));
    10         // return value;
    11         registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return+hello"));
    12         // throw exception
    13         registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:throw"));
    14         // throw custom-msg exception
    15         registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:throw+com.alibaba.dubbo.Test.MyRuntimeException"));
    16         // 执行mock类
    17         registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:com.alibaba.dubbo.demo.DemoServiceMock"));
    18     }
    19 }

    上述需要注意的是需要配置为“force:return+null”的格式而非“force:return null”。(实际上空格的url encode就是+号),上述代码的执行,实际上是在zk上创建configurators的子节点:

    关于覆盖配置:http://dubbo.io/books/dubbo-user-book/demos/config-rule.html

    • override:// 表示数据采用覆盖方式,支持 override 和 absent,可扩展,必填。
    • 0.0.0.0 表示对所有 IP 地址生效,如果只想覆盖某个 IP 的数据,请填入具体 IP,必填。
    • com.alibaba.dubbo.demo.DemoService表示只对指定服务生效,必填。
    • category=configurators 表示该数据为动态配置类型,必填。
    • dynamic=false 表示该数据为持久数据,当注册方退出时,数据依然保存在注册中心,必填。
    • enabled=true 覆盖规则是否生效,可不填,缺省生效。
    • application=demo-consumer 表示只对指定应用生效,可不填,表示对所有应用生效。
    • mock=force:return+null表示将满足以上条件的 mock 参数的值覆盖为 force:return+null。如果想覆盖其它参数,直接加在 override 的 URL 参数上。

    三、源码分析

     1 public class MockClusterInvoker<T> implements Invoker<T> {
     2     private final Directory<T> directory; //RegistryDirectory:存储invoker列表
     3     private final Invoker<T> invoker; //FailoverClusterInvoker:容错策略
     4 
     5     public Result invoke(Invocation invocation) throws RpcException {
     6         Result result = null;
     7 
     8         String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
     9         if (value.length() == 0 || value.equalsIgnoreCase("false")) {
    10             //no mock
    11             result = this.invoker.invoke(invocation);
    12         } else if (value.startsWith("force")) {
    13             ...
    14             //force:direct mock
    15             result = doMockInvoke(invocation, null);
    16         } else {
    17             //fail-mock
    18             try {
    19                 result = this.invoker.invoke(invocation);
    20             } catch (RpcException e) {
    21                 if (e.isBiz()) {
    22                     throw e;
    23                 } else {
    24                     ...
    25                     result = doMockInvoke(invocation, e);
    26                 }
    27             }
    28         }
    29         return result;
    30     }
    31 }

    首先去获取mock参数,

    • 如果没有配置,则直接使用FailoverClusterInvoker去正常的向provider发出请求;
    • 如果配置为以force开头的,则直接执行doMockInvoke(Invocation invocation, RpcException e),不再向provider发送请求;
    • 如果配置为以fail开头的,则先使用FailoverClusterInvoker去正常的向provider发出请求,如果失败抛出了非业务异常,则执行doMockInvoke(Invocation invocation, RpcException e);
     1     private Result doMockInvoke(Invocation invocation, RpcException e) {
     2         Result result = null;
     3         Invoker<T> minvoker;
     4 
     5         List<Invoker<T>> mockInvokers = selectMockInvoker(invocation); //获取mock类型的Invoker
     6         if (mockInvokers == null || mockInvokers.size() == 0) {
     7             minvoker = (Invoker<T>) new MockInvoker(directory.getUrl()); //如果没有配置mock类型的Invoker,则自己创建一个MockInvoker
     8         } else {
     9             minvoker = mockInvokers.get(0);
    10         }
    11         try {
    12             result = minvoker.invoke(invocation); //执行MockInvoker的invoke(Invocation invocation)方法
    13         } catch (RpcException me) {
    14             if (me.isBiz()) {
    15                 result = new RpcResult(me.getCause());
    16             } else { //非业务异常
    17                 throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
    18             }
    19         } catch (Throwable me) {
    20             throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
    21         }
    22         return result;
    23     }

    从RegistryDirectory中获取MockInvoker:

     1     /**
     2      * Return MockInvoker
     3      * Contract:
     4      * directory.list() will return a list of normal invokers if Constants.INVOCATION_NEED_MOCK is present in invocation, otherwise, a list of mock invokers will return.
     5      * if directory.list() returns more than one mock invoker, only one of them will be used.
     6      *
     7      * @param invocation
     8      * @return
     9      */
    10     private List<Invoker<T>> selectMockInvoker(Invocation invocation) {
    11         List<Invoker<T>> invokers = null;
    12         //TODO generic invoker?
    13         if (invocation instanceof RpcInvocation) {
    14             //Note the implicit contract (although the description is added to the interface declaration, but extensibility is a problem. The practice placed in the attachement needs to be improved)
    15             ((RpcInvocation) invocation).setAttachment(Constants.INVOCATION_NEED_MOCK, Boolean.TRUE.toString());
    16             //directory will return a list of normal invokers if Constants.INVOCATION_NEED_MOCK is present in invocation, otherwise, a list of mock invokers will return.
    17             try {
    18                 invokers = directory.list(invocation);
    19             } catch (RpcException e) {
    20                 if (logger.isInfoEnabled()) {
    21                     logger.info("Exception when try to invoke mock. Get mock invokers error for service:"
    22                             + directory.getUrl().getServiceInterface() + ", method:" + invocation.getMethodName()
    23                             + ", will contruct a new mock with 'new MockInvoker()'.", e);
    24                 }
    25             }
    26         }
    27         return invokers;
    28     }

    首先使用RegistryDirectory获取出方法名为sayHello的Invoker列表,之后使用MockInvokersSelector(Router)选取出MockInvoker。

     1 public class MockInvokersSelector implements Router {
     2 
     3     public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
     4                                       URL url, final Invocation invocation) throws RpcException {
     5         if (invocation.getAttachments() == null) {
     6             return getNormalInvokers(invokers);
     7         } else {
     8             String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
     9             if (value == null)
    10                 return getNormalInvokers(invokers);
    11             else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
    12                 return getMockedInvokers(invokers);
    13             }
    14         }
    15         return invokers;
    16     }
    17 
    18     private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) {
    19         if (!hasMockProviders(invokers)) {
    20             return null;
    21         }
    22         List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1);
    23         for (Invoker<T> invoker : invokers) {
    24             if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
    25                 sInvokers.add(invoker);
    26             }
    27         }
    28         return sInvokers;
    29     }
    30 
    31     private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) {
    32         boolean hasMockProvider = false;
    33         for (Invoker<T> invoker : invokers) {
    34             if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
    35                 hasMockProvider = true;
    36                 break;
    37             }
    38         }
    39         return hasMockProvider;
    40     }
    41 }

    这里获取到的是空列表。

    所以会先创建一个MockInvoker对象,之后执行其invoker方法。

    MockInvoker:

     1     public Result invoke(Invocation invocation) throws RpcException {
     2         String mock = getUrl().getParameter(invocation.getMethodName() + "." + Constants.MOCK_KEY); //sayHello.mock
     3         if (invocation instanceof RpcInvocation) {
     4             ((RpcInvocation) invocation).setInvoker(this);
     5         }
     6         if (StringUtils.isBlank(mock)) {
     7             mock = getUrl().getParameter(Constants.MOCK_KEY); //mock
     8         }
     9 
    10         if (StringUtils.isBlank(mock)) {
    11             throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
    12         }
    13         mock = normallizeMock(URL.decode(mock));
    14         if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())) { // return
    15             RpcResult result = new RpcResult();
    16             result.setValue(null);
    17             return result;
    18         } else if (mock.startsWith(Constants.RETURN_PREFIX)) { // return value(包括return null)
    19             mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
    20             mock = mock.replace('`', '"');
    21             try {
    22                 Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
    23                 Object value = parseMockValue(mock, returnTypes);
    24                 return new RpcResult(value);
    25             } catch (Exception ew) {
    26                 throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: " + url, ew);
    27             }
    28         } else if (mock.startsWith(Constants.THROW_PREFIX)) { // throw xxx
    29             mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
    30             mock = mock.replace('`', '"');
    31             if (StringUtils.isBlank(mock)) {// throw
    32                 throw new RpcException(" mocked exception for Service degradation. ");
    33             } else { // user customized class : throw xxx
    34                 Throwable t = getThrowable(mock);
    35                 throw new RpcException(RpcException.BIZ_EXCEPTION, t);
    36             }
    37         } else { //impl mock: 自定义mock类
    38             try {
    39                 Invoker<T> invoker = getInvoker(mock);
    40                 return invoker.invoke(invocation);
    41             } catch (Throwable t) {
    42                 throw new RpcException("Failed to create mock implemention class " + mock, t);
    43             }
    44         }
    45     }

    首先获取到mock配置,例如:mock=force:return+null,之后进行url解码为mock=force:return null,最后进行处理为mock=return null,然后根据规则走分支。

    mock参数的处理函数:

     1     /**
     2      * 一、使用自定义mock类
     3      * mock = default => DemoServiceMock
     4      * mock = true => DemoServiceMock
     5      * mock = fail => DemoServiceMock
     6      * mock = force => DemoServiceMock
     7      *
     8      * 二、先普通执行,执行失败之后再执行相应的mock逻辑
     9      * mock = fail:throw => throw new RpcException(" mocked exception for Service degradation. ");
    10      * mock = fail:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
    11      * mock = fail:return => return null
    12      * mock = fail:return xxx => return xxx
    13      *
    14      * 三、直接执行相应的mock逻辑
    15      * mock = force:throw => throw new RpcException(" mocked exception for Service degradation. ");
    16      * mock = force:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
    17      * mock = force:return => return null
    18      * mock = force:return xxx => return xxx
    19      *
    20      * @param mock
    21      * @return
    22      */
    23     private String normallizeMock(String mock) {
    24         if (mock == null || mock.trim().length() == 0) {
    25             return mock;
    26         } else if (ConfigUtils.isDefault(mock) || "fail".equalsIgnoreCase(mock.trim()) || "force".equalsIgnoreCase(mock.trim())) {
    27             mock = url.getServiceInterface() + "Mock";
    28         }
    29         if (mock.startsWith(Constants.FAIL_PREFIX)) {
    30             mock = mock.substring(Constants.FAIL_PREFIX.length()).trim();
    31         } else if (mock.startsWith(Constants.FORCE_PREFIX)) {
    32             mock = mock.substring(Constants.FORCE_PREFIX.length()).trim();
    33         }
    34         return mock;
    35     }

    我们这里来看一下自定义mock类。消费端编写:

     1 public class DemoServiceMock implements DemoService {
     2 
     3     @Override
     4     public Car sayHello(String name) {
     5         Car car = new Car();
     6         car.setCarNum("mock中");
     7         car.setGoMile(666);
     8         return car;
     9     }
    10 }

    配置覆盖:

    1         registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:com.alibaba.dubbo.demo.DemoServiceMock"));

    MockInvoker.invoke

    1             try {
    2                 Invoker<T> invoker = getInvoker(mock);
    3                 return invoker.invoke(invocation);
    4             } catch (Throwable t) {
    5                 throw new RpcException("Failed to create mock implemention class " + mock, t);
    6             }
     1     private Invoker<T> getInvoker(String mockService) {
     2         Invoker<T> invoker = (Invoker<T>) mocks.get(mockService);
     3         if (invoker != null) {
     4             return invoker;
     5         } else {
     6             Class<T> serviceType = (Class<T>) ReflectUtils.forName(url.getServiceInterface());
     7             if (ConfigUtils.isDefault(mockService)) {
     8                 mockService = serviceType.getName() + "Mock";
     9             }
    10 
    11             Class<?> mockClass = ReflectUtils.forName(mockService);
    12             if (!serviceType.isAssignableFrom(mockClass)) {
    13                 throw new IllegalArgumentException("The mock implemention class " + mockClass.getName() + " not implement interface " + serviceType.getName());
    14             }
    15 
    16             if (!serviceType.isAssignableFrom(mockClass)) {
    17                 throw new IllegalArgumentException("The mock implemention class " + mockClass.getName() + " not implement interface " + serviceType.getName());
    18             }
    19             try {
    20                 T mockObject = (T) mockClass.newInstance(); // 获取自定义mock类实例
    21                 invoker = proxyFactory.getInvoker(mockObject, (Class<T>) serviceType, url); // 和普通类一样创建Invoker
    22                 if (mocks.size() < 10000) {
    23                     mocks.put(mockService, invoker);
    24                 }
    25                 return invoker;
    26             } catch (InstantiationException e) {
    27                 throw new IllegalStateException("No such empty constructor "public " + mockClass.getSimpleName() + "()" in mock implemention class " + mockClass.getName(), e);
    28             } catch (IllegalAccessException e) {
    29                 throw new IllegalStateException(e);
    30             }
    31         }
    32     }

    上边看了return和自定义mock类,最后来看一下throw异常。

    默认抛出RpcException,异常信息:mocked exception for Service degradation. 也可以自定义异常,例如:

    1 public class MyRuntimeException extends RuntimeException {
    2     private String msg;
    3 
    4     public MyRuntimeException(String msg){
    5         this.msg = msg;
    6     }
    7 }

    自定义异常必须具有单参构造器且参数为String。

    配置覆盖:

    MockInvoker.invoke

    1 registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:throw+com.alibaba.dubbo.Test.MyRuntimeException"));
     1     private Throwable getThrowable(String throwstr) {
     2         Throwable throwable = (Throwable) throwables.get(throwstr);
     3         if (throwable != null) {
     4             return throwable;
     5         } else {
     6             Throwable t = null;
     7             try {
     8                 Class<?> bizException = ReflectUtils.forName(throwstr);
     9                 Constructor<?> constructor;
    10                 constructor = ReflectUtils.findConstructor(bizException, String.class);
    11                 t = (Throwable) constructor.newInstance(new Object[]{" mocked exception for Service degradation. "});
    12                 if (throwables.size() < 1000) {
    13                     throwables.put(throwstr, t);
    14                 }
    15             } catch (Exception e) {
    16                 throw new RpcException("mock throw error :" + throwstr + " argument error.", e);
    17             }
    18             return t;
    19         }
    20     }

    服务降级结束!!!

  • 相关阅读:
    centos 7 nginx 安装
    搭建Nuget.Server push时,"Failed to process request. 'Method Not Allowed'"
    Failed to create prime the NuGet cache
    Centos 7 安装 Visual stdio Code
    diskpart 格式化u盘 制作u盘启动盘方法
    sql server 2012 数据库日志文件过大,怎么缩小?
    浏览器同源政策及其规避方法
    redis底层数据结构--简单动态字符串 链表 字典 跳跃表 整数集合 压缩列表
    php的闭包
    hash一致性算法
  • 原文地址:https://www.cnblogs.com/java-zhao/p/8320519.html
Copyright © 2011-2022 走看看