zoukankan      html  css  js  c++  java
  • Guava 12:Guava EventBus源码剖析

    一、架构速读

    传统上,Java的进程内事件分发都是通过发布者和订阅者之间的显式注册实现的。设计EventBus就是为了取代这种显示注册方式,使组件间有了更好的解耦。EventBus不是通用型的发布-订阅实现,不适用于进程间通信。

    架构图如下:

    二、简单使用

    步骤如下:

    1.构造一个事件总线
    2.构造一个事件监听器
    3.把事件监听器注册到事件总线上
    4.事件总线发布事件,触发监听器方法

    主测试类如下:

     1 package guava.eventbus;
     2 
     3 import com.google.common.eventbus.EventBus;
     4 
     5 /**
     6  * @Description 主测试类
     7  * @author denny
     8  * @date 2018/7/18 上午9:54
     9  */
    10 public class MainTest {
    11 
    12 
    13     public static void main(String[] args) {
    14         // 1.构造一个事件总线
    15         EventBus eventBus = new EventBus("test");
    16 
    17         // 2.构造一个事件监听器
    18         EventListener listener = new EventListener();
    19 
    20         // 3.把事件监听器注册到事件总线上
    21         eventBus.register(listener);
    22 
    23         // 4.事件总线发布事件,触发监听器方法
    24         eventBus.post(new TestEvent1(1));
    25         eventBus.post(new TestEvent2(2));
    26         // 事件3是事件2子类,虽然监听器只订阅了父类事件2,一样可以监听到子类
    27         eventBus.post(new TestEvent3(3));
    28         // 未被订阅的事件,用DeadEvent可订阅
    29         eventBus.post(new TestEvent4(4));
    30     }
    31 }

    事件监听器如下:

     1 package guava.eventbus;
     2 
     3 import com.alibaba.fastjson.JSON;
     4 import com.google.common.eventbus.DeadEvent;
     5 import com.google.common.eventbus.Subscribe;
     6 
     7 /**
     8  * @Description 事件监听器
     9  * @author denny
    10  * @date 2018/7/18 上午9:53
    11  */
    12 public class EventListener {
    13 
    14     private int message = 0;
    15 
    16     /**
    17      * @Description 订阅事件1
    18      * @param event 事件1
    19      * @return void
    20      * @author denny
    21      * @date 2018/7/18 上午9:46
    22      */
    23     @Subscribe
    24     public void onEvent1(TestEvent1 event) {
    25         message = event.getMessage();
    26         System.out.println("EventListener onEvent1 监听器接收到消息:"+message);
    27     }
    28 
    29     /**
    30      * @Description 订阅事件2
    31      * @param event 事件2
    32      * @return void
    33      * @author denny
    34      * @date 2018/7/18 上午9:59
    35      */
    36     @Subscribe
    37     public void onEvent2(TestEvent2 event) {
    38         message = event.getMessage();
    39         System.out.println("EventListener onEvent2 监听器接收到消息:"+message);
    40     }
    41 
    42     /**
    43      * @Description 死亡事件(该事件没有被订阅会触发)
    44      * @param event 未订阅事件
    45      * @return void
    46      * @author denny
    47      * @date 2018/7/18 上午9:59
    48      */
    49     @Subscribe
    50     public void onDeadEvent(DeadEvent event) {
    51         System.out.println("EventListener DeadEvent 有消费没有被订阅!!!!event="+ event.toString());
    52     }
    53 }

    事件类:

     1 package guava.eventbus;
     2 
     3 /**
     4  * @Description 事件1
     5  * @author denny
     6  * @date 2018/7/18 上午9:54
     7  */
     8 public class TestEvent1 {
     9 
    10     private final int message;
    11 
    12     /**
    13      * 构造方法
    14      * @param message
    15      */
    16     public TestEvent1(int message) {
    17         this.message = message;
    18         System.out.println("TestEvent1 事件message:"+message);
    19     }
    20 
    21     public int getMessage() {
    22         return message;
    23     }
    24 }
    25 
    26 
    27 /**
    28  * @Description 事件2
    29  * @author denny
    30  * @date 2018/7/18 上午9:54
    31  */
    32 public class TestEvent2 {
    33 
    34     private final int message;
    35 
    36     /**
    37      * 构造方法
    38      * @param message
    39      */
    40     public TestEvent2(int message) {
    41         this.message = message;
    42         System.out.println("TestEvent2 事件message:"+message);
    43     }
    44 
    45     public int getMessage() {
    46         return message;
    47     }
    48 }
    49 
    50 /**
    51  * @Description 事件3
    52  * @author denny
    53  * @date 2018/7/18 上午9:54
    54  */
    55 public class TestEvent3 extends TestEvent2{
    56 
    57     private final int message;
    58 
    59     /**
    60      * 构造方法
    61      * @param message
    62      */
    63     public TestEvent3(int message) {
    64         super(message);
    65         this.message = message;
    66         System.out.println("TestEvent2 事件message:"+message);
    67     }
    68 
    69     @Override
    70     public int getMessage() {
    71         return message;
    72     }
    73 }
    74 
    75 /**
    76  * @Description 事件4
    77  * @author denny
    78  * @date 2018/7/18 上午9:54
    79  */
    80 public class TestEvent4 {
    81 
    82     private final int message;
    83 
    84     /**
    85      * 构造方法
    86      * @param message
    87      */
    88     public TestEvent4(int message) {
    89         this.message = message;
    90         System.out.println("TestEvent4 事件message:"+message);
    91     }
    92 
    93     public int getMessage() {
    94         return message;
    95     }
    96 }

    运行结果如下:

    TestEvent1 事件message:1
    EventListener onEvent1 监听器接收到消息:1 ---》触发订阅的事件1
    TestEvent2 事件message:2
    EventListener onEvent2 监听器接收到消息:2---》触发订阅的事件2(一个监听器可以订阅多个事件)
    TestEvent2 事件message:3
    TestEvent2 事件message:3
    EventListener onEvent2 监听器接收到消息:3---》订阅事件2,可触发订阅子类事件3
    TestEvent4 事件message:4
    EventListener DeadEvent 有消费没有被订阅!!!!event="DeadEvent{source=EventBus{test}, event=guava.eventbus.TestEvent4@19e1023e}"---》事件4没有被订阅,触发DeadEvent死亡事件。

    注意:

    1.事件总线EventBus

    不提供单列,用户自己看着用~

    2.监听器Listener

    1)监听器使用@Subscribe标记的方法(参数为自定义事件),即可实现事件的监听。要监听多个事件,就写多个方法(每个方法都用@Subscribe标记)即可。

    2)注意一定要把Listener注册到eventbus上。

    三、源码剖析

    源码版本为:guava-22.0.jar,先来回顾下第一节的样例代码:

     1 public static void main(String[] args) {
     2         // 1.构造一个事件总线
     3         EventBus eventBus = new EventBus("test");
     4 
     5         // 2.构造一个事件监听器
     6         EventListener listener = new EventListener();
     7 
     8         // 3.把事件监听器注册到事件总线上
     9         eventBus.register(listener);
    10 
    11         // 4.事件总线发布事件,触发监听器方法
    12         eventBus.post(new TestEvent1(1));
    13         eventBus.post(new TestEvent2(2));
    14         // 事件3是事件2子类,虽然监听器只订阅了父类事件2,一样可以监听到子类
    15         eventBus.post(new TestEvent3(3));
    16         // deadEvent未被订阅的事件,供用户自行处理
    17         eventBus.post(new TestEvent4(4));
    18     }

    如上图,虽然是google封装的事件总线,但是依然是观察者模式,那么核心就是发布、订阅。下面就从这两个方面来看一下源码,看看有没有值得借鉴的地方。

     3.1 核心类速读

    1.EventBus事件总线

    核心方法

    register:把监听器中申明的所有订阅事件方法注册到SubscriberRegistry(订阅者注册器)中。

    post发布事件给所有已注册过的订阅者,最终开启线程完成订阅方法。

    具体如下图:

      1 @Beta
      2 public class EventBus {
      6   private final String identifier;//事件总线标识:用于自定义标识这个事件总线
      7   private final Executor executor;//默认的线程执行器,用于把事件转发给订阅者
     10   private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//订阅注册器
     11   private final Dispatcher dispatcher;//事件转发器
     12  15   //构造器:使用默认字符串
     16   public EventBus() {
     17     this("default");
     18   }
     19   //构造器:使用自定义字符串
     26   public EventBus(String identifier) {
     27     this(
     28         identifier,
     29         MoreExecutors.directExecutor(),
     30         Dispatcher.perThreadDispatchQueue(),
     31         LoggingHandler.INSTANCE);
     32   } 58 
     93   //注册监听者中申明的所有订阅方法(@Subscribe标记的),用以接收事件
     97   public void register(Object object) {
     98     subscribers.register(object);
     99   }
    100   // 解除订阅
    107   public void unregister(Object object) {
    108     subscribers.unregister(object);
    109   }
    110 
    111   //发布事件给所有已注册过的订阅者
    121   public void post(Object event) {
    // 找到事件的所有订阅者
    122 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); 123 if (eventSubscribers.hasNext()) {
    // 事件转发器,把事件转发给订阅者
    124 dispatcher.dispatch(event, eventSubscribers); 125 } else if (!(event instanceof DeadEvent)) { 126 // 如果该事件即没有订阅者,也没事DeadEvent,那么封装成DeadEvent并重新发布 127 post(new DeadEvent(this, event)); 128 }
    ...省略非重要方法
    167 }

    2.Subscriber订阅者

    核心方法:dispatchEvent使用executor线程执行器,单独开启线程执行订阅方法。

      1 class Subscriber {
      2 
      3   /**
      4    * 构造  5    */
      6   static Subscriber create(EventBus bus, Object listener, Method method) {
      7     return isDeclaredThreadSafe(method)
      8         ? new Subscriber(bus, listener, method)
      9         : new SynchronizedSubscriber(bus, listener, method);
     10   }
     11 
     12   /** 订阅者所属的事件总线*/
     13   @Weak private EventBus bus;
     14 
     15   /** 监听器 listener*/
     16   @VisibleForTesting final Object target;
     17 
     18   /** 订阅者方法 */
     19   private final Method method;
     20 
     21   /** 线程执行器,用来分发事件给订阅者 */
     22   private final Executor executor;
    23 /** 构造器:使用事件总线、监听器、订阅方法 */ 24 private Subscriber(EventBus bus, Object target, Method method) { 25 this.bus = bus; 26 this.target = checkNotNull(target); 27 this.method = method; 28 method.setAccessible(true); 29 30 this.executor = bus.executor(); 31 } 32 33 /** 34 * 使用executor线程执行器,执行订阅方法*/ 36 final void dispatchEvent(final Object event) { 37 executor.execute( 38 new Runnable() { 39 @Override 40 public void run() { 41 try { 42 invokeSubscriberMethod(event); 43 } catch (InvocationTargetException e) { 44 bus.handleSubscriberException(e.getCause(), context(event)); 45 } 46 } 47 }); 48 } 49 50 /** 51 * 调用订阅者方法*/ 54 @VisibleForTesting 55 void invokeSubscriberMethod(Object event) throws InvocationTargetException { 56 try { 57 method.invoke(target, checkNotNull(event)); 58 } catch (IllegalArgumentException e) { 59 throw new Error("Method rejected target/argument: " + event, e); 60 } catch (IllegalAccessException e) { 61 throw new Error("Method became inaccessible: " + event, e); 62 } catch (InvocationTargetException e) { 63 if (e.getCause() instanceof Error) { 64 throw (Error) e.getCause(); 65 } 66 throw e; 67 } 68 }
    ...
    120 }

    本节我们了解了2个核心类EventBus(注册监听器、发布事件)、Subscriber订阅者(执行订阅方法),下面我们从源码流程上来串连一遍。

     3.2.注册监听器

    我们从注册监听器开始看,eventBus.register(listener); 如下图所示:

     1 public void register(Object object) {
     2     subscribers.register(object);
     3 }
     4 
     5 /**
     6    * 把listener中申明的所有订阅方法都注册
     7    */
     8   void register(Object listener) {
    // 获取该监听器类型对应的所有订阅方法,key是事件类型,value是订阅者集合
    9 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); 10 // 遍历map Map<K, Collection<V>> 11 for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { 12 //key:事件类型
          Class<?> eventType = entry.getKey();
    //value:订阅者集合
    13 Collection<Subscriber> eventMethodsInListener = entry.getValue(); 14 //从subscribers并发map 中获取事件对应的事件订阅者set,subscribersprivate final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap(); 15 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); 16 //不存在,就构造 17 if (eventSubscribers == null) { 18 CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); 19 eventSubscribers = 20 MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); 21 } 22 //存在,把订阅者集合添加进subscribers 23 eventSubscribers.addAll(eventMethodsInListener); 24 } 25 }

    1.调用SubscriberRegistry的register(listener)来执行注册监听器。

    2.register步骤如下:

    EventBus-包含-》SubscriberRegistry-包含-》ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers 用以维护事件和订阅者的映射

    1). findAllSubscribers从缓存中获取该监听器类型对应的所有订阅方法,key是event class,value是Subscriber集合

    2).遍历map,把订阅者集合添加进SubscriberRegistry-》subscribers。

    其中findAllSubscribers详细代码如下:

    12   /**
    13    * 返回所有该监听器订阅者,以事件分组
    14    */
    15   private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    16     Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    17     Class<?> clazz = listener.getClass();
    //从缓存中获取该监听器类型对应的所有订阅方法,遍历塞进Multimap
    18 for (Method method : getAnnotatedMethods(clazz)) { 19 Class<?>[] parameterTypes = method.getParameterTypes(); 20 Class<?> eventType = parameterTypes[0]; 21 methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); 22 } 23 return methodsInListener; 24 }

    如上图,

    1.方法getAnnotatedMethods:从缓存中取 listener中订阅方法的不可变列表,

    2.遍历塞进Multimap:一个key,多个value,每次put进去,往Collection<V>中add(value)。

    getAnnotatedMethods源码如下:

     1 private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
     2     return subscriberMethodsCache.getUnchecked(clazz);
     3   }
     4 
     5 private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
     6       CacheBuilder.newBuilder()
     7           .weakKeys()
     8           .build(
     9               new CacheLoader<Class<?>, ImmutableList<Method>>() {
    10                 @Override
    11                 public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
    12                   return getAnnotatedMethodsNotCached(concreteClass);
    13                 }
    14               });

    如上图,我们发现这里用到了google cache来做缓存,关于google cache飞机票

    这个cache的源码注释翻译如下:一个线程安全的缓存,包含从每个类到该类中的所有方法和所有超类的映射,这些超类都用{@code @Subscribe}注释。缓存是跨该类的所有实例共享的;如果创建了多个EventBus实例,并且在所有这些实例上注册了同一个类的对象,这将大大提高性能。

    值得借鉴点:EventBus-包含->SubscriberRegistry-->static final subscriberMethodsCache,即所有EventBus类的实例共享一个静态cache,性能高且线程安全。

    下面来看看具体怎么获取的订阅事件方法(监听器@Subscribe注解的订阅事件方法),核心方法getAnnotatedMethodsNotCached如下:

     1 private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    //获取超类class集合
    2 Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); 3 Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
        //遍历超类
    4 for (Class<?> supertype : supertypes) {
    5   //遍历超类中的所有定义的方法
      
      
    for (Method method : supertype.getDeclaredMethods()) {
    //如果方法上有@Subscribe注解
    6 if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { 7 // 方法的参数类型数组 8 Class<?>[] parameterTypes = method.getParameterTypes(); 9 // 校验:事件订阅方法必须只能有一个参数,即事件类
            checkArgument(
    10 parameterTypes.length == 1, 11 "Method %s has @Subscribe annotation but has %s parameters." 12 + "Subscriber methods must have exactly 1 parameter.", 13 method, 14 parameterTypes.length); 15       // 封装方法定义对象 16 MethodIdentifier ident = new MethodIdentifier(method);
            // 去重并添加进map
    17 if (!identifiers.containsKey(ident)) { 18 identifiers.put(ident, method); 19 } 20 } 21 } 22 }
        // map转ImmutableList
    23 return ImmutableList.copyOf(identifiers.values()); 24 }

    3.3 发布事件

    eventBus.post(new TestEvent1(1));

    调用事件转发器Dispatcher,分发事件给订阅者,

    1 public void post(Object event) {
    //从注册器中获取当前事件对应的订阅者集合:eventBus-包含-》SubscriberRegistry-包含-》ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers
    2 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); 3 if (eventSubscribers.hasNext()) { 4 dispatcher.dispatch(event, eventSubscribers); 5 } else if (!(event instanceof DeadEvent)) { 6 //如果该事件即没有订阅者,也没事DeadEvent,那么封装成DeadEvent并重新发布
    7 post(new DeadEvent(this, event));
    8 }
    9 }

    Dispatcher是个抽象类,有多个内部类复写不同dispatch方法。EventBus默认构造时使用PerThreadQueuedDispatcher,即每个线程一个待转发事件队列。如下图所示:

     1   private static final class PerThreadQueuedDispatcher extends Dispatcher {
     2 
     3     // This dispatcher matches the original dispatch behavior of EventBus.
     4 
     5     /**
     6      * 每个线程待转发事件队列
     7      */
     8     private final ThreadLocal<Queue<Event>> queue =
     9         new ThreadLocal<Queue<Event>>() {
    10           @Override
    11           protected Queue<Event> initialValue() {
    12             return Queues.newArrayDeque();
    13           }
    14         };
    15 
    16     /**
    17      * 每个线程的转发状态,用于避免重入事件转发,初始化状态为fasle,即不在转发。
    18      */
    19     private final ThreadLocal<Boolean> dispatching =
    20         new ThreadLocal<Boolean>() {
    21           @Override
    22           protected Boolean initialValue() {
    23             return false;
    24           }
    25         };
    26 
    27     @Override
    28     void dispatch(Object event, Iterator<Subscriber> subscribers) {
    29       checkNotNull(event);
    30       checkNotNull(subscribers);
    31       Queue<Event> queueForThread = queue.get();
    // 事件添加进队列
    32 queueForThread.offer(new Event(event, subscribers)); 33 // 当前不在转发中,开始转发 34 if (!dispatching.get()) { 35 dispatching.set(true); 36 try { 37 Event nextEvent;
    // 迭代从线程队列中取事件
    38 while ((nextEvent = queueForThread.poll()) != null) {
    // 迭代事件的Iterator<Subscriber> subscribers,调用订阅者转发事件
    39 while (nextEvent.subscribers.hasNext()) { 40 nextEvent.subscribers.next().dispatchEvent(nextEvent.event); 41 } 42 } 43 } finally { 44 dispatching.remove(); 45 queue.remove(); 46 } 47 } 48 } 49 50 private static final class Event { 51 private final Object event; 52 private final Iterator<Subscriber> subscribers; 53 54 private Event(Object event, Iterator<Subscriber> subscribers) { 55 this.event = event; 56 this.subscribers = subscribers; 57 } 58 } 59 }

    可见核心方法在dispatchEvent,调用订阅者转发事件

     1   final void dispatchEvent(final Object event) {
     2     executor.execute(
     3         new Runnable() {
     4           @Override
     5           public void run() {
     6             try {
     7               invokeSubscriberMethod(event);
     8             } catch (InvocationTargetException e) {
     9               bus.handleSubscriberException(e.getCause(), context(event));
    10             }
    11           }
    12         });
    13   }
    14 
    15   /**
    16    * 执行订阅者方法18    */
    19   @VisibleForTesting
    20   void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    21     try {
    22       method.invoke(target, checkNotNull(event));
    23     } catch (IllegalArgumentException e) {
    24       throw new Error("Method rejected target/argument: " + event, e);
    25     } catch (IllegalAccessException e) {
    26       throw new Error("Method became inaccessible: " + event, e);
    27     } catch (InvocationTargetException e) {
    28       if (e.getCause() instanceof Error) {
    29         throw (Error) e.getCause();
    30       }
    31       throw e;
    32     }
    33   }

    四、总结

    本文我们先快速了解Google EventBus总体架构,然后从一个简单的应用入手知道如何使用,再深入剖析源码彻底了解原理,并分析了有哪些值得借鉴的地方,最后我们来看一下传统观察者模式和EventBus的区别:

    传统观察者模式和EventBus区别
     

    监听者管理

    监听特定事件

    把监听者注册到生产者

    按事件超类监听 检测没有监听者的事件 分发事件

    传统观察者模式

    用列表管理监听者,还要考虑线程同步;或者使用工具类 定义相应的事件监听者类

    调用事件生产者的register方法,开发者必须知道所有事件生产者的类型,才能正确地注册监听者

    很困难,需要开发者自己去实现匹配逻辑 在每个事件分发方法中添加逻辑代码 开发者自己写代码,包括事件类型匹配、异常处理、异步分发
    EventBus 内部已经实现了监听者管理

    以自定义Event为唯一参数创建方法,

    并用Subscribe注解标记。

    EventBus.register(Object) EventBus自动把事件分发给事件超类的监听者 EventBus会把所有发布后没有监听者处理的事件包装为DeadEvent

    EventBus.post(Object)

    异步分发可以直接用EventBus的子类AsyncEventBus

    ==参考==

    官方介绍https://github.com/google/guava/wiki/EventBusExplained

  • 相关阅读:
    如何在Eclipse中彻底修改一个项目名称
    用JS在html页面实现打印功能
    关于git提交、还原使用
    maven package:Max maven Unsupported major.minor version 51.0
    Tomcat 启动报错:javax.naming.NamingException: No naming context bound to this class loader
    maven web 项目中启动报错java.lang.ClassNotFoundException: org.springframework.web.util.Log4jConfigListener
    tripwire检查文件完整性
    设置mysql表名不区分大小写
    mysql-零基础安装
    nginx-0基础安装篇
  • 原文地址:https://www.cnblogs.com/dennyzhangdd/p/9324483.html
Copyright © 2011-2022 走看看