zoukankan      html  css  js  c++  java
  • dubbo事件通知机制(1)

    此文已由作者岳猛授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。


    dubbo事件通知机制:http://dubbo.io/books/dubbo-user-book/demos/events-notify.html

    一、使用方式

    两个服务:

    • DemoService:真正要调用的服务

    • Notify:事件通知服务(用在consumer端)

    provider

    1 package com.alibaba.dubbo.demo;
    2 
    3 public interface DemoService {
    4     String sayHello(String name);
    5 }


    1 public class DemoServiceImpl implements DemoService {
    2     @Override
    3     public String sayHello(String name) {
    4         throw new RpcException("ex, param: " + name);//测试onthrow方法
    5 //        return "Hello " + name;//测试onreturn方法
    6     }
    7 }


    consumer

    通知服务:Notify


    1 package com.alibaba.dubbo.demo.consumer.eventnotify;
    2 
    3 public interface Notify {
    4     void oninvoke(String name); // 调用之前
    5     void onreturnWithoutParam(String result); // 调用之后
    6     void onreturn(String result, String name); // 调用之后
    7     void onthrow(Throwable ex, String name);  // 出现异常
    8 }
      1 package com.alibaba.dubbo.demo.consumer.eventnotify;
     2 
     3 public class NotifyService implements Notify {
     4     @Override
     5     public void oninvoke(String name) {
     6         System.out.println("======oninvoke======, param: " + name);
     7     }
     8 
     9     @Override
    10     public void onreturnWithoutParam(String result) {
    11         System.out.println("======onreturn======, result: " + result);
    12     }
    13 
    14     @Override
    15     public void onreturn(String result, String name) {
    16         System.out.println("======onreturn======, param: " + name + ", result: " + result);
    17     }
    18 
    19     @Override
    20     public void onthrow(Throwable ex, String name) {
    21         System.out.println("======onthrow======, param: " + name + ", exception: " + ex.getMessage());
    22     }
    23 }


    xml配置:

    1     <bean id="notifyService"  class="com.alibaba.dubbo.demo.consumer.eventnotify.NotifyService"/>
    2     <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
    3         <dubbo:method name="sayHello" timeout="60000" oninvoke="notifyService.oninvoke" onreturn="notifyService.onreturnWithoutParam" onthrow="notifyService.onthrow"/>
    4     </dubbo:reference>

    之后就可以运行Consumer启动类,之后调用demoService.sayHello(String name)了。

    注意

    • oninvoke方法:

      • 必须具有与真实的被调用方法sayHello相同的入参列表:例如,oninvoke(String name)

    • onreturn方法:

      • 至少要有一个入参且第一个入参必须与sayHello的返回类型相同,接收返回结果:例如,onreturnWithoutParam(String result)

      • 可以有多个参数,多个参数的情况下,第一个后边的所有参数都是用来接收sayHello入参的:例如, onreturn(String result, String name)

    • onthrow方法:

      • 至少要有一个入参且第一个入参类型为Throwable或其子类,接收返回结果;例如,onthrow(Throwable ex)

      • 可以有多个参数,多个参数的情况下,第一个后边的所有参数都是用来接收sayHello入参的:例如,onthrow(Throwable ex, String name)

    • 如果是consumer在调用provider的过程中,出现异常时不会走onthrow方法的,onthrow方法只会在provider返回的RpcResult中含有Exception对象时,才会执行。(dubbo中下层服务的Exception会被放在响应RpcResult的exception对象中传递给上层服务

    二、源码解析

    整个事件通知的逻辑都在FutureFilter中,来看一下源码:


    1 /**
      2  * EventFilter
      3  */
      4 @Activate(group = Constants.CONSUMER)
      5 public class FutureFilter implements Filter {
      6 
      7     protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
      8 
      9     public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
     10         final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
     11 
     12         //1 调用服务之前:执行xxxService.oninvoke方法
     13         fireInvokeCallback(invoker, invocation);
     14         //2 调用服务
     15         Result result = invoker.invoke(invocation);
     16         //3 调用服务之后
     17         if (isAsync) {
     18             asyncCallback(invoker, invocation);
     19         } else {
     20             syncCallback(invoker, invocation, result); 21         }
     22         //4 返回调用结果
     23         return result;
     24     }
     25 
     26     private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
     27         if (result.hasException()) {
     28             //3.1 调用服务之后:如果返回结果异常信息(注意:如果是consumer自己throw的异常,会在2的时候直接抛走,不会走到这里),直接执行xxxService.onthrow方法
     29             fireThrowCallback(invoker, invocation, result.getException());
     30         } else {
     31             //3.2 调用服务之后:如果返回值正常,执行xxxService.onreturn方法
     32             fireReturnCallback(invoker, invocation, result.getValue());
     33         }
     34     }
     35 
     36     private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
     37         Future<?> f = RpcContext.getContext().getFuture();
     38         if (f instanceof FutureAdapter) {
     39             ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
     40             // 3.1 调用服务之后:设置回调ResponseCallback对象到DefaultFuture中,当provider返回响应时,执行DefaultFuture.doReceived方法,该方法会调用ResponseCallback对象的done或者caught方法
     41             future.setCallback(new ResponseCallback() {
     42                 public void done(Object rpcResult) {
     43                     if (rpcResult == null) {
     44                         logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
     45                         return;
     46                     }
     47                     ///must be rpcResult
     48                     if (!(rpcResult instanceof Result)) {
     49                         logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
     50                         return;
     51                     }
     52                     Result result = (Result) rpcResult;
     53                     if (result.hasException()) {
     54                         fireThrowCallback(invoker, invocation, result.getException());
     55                     } else {
     56                         fireReturnCallback(invoker, invocation, result.getValue());
     57                     }
     58                 }
     59 
     60                 public void caught(Throwable exception) {
     61                     fireThrowCallback(invoker, invocation, exception);
     62                 }
     63             });
     64         }
     65     }
     66 
     67     /**
     68      * 反射执行xxxService.oninvoke方法:必须具有与真实的被调用方法sayHello相同的入参列表。
     69      */
     70     private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
     71         final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
     72         final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
     73 
     74         if (onInvokeMethod == null && onInvokeInst == null) {
     75             return;
     76         }
     77         if (onInvokeMethod == null || onInvokeInst == null) {
     78             throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
     79         }
     80         if (onInvokeMethod != null && !onInvokeMethod.isAccessible()) {
     81             onInvokeMethod.setAccessible(true);
     82         }
     83         // 获取真实方法sayHello传入的参数
     84         Object[] params = invocation.getArguments();
     85         try {
     86             onInvokeMethod.invoke(onInvokeInst, params);
     87         } catch (InvocationTargetException e) {
     88             fireThrowCallback(invoker, invocation, e.getTargetException());
     89         } catch (Throwable e) {
     90             fireThrowCallback(invoker, invocation, e);
     91         }
     92     }
     93 
     94     /**
     95      * 反射执行xxxService.onreturn方法:至少要有一个入参,接收返回结果
     96      */
     97     private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
     98         final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
     99         final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
    100 
    101         //not set onreturn callback
    102         if (onReturnMethod == null && onReturnInst == null) {
    103             return;
    104         }
    105 
    106         if (onReturnMethod == null || onReturnInst == null) {
    107             throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    108         }
    109         if (onReturnMethod != null && !onReturnMethod.isAccessible()) {
    110             onReturnMethod.setAccessible(true);
    111         }
    112 
    113         Object[] args = invocation.getArguments();
    114         Object[] params;
    115         Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
    116         if (rParaTypes.length > 1) {
    117             // onreturn(xx, Object[]) 两个参数:第一个参数与真实方法sayHello方法返回结果类型相同,第二个接收所有的真实请求参数
    118             if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
    119                 params = new Object[2];
    120                 params[0] = result; // 真实方法的执行结果
    121                 params[1] = args;   // 真实方法sayHello传入的参数
    122             // onreturn(xx, Object... args) 多个参数:第一个参数与真实方法sayHello方法返回结果类型相同,后边几个接收所有的真实请求参数
    123             } else {
    124                 params = new Object[args.length + 1];
    125                 params[0] = result; // 真实方法的执行结果
    126                 System.arraycopy(args, 0, params, 1, args.length);
    127             }
    128         } else {
    129             // onreturn(xx) 只有一个参数:接收返回执行结果
    130             params = new Object[]{result}; // 执行结果
    131         }
    132         try {
    133             onReturnMethod.invoke(onReturnInst, params);
    134         } catch (InvocationTargetException e) {
    135             fireThrowCallback(invoker, invocation, e.getTargetException());
    136         } catch (Throwable e) {
    137             fireThrowCallback(invoker, invocation, e);
    138         }
    139     }
    140 
    141     /**


    免费体验云安全(易盾)内容安全、验证码等服务

    更多网易技术、产品、运营经验分享请点击


    相关文章:
    【推荐】 致传统企业朋友:不够痛就别微服务,有坑 (2)
    【推荐】 当我们在谈论multidex65535时,我们在谈论什么

  • 相关阅读:
    httpcontext in asp.net unit test
    initialize or clean up your unittest within .net unit test
    Load a script file in sencha, supports both asynchronous and synchronous approaches
    classes system in sencha touch
    ASP.NET MVC got 405 error on HTTP DELETE request
    how to run demo city bars using sencha architect
    sencha touch mvc
    sencha touch json store
    sencha touch jsonp
    51Nod 1344:走格子(贪心)
  • 原文地址:https://www.cnblogs.com/163yun/p/9970390.html
Copyright © 2011-2022 走看看