zoukankan      html  css  js  c++  java
  • 9.5 dubbo事件通知机制

    dubbo事件通知机制:http://dubbo.io/books/dubbo-user-book/demos/events-notify.html

    一、使用方式

    两个服务:

    • DemoService:真正要调用的服务
    • Notify:事件通知服务(用在consumer端)

    provider

    1 package com.alibaba.dubbo.demo;
    2 
    3 public interface DemoService {
    4     String sayHello(String name);
    5 }
    1 public class DemoServiceImpl implements DemoService {
    2     @Override
    3     public String sayHello(String name) {
    4         throw new RpcException("ex, param: " + name);//测试onthrow方法
    5 //        return "Hello " + name;//测试onreturn方法
    6     }
    7 }

    consumer

    通知服务:Notify

    1 package com.alibaba.dubbo.demo.consumer.eventnotify;
    2 
    3 public interface Notify {
    4     void oninvoke(String name); // 调用之前
    5     void onreturnWithoutParam(String result); // 调用之后
    6     void onreturn(String result, String name); // 调用之后
    7     void onthrow(Throwable ex, String name);  // 出现异常
    8 }
     1 package com.alibaba.dubbo.demo.consumer.eventnotify;
     2 
     3 public class NotifyService implements Notify {
     4     @Override
     5     public void oninvoke(String name) {
     6         System.out.println("======oninvoke======, param: " + name);
     7     }
     8 
     9     @Override
    10     public void onreturnWithoutParam(String result) {
    11         System.out.println("======onreturn======, result: " + result);
    12     }
    13 
    14     @Override
    15     public void onreturn(String result, String name) {
    16         System.out.println("======onreturn======, param: " + name + ", result: " + result);
    17     }
    18 
    19     @Override
    20     public void onthrow(Throwable ex, String name) {
    21         System.out.println("======onthrow======, param: " + name + ", exception: " + ex.getMessage());
    22     }
    23 }

    xml配置:

    1     <bean id="notifyService"  class="com.alibaba.dubbo.demo.consumer.eventnotify.NotifyService"/>
    2     <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
    3         <dubbo:method name="sayHello" timeout="60000" oninvoke="notifyService.oninvoke" onreturn="notifyService.onreturnWithoutParam" onthrow="notifyService.onthrow"/>
    4     </dubbo:reference>

    之后就可以运行Consumer启动类,之后调用demoService.sayHello(String name)了。

    注意

    • oninvoke方法:
      • 必须具有与真实的被调用方法sayHello相同的入参列表:例如,oninvoke(String name)
    • onreturn方法:
      • 至少要有一个入参且第一个入参必须与sayHello的返回类型相同,接收返回结果:例如,onreturnWithoutParam(String result)
      • 可以有多个参数,多个参数的情况下,第一个后边的所有参数都是用来接收sayHello入参的:例如, onreturn(String result, String name)
    • onthrow方法:
      • 至少要有一个入参且第一个入参类型为Throwable或其子类,接收返回结果;例如,onthrow(Throwable ex)
      • 可以有多个参数,多个参数的情况下,第一个后边的所有参数都是用来接收sayHello入参的:例如,onthrow(Throwable ex, String name)
    • 如果是consumer在调用provider的过程中,出现异常时不会走onthrow方法的,onthrow方法只会在provider返回的RpcResult中含有Exception对象时,才会执行。(dubbo中下层服务的Exception会被放在响应RpcResult的exception对象中传递给上层服务

    二、源码解析

    整个事件通知的逻辑都在FutureFilter中,来看一下源码:

      1 /**
      2  * EventFilter
      3  */
      4 @Activate(group = Constants.CONSUMER)
      5 public class FutureFilter implements Filter {
      6 
      7     protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
      8 
      9     public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
     10         final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
     11 
     12         //1 调用服务之前:执行xxxService.oninvoke方法
     13         fireInvokeCallback(invoker, invocation);
     14         //2 调用服务
     15         Result result = invoker.invoke(invocation);
     16         //3 调用服务之后
     17         if (isAsync) {
     18             asyncCallback(invoker, invocation);
     19         } else {
     20             syncCallback(invoker, invocation, result);
     21         }
     22         //4 返回调用结果
     23         return result;
     24     }
     25 
     26     private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
     27         if (result.hasException()) {
     28             //3.1 调用服务之后:如果返回结果异常信息(注意:如果是consumer自己throw的异常,会在2的时候直接抛走,不会走到这里),直接执行xxxService.onthrow方法
     29             fireThrowCallback(invoker, invocation, result.getException());
     30         } else {
     31             //3.2 调用服务之后:如果返回值正常,执行xxxService.onreturn方法
     32             fireReturnCallback(invoker, invocation, result.getValue());
     33         }
     34     }
     35 
     36     private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
     37         Future<?> f = RpcContext.getContext().getFuture();
     38         if (f instanceof FutureAdapter) {
     39             ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
     40             // 3.1 调用服务之后:设置回调ResponseCallback对象到DefaultFuture中,当provider返回响应时,执行DefaultFuture.doReceived方法,该方法会调用ResponseCallback对象的done或者caught方法
     41             future.setCallback(new ResponseCallback() {
     42                 public void done(Object rpcResult) {
     43                     if (rpcResult == null) {
     44                         logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
     45                         return;
     46                     }
     47                     ///must be rpcResult
     48                     if (!(rpcResult instanceof Result)) {
     49                         logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
     50                         return;
     51                     }
     52                     Result result = (Result) rpcResult;
     53                     if (result.hasException()) {
     54                         fireThrowCallback(invoker, invocation, result.getException());
     55                     } else {
     56                         fireReturnCallback(invoker, invocation, result.getValue());
     57                     }
     58                 }
     59 
     60                 public void caught(Throwable exception) {
     61                     fireThrowCallback(invoker, invocation, exception);
     62                 }
     63             });
     64         }
     65     }
     66 
     67     /**
     68      * 反射执行xxxService.oninvoke方法:必须具有与真实的被调用方法sayHello相同的入参列表。
     69      */
     70     private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
     71         final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
     72         final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
     73 
     74         if (onInvokeMethod == null && onInvokeInst == null) {
     75             return;
     76         }
     77         if (onInvokeMethod == null || onInvokeInst == null) {
     78             throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
     79         }
     80         if (onInvokeMethod != null && !onInvokeMethod.isAccessible()) {
     81             onInvokeMethod.setAccessible(true);
     82         }
     83         // 获取真实方法sayHello传入的参数
     84         Object[] params = invocation.getArguments();
     85         try {
     86             onInvokeMethod.invoke(onInvokeInst, params);
     87         } catch (InvocationTargetException e) {
     88             fireThrowCallback(invoker, invocation, e.getTargetException());
     89         } catch (Throwable e) {
     90             fireThrowCallback(invoker, invocation, e);
     91         }
     92     }
     93 
     94     /**
     95      * 反射执行xxxService.onreturn方法:至少要有一个入参,接收返回结果
     96      */
     97     private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
     98         final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
     99         final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
    100 
    101         //not set onreturn callback
    102         if (onReturnMethod == null && onReturnInst == null) {
    103             return;
    104         }
    105 
    106         if (onReturnMethod == null || onReturnInst == null) {
    107             throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    108         }
    109         if (onReturnMethod != null && !onReturnMethod.isAccessible()) {
    110             onReturnMethod.setAccessible(true);
    111         }
    112 
    113         Object[] args = invocation.getArguments();
    114         Object[] params;
    115         Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
    116         if (rParaTypes.length > 1) {
    117             // onreturn(xx, Object[]) 两个参数:第一个参数与真实方法sayHello方法返回结果类型相同,第二个接收所有的真实请求参数
    118             if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
    119                 params = new Object[2];
    120                 params[0] = result; // 真实方法的执行结果
    121                 params[1] = args;   // 真实方法sayHello传入的参数
    122             // onreturn(xx, Object... args) 多个参数:第一个参数与真实方法sayHello方法返回结果类型相同,后边几个接收所有的真实请求参数
    123             } else {
    124                 params = new Object[args.length + 1];
    125                 params[0] = result; // 真实方法的执行结果
    126                 System.arraycopy(args, 0, params, 1, args.length);
    127             }
    128         } else {
    129             // onreturn(xx) 只有一个参数:接收返回执行结果
    130             params = new Object[]{result}; // 执行结果
    131         }
    132         try {
    133             onReturnMethod.invoke(onReturnInst, params);
    134         } catch (InvocationTargetException e) {
    135             fireThrowCallback(invoker, invocation, e.getTargetException());
    136         } catch (Throwable e) {
    137             fireThrowCallback(invoker, invocation, e);
    138         }
    139     }
    140 
    141     /**
    142      * 反射执行xxxService.onthrow方法:至少要有一个入参且第一个入参类型为Throwable或其子类,接收返回结果
    143      */
    144     private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
    145         final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
    146         final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));
    147 
    148         //onthrow callback not configured
    149         if (onthrowMethod == null && onthrowInst == null) {
    150             return;
    151         }
    152         if (onthrowMethod == null || onthrowInst == null) {
    153             throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    154         }
    155         if (onthrowMethod != null && !onthrowMethod.isAccessible()) {
    156             onthrowMethod.setAccessible(true);
    157         }
    158         Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
    159         if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
    160             try {
    161                 Object[] args = invocation.getArguments();
    162                 Object[] params;
    163 
    164                 if (rParaTypes.length > 1) {
    165                     // onthrow(xx, Object[]) 两个参数:第一个参数接收exception,第二个接收所有的真实请求参数
    166                     if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
    167                         params = new Object[2];
    168                         params[0] = exception;
    169                         params[1] = args;
    170                     // onthrow(xx, Object... args) 多个参数:第一个参数接收exception,后边几个接收所有的真实请求参数
    171                     } else {
    172                         params = new Object[args.length + 1];
    173                         params[0] = exception;
    174                         System.arraycopy(args, 0, params, 1, args.length);
    175                     }
    176                 } else {
    177                     // onthrow(xx) 只有一个参数:接收exception
    178                     params = new Object[]{exception};
    179                 }
    180                 onthrowMethod.invoke(onthrowInst, params);
    181             } catch (Throwable e) {
    182                 logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
    183             }
    184         } else {
    185             logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
    186         }
    187     }
    188 }

    从@Activate(group = Constants.CONSUMER)来看FutureFilter只用在consumer端;不管是同步调用还是异步调用,都会走FutureFilter。

    原理:

    • 首先走oninvoke(String name)方法;
    • 然后走sayHello(String name)
    • 最后根据同步还是异步分别走不同的逻辑。 

    其中同步很简单,看sayHello(String name)的返回结果RpcResult中是否有exception对象,如果有,执行onthrow(Throwable ex, String name);如果没有执行onreturnWithoutParam(String result)。

    异步的操作:由于不知道provider什么时候回执行完毕,所以要添加回调等待provider端返回结果后,再执行onthrow(Throwable ex, String name)或者onreturnWithoutParam(String result),这种模式很重要,这是统计异步方法调用时间的一种非常好的模式

    重点看一下异步!

    三、异步回调模式

     1     private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
     2         Future<?> f = RpcContext.getContext().getFuture();
     3         if (f instanceof FutureAdapter) {
     4             ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
     5             // 3.1 调用服务之后:设置回调ResponseCallback对象到DefaultFuture中,当provider返回响应时,执行DefaultFuture.doReceived方法,该方法会调用ResponseCallback对象的done或者caught方法
     6             future.setCallback(new ResponseCallback() {
     7                 public void done(Object rpcResult) {
     8                     if (rpcResult == null) {
     9                         logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
    10                         return;
    11                     }
    12                     ///must be rpcResult
    13                     if (!(rpcResult instanceof Result)) {
    14                         logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
    15                         return;
    16                     }
    17                     Result result = (Result) rpcResult;
    18                     if (result.hasException()) {
    19                         fireThrowCallback(invoker, invocation, result.getException());
    20                     } else {
    21                         fireReturnCallback(invoker, invocation, result.getValue());
    22                     }
    23                 }
    24 
    25                 public void caught(Throwable exception) {
    26                     fireThrowCallback(invoker, invocation, exception);
    27                 }
    28             });
    29         }
    30     }

    上述的future对象是DefaultFuture,这里首先new了一个ResponseCallback回调函数,设置到了DefaultFuture的ResponseCallback callback属性中。来看一下DefaultFuture类:

     1     private volatile Response response;
     2     private volatile ResponseCallback callback;
     3 
     4     public boolean isDone() {
     5         return response != null;
     6     }
     7 
     8     public void setCallback(ResponseCallback callback) {
     9         if (isDone()) {
    10             invokeCallback(callback);
    11         } else {
    12             boolean isdone = false;
    13             lock.lock();
    14             try {
    15                 if (!isDone()) {
    16                     this.callback = callback;
    17                 } else {
    18                     isdone = true;
    19                 }
    20             } finally {
    21                 lock.unlock();
    22             }
    23             if (isdone) {
    24                 invokeCallback(callback);
    25             }
    26         }
    27     }
     1     private void invokeCallback(ResponseCallback c) {
     2         ResponseCallback callbackCopy = c;
     3         if (callbackCopy == null) {
     4             throw new NullPointerException("callback cannot be null.");
     5         }
     6         c = null;
     7         Response res = response;
     8         if (res == null) {
     9             throw new IllegalStateException("response cannot be null. url:" + channel.getUrl());
    10         }
    11 
    12         if (res.getStatus() == Response.OK) {
    13             try {
    14                 // 返回正常,回调ResponseCallback回调函数的done方法
    15                 callbackCopy.done(res.getResult());
    16             } catch (Exception e) {
    17                 logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e);
    18             }
    19         } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
    20             try {
    21                 TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
    22                 // 如果超时,回调ResponseCallback回调函数的caught方法
    23                 callbackCopy.caught(te);
    24             } catch (Exception e) {
    25                 logger.error("callback invoke error ,url:" + channel.getUrl(), e);
    26             }
    27         } else {
    28             try {
    29                 RuntimeException re = new RuntimeException(res.getErrorMessage());
    30                 // 其他异常,回调ResponseCallback回调函数的caught方法
    31                 callbackCopy.caught(re);
    32             } catch (Exception e) {
    33                 logger.error("callback invoke error ,url:" + channel.getUrl(), e);
    34             }
    35         }
    36     }

    从setCallback(ResponseCallback callback),如果此时provider端已经返回了响应(response!=null),则直接执行ResponseCallback回调函数中的done方法或者caught方法;否则,将上边创建的ResponseCallback实例赋值给DefaultFuture的ResponseCallback callback属性中。那么之后会在什么时候执行回调函数的方法呢?当consumer接收到provider的响应的时候!

     1     public static void received(Channel channel, Response response) {
     2         try {
     3             DefaultFuture future = FUTURES.remove(response.getId());
     4             if (future != null) {
     5                 future.doReceived(response);
     6             } else {
     7                 logger.warn("The timeout response finally returned at "
     8                         + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
     9                         + ", response " + response
    10                         + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
    11                         + " -> " + channel.getRemoteAddress()));
    12             }
    13         } finally {
    14             CHANNELS.remove(response.getId());
    15         }
    16     }
    17 
    18     private void doReceived(Response res) {
    19         lock.lock();
    20         try {
    21             response = res;
    22             if (done != null) {
    23                 done.signal();
    24             }
    25         } finally {
    26             lock.unlock();
    27         }
    28         // 调用回调函数
    29         if (callback != null) {
    30             invokeCallback(callback);
    31         }
    32     }

    当provider返回响应时,会调用DefaultFuture.received(Channel channel, Response response)方法(9.3 客户端接收响应信息(异步转同步的实现)),此时会执行回调函数。事件通知的源码就分析完了!最后看一个回调模式的使用场景:统计异步方法的调用时间

     1     private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
     2         Future<?> f = RpcContext.getContext().getFuture();
     3         final long start = System.currentTimeMillis();
     4         if (f instanceof FutureAdapter) {
     5             ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
     6             future.setCallback(new ResponseCallback() {
     7                 public void done(Object rpcResult) {
     8                     long cost = System.currentTimeMillis() - start;
     9                 }
    10             });
    11         }
    12     }

    上边的代码只是一个形式,实际上start时间需要在调用sayHello方法之前进行记录。

  • 相关阅读:
    Python使用SMTP模块、email模块发送邮件
    harbor搭建及使用
    ELK搭建-windows
    ELK技术栈之-Logstash详解
    【leetcode】1078. Occurrences After Bigram
    【leetcode】1073. Adding Two Negabinary Numbers
    【leetcode】1071. Greatest Common Divisor of Strings
    【leetcode】449. Serialize and Deserialize BST
    【leetcode】1039. Minimum Score Triangulation of Polygon
    【leetcode】486. Predict the Winner
  • 原文地址:https://www.cnblogs.com/java-zhao/p/8436460.html
Copyright © 2011-2022 走看看