zoukankan      html  css  js  c++  java
  • guava eventbus 原理+源码分析

    前言:

    guava提供的eventbus可以很方便的处理一对多的事件问题, 最近正好使用到了,做个小结,使用的demo网上已经很多了,不再赘述,本文主要是源码分析+使用注意点+新老版本eventbus实现方式对比

    一.原理

    将定义的hander注册到eventbus中,eventbus遍历该handler及其父类中含有@subscribe注解的方法,封装成subscriber对象,一个event会对应多个方法,Map<EventType.class,List<Subscriber>>,但既然是guava出品,这种情况下一定会用自己家的MultiMap了,接收到event后根据类型匹配对应的subscriber去执行,接下来从源码角度探究下

     二.源码分析

    主要分析注册与分发处理,会贴相关的源码的注释(guava版本github 2021 1月版本),方便你阅读

    1.注册流程

    分析之前我们先简要拓展下关于guava cache的用法,compute if absent,不存在则计算,对应getOrLoad方法(暴露给用户的是get()),有则直接返回,

    注册流程抓住一个关键点即可,即一个subscriber对应一个被@subscriber标记的method,为了方便阅读,我把代码贴到一起

      1   /** Registers all subscriber methods on the given listener object. */
      2   void register(Object listener) {
      3     // key-eventType.class value-List<Subscriber>,一个subscriber对应一个方法
      4     Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
      5 
      6     for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      7       Class<?> eventType = entry.getKey();
      8       Collection<Subscriber> eventMethodsInListener = entry.getValue();
      9       // 并发读写
     10       CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
     11 
     12       if (eventSubscribers == null) {
     13         CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
     14         // eventType.class不存在时才put,concurrenthashmap的putIfAbsent()
     15         // 有可能为null,用newSet替换
     16         eventSubscribers =
     17             MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
     18       }
     19       // 添加
     20       eventSubscribers.addAll(eventMethodsInListener);
     21     }
     22   }
     23   
     24   
     25   /**
     26    * Returns all subscribers for the given listener grouped by the type of event they subscribe to.
     27    */
     28   private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
     29     Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
     30     Class<?> clazz = listener.getClass();
     31     for (Method method : getAnnotatedMethods(clazz)) {
     32       Class<?>[] parameterTypes = method.getParameterTypes();
     33       Class<?> eventType = parameterTypes[0];
     34       // 创建subscriber时,如果未添加@AllowConcurrentEvents注解则生成同步的subscriber
     35       methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
     36     }
     37     return methodsInListener;
     38   }
     39 
     40   private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
     41     try {
     42       return subscriberMethodsCache.getUnchecked(clazz);
     43     } catch (UncheckedExecutionException e) {
     44       throwIfUnchecked(e.getCause());
     45       throw e;
     46     }
     47   }
     48 
     49 // 映射关系缓存,getOrload
     50   private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
     51       CacheBuilder.newBuilder()
     52           .weakKeys()
     53           .build(
     54               new CacheLoader<Class<?>, ImmutableList<Method>>() {
     55                 @Override
     56                 public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
     57                   return getAnnotatedMethodsNotCached(concreteClass);
     58                 }
     59               });
     60 
     61 private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
     62     // 获得listener的所有父类及自身的class(包括接口)
     63     Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
     64     Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
     65     for (Class<?> supertype : supertypes) {
     66       for (Method method : supertype.getDeclaredMethods()) {
     67         if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
     68           // TODO(cgdecker): Should check for a generic parameter type and error out
     69           Class<?>[] parameterTypes = method.getParameterTypes();
     70           // 参数校验,@subscribe注解的方法有且有能有一个非原始类型参数
     71           checkArgument(
     72               parameterTypes.length == 1,
     73               "Method %s has @Subscribe annotation but has %s parameters. "
     74                   + "Subscriber methods must have exactly 1 parameter.",
     75               method,
     76               parameterTypes.length);
     77 
     78           checkArgument(
     79               !parameterTypes[0].isPrimitive(),
     80               "@Subscribe method %s's parameter is %s. "
     81                   + "Subscriber methods cannot accept primitives. "
     82                   + "Consider changing the parameter to %s.",
     83               method,
     84               parameterTypes[0].getName(),
     85               Primitives.wrap(parameterTypes[0]).getSimpleName());
     86 
     87           MethodIdentifier ident = new MethodIdentifier(method);
     88           // 重写的方法只放入一次
     89           if (!identifiers.containsKey(ident)) {
     90             identifiers.put(ident, method);
     91           }
     92         }
     93       }
     94     }
     95     return ImmutableList.copyOf(identifiers.values());
     96   }
     97 
     98 
     99   // 创建subscriber
    100   static Subscriber create(EventBus bus, Object listener, Method method) {
    101     return isDeclaredThreadSafe(method)
    102         ? new Subscriber(bus, listener, method)
    103         : new SynchronizedSubscriber(bus, listener, method);
    104   }
    105 
    106   @VisibleForTesting
    107   static final class SynchronizedSubscriber extends Subscriber {
    108 
    109     private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
    110       super(bus, target, method);
    111     }
    112 
    113     @Override
    114     void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    115       synchronized (this) {
    116         super.invokeSubscriberMethod(event);
    117       }
    118     }
    119   }

    值得注意的是subscriber的生成,即便你使用了AsyncEventbus,却没有在处理方法上声明@AllowConcurrentEvents,那么在处理event时仍然是同步执行的,注册流程并发安全问题请看第三部分

    2.分发流程

    先看下如何获得event对应的subscriber

     1 public void post(Object event) {
     2     Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
     3     if (eventSubscribers.hasNext()) {
     4       // 分发,dispatcher有三种实现,ImmediateDispatcher(同步处理event,深度优先)
     5       // LegacyAsyncDispatcher(异步处理event)
     6       // PerThreadQueuedDispatcher(默认,同步调用,广度优先) 内置队列,可以保证同一线程内的event的顺序
     7       dispatcher.dispatch(event, eventSubscribers);
     8     } else if (!(event instanceof DeadEvent)) {
     9       // the event had no subscribers and was not itself a DeadEvent
    10       // 把所有没有被订阅的event包装成deadevent,用户可以自己定义处理deadevent的方法,作为兜底
    11       post(new DeadEvent(this, event));
    12     }
    13   }
    14 
    15   Iterator<Subscriber> getSubscribers(Object event) {
    16     //获得event的所有父类及自身的class(包括接口),从获取subscriber的流程来看,post一个event
    17     // 时,除了调用该event的处理方法也会调用该event父类的处理方法
    18     ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
    19 
    20     List<Iterator<Subscriber>> subscriberIterators =
    21         Lists.newArrayListWithCapacity(eventTypes.size());
    22 
    23     for (Class<?> eventType : eventTypes) {
    24       CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
    25       if (eventSubscribers != null) {
    26         // eager no-copy snapshot
    27         subscriberIterators.add(eventSubscribers.iterator());
    28       }
    29     }
    30     // 类似flatmap,扁平化
    31     return Iterators.concat(subscriberIterators.iterator());
    32   }
    33 
    34   @VisibleForTesting
    35   static ImmutableSet<Class<?>> flattenHierarchy(Class<?> concreteClass) {
    36     try {
    37       return flattenHierarchyCache.getUnchecked(concreteClass);
    38     } catch (UncheckedExecutionException e) {
    39       throw Throwables.propagate(e.getCause());
    40     }
    41   }
    42 
    43   private static final LoadingCache<Class<?>, ImmutableSet<Class<?>>> flattenHierarchyCache =
    44       CacheBuilder.newBuilder()
    45           .weakKeys()
    46           .build(
    47               new CacheLoader<Class<?>, ImmutableSet<Class<?>>>() {
    48                 // <Class<?>> is actually needed to compile
    49                 @SuppressWarnings("RedundantTypeArguments")
    50                 @Override
    51                 public ImmutableSet<Class<?>> load(Class<?> concreteClass) {
    52                   return ImmutableSet.<Class<?>>copyOf(
    53                       TypeToken.of(concreteClass).getTypes().rawTypes());
    54                 }
    55               });

    从代码可以看出,先对该event查询上级,最后把所有event对应的subscriber返回,因此触发一个event时,其父event的subscriber也会被调用

    接下来看下post,流程eventbus有三种dispatcher(ImmediaDispatcher,PerThreadDispatcher,LegacyAsyncDispatcher)eventbus使用的是PerThreadDispatcher,AsyncEventBus使用LegacyAsyncDispatcher

    ①ImmediaDispatcher

    从名字中的Immedia"即时"就能看出这个dispatcher收到event后会立即处理,不会进行异步处理

    代码如下:

     从图中可以看出ImmediaDispatcher是针对每个event,调用其全部的subscriber进行处理,即尽可能多的调用subscriber,所以是广度优先,这个dispatcher目前未被使用,了解即可

     ②PerThreadQueueDispatcher(默认的dispatcher)

    同样从名称可以看出这种dispatcher是一个thread一个queue,那我们可以猜测内部有可能用了ThreadLocal,既然用了队列,说明想要起到一个缓冲event处理的过程

    队列的缓冲功能使得dispatcher有能力吞吐更高的event,因此是一种深度优先策略,此外每线程每队列的方式保证了event处理过程是对于每个线程而言是有序的,同样是广度优先,对

    每一个event都分发到相关的subscriber进行处理,除此之外还有一个值得称道的点,即Dispatching变量的使用,规避了递归产生的死循环问题

     1 private static final class PerThreadQueuedDispatcher extends Dispatcher {
     2 
     3     // This dispatcher matches the original dispatch behavior of EventBus.
     4 
     5     /** Per-thread queue of events to dispatch. */
     6     private final ThreadLocal<Queue<Event>> queue =
     7         new ThreadLocal<Queue<Event>>() {
     8           @Override
     9           protected Queue<Event> initialValue() {
    10             return Queues.newArrayDeque();
    11           }
    12         };
    13 
    14     /** Per-thread dispatch state, used to avoid reentrant event dispatching. */
    15     private final ThreadLocal<Boolean> dispatching =
    16         new ThreadLocal<Boolean>() {
    17           @Override
    18           protected Boolean initialValue() {
    19             return false;
    20           }
    21         };
    22 
    23     @Override
    24     void dispatch(Object event, Iterator<Subscriber> subscribers) {
    25       checkNotNull(event);
    26       checkNotNull(subscribers);
    27       // 如果只从代码来看,PerThreadQueuedDispatcher的dispatch方法始终
    28       // 是单线程调用,并不需要ThreadLocal,但从拓展的角度看,当用户自定义xxeventbus自己实现分发逻辑时,PerThreadQueuedDispatcher实现了线程安全的dispatch
    29       //因为eventbus有可能会被多个线程调用,从框架的角度看,无论用户是否多线程调用,都应该要保证线程安全
    30       // 引用issue 3530中 https://github.com/google/guava/issues/3530 的一个回答 if multiple threads are dispatching to this dispatcher, they will read different values for queueForThread and dispatching.
    31       Queue<Event> queueForThread = queue.get();
    32       queueForThread.offer(new Event(event, subscribers));
    33 
    34       // 如果未开始分发事件则进行处理,解决subscriber递归调用post产生的死循环
    35       if (!dispatching.get()) {
    36         dispatching.set(true);
    37         try {
    38           Event nextEvent;
    39           // 对每一个event,分发到相关的subscribers中
    40           while ((nextEvent = queueForThread.poll()) != null) {
    41             while (nextEvent.subscribers.hasNext()) {
    42               nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
    43             }
    44           }
    45         } finally {
    46           dispatching.remove();
    47           queue.remove();
    48         }
    49       }
    50     }

    接下来看下刚刚说的dispatching的妙用demo

    在guava-test下建立一个新的目录方便我们修改源码后进行测试,测试代码如下

    Listener

     1 /**
     2  * @author tele
     3  * @Description
     4  * @create 2020-11-23
     5  */
     6 public class Listener {
     7 
     8     private final EventBus eventBus;
     9 
    10     public Listener(EventBus eventBus) {
    11         this.eventBus = eventBus;
    12     }
    13 
    14     @Subscribe
    15     public void record(String s) {
    16         eventBus.post(s);
    17         System.out.println("receive:"+ s);
    18     }
    19 }

    Producer

     1 /**
     2  * @author tele
     3  * @Description
     4  * @create 2020-11-23
     5  */
     6 public class Producer {
     7 
     8     public String produce() {
     9         return "hello";
    10     }
    11 }

    Main

     1 /**
     2  * @author tele
     3  * @Description
     4  * @create 2020-11-23
     5  */
     6 public class Main {
     7 
     8     public static void main(String[] args) {
     9         EventBus eventBus = new EventBus();
    10         Listener listener = new Listener(eventBus);
    11         Producer producer = new Producer();
    12         eventBus.register(listener);
    13         String produce = producer.produce();
    14         eventBus.post(produce);
    15     }
    16 
    17 }

    代码很简单,问题在于Listener递归调用了post方法,按照代码示意运行后会栈溢出(队列中event堆积),receive:hello永远不会打印,可事实真的如此吗?

     很奇怪是吗,并没有产生堆栈溢出的问题,反而是不停的输出receive:hello,接下来我们修改下PerThreadDispatcher的代码,将dispatching变量注释掉

      

    再执行下demo

     果然溢出了,关键点就在于dispatching变量对于同一线程的递归分发进行了处理,已经处理过就不再次进行分发,这样我们的递归调用不停的产生的event得以被处理

     ③LegacyAsyncDispatcher

    看名字挺奇怪的,但有async字样,所以是异步的dispatcher,LegacyAsyncDispacther是AsyncEventBus的专用dispatcher,由于将event对应的subscriber拆分后入队,多线程情况下无法保证event入队顺序,也就无法保证subscriber的调用顺序,但这样处理实现了深度优先,即尽可能多的调用不同的event的subscriber,与PerThreadDispatcher相比代码难度小了不少,由于AsyncEventBus的初始化需要传入线程池参数,所以AsyncEventBus实现了真正的异步处理

     1 /** Implementation of a {@link #legacyAsync()} dispatcher. */
     2   private static final class LegacyAsyncDispatcher extends Dispatcher {
     3 
     4     // This dispatcher matches the original dispatch behavior of AsyncEventBus.
     5     //
     6     // We can't really make any guarantees about the overall dispatch order for this dispatcher in
     7     // a multithreaded environment for a couple reasons:
     8     //
     9     // 1. Subscribers to events posted on different threads can be interleaved with each other
    10     //    freely. (A event on one thread, B event on another could yield any of
    11     //    [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
    12     // 2. It's possible for subscribers to actually be dispatched to in a different order than they
    13     //    were added to the queue. It's easily possible for one thread to take the head of the
    14     //    queue, immediately followed by another thread taking the next element in the queue. That
    15     //    second thread can then dispatch to the subscriber it took before the first thread does.
    16     //
    17     // All this makes me really wonder if there's any value in queueing here at all. A dispatcher
    18     // that simply loops through the subscribers and dispatches the event to each would actually
    19     // probably provide a stronger order guarantee, though that order would obviously be different
    20     // in some cases.
    21 
    22     /** Global event queue. */
    23     private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
    24         Queues.newConcurrentLinkedQueue();
    25 
    26     @Override
    27     void dispatch(Object event, Iterator<Subscriber> subscribers) {
    28       checkNotNull(event);
    29       // 拆分后入队
    30       while (subscribers.hasNext()) {
    31         queue.add(new EventWithSubscriber(event, subscribers.next()));
    32       }
    33 
    34       EventWithSubscriber e;
    35       while ((e = queue.poll()) != null) {
    36         e.subscriber.dispatchEvent(e.event);
    37       }
    38     }
    39 
    40     private static final class EventWithSubscriber {
    41       private final Object event;
    42       private final Subscriber subscriber;
    43 
    44       private EventWithSubscriber(Object event, Subscriber subscriber) {
    45         this.event = event;
    46         this.subscriber = subscriber;
    47       }
    48     }
    49   }

    注意点:

    1.eventbus默认使用的线程池MoreExecutors.directExecutor(),其execute方法是直接调用传入的runnable的run方法,是非异步的 

    2.使用AsyncEventBus时,请在对应的方法上添加@AllowConcurrenEvents

    三.从并发安全的角度出发,对比下新老版本的注册流程

    本部分为补充内容,重点探讨新老版本的注册并发安全问题,可略过

    从20.0开始,event bus的注册程变成了上面分析的,那么之前的版本是如何实现的呢,一起来分析下.先切到16.0 的tag,注册代码如下

    显然是使用了读写锁,不加锁,eventType会相互覆盖(HashMultiMap是非线程安全的),先给eventbus加个getSubscriberByType(),记得修改下EventSubscriber的修饰符为public,然后做个多线程的测试

     1 /**
     2  * @author tele
     3  * @Description
     4  * @create 2021-01-24
     5  */
     6 public class ListenerA {
     7 
     8     @Subscribe
     9     public void handle(String msg) {
    10         System.out.println("ListenerA:" + msg);
    11     }
    12 
    13 }
    14 
    15 /**
    16  * @author tele
    17  * @Description
    18  * @create 2021-01-24
    19  */
    20 public class ListenerB {
    21 
    22     @Subscribe
    23     public void handle(String msg) {
    24         System.out.println("ListenerB:" + msg);
    25     }
    26 
    27 }
    28 
    29 /**
    30  * @author tele
    31  * @Description
    32  * @create 2021-01-24
    33  */
    34 public class Main {
    35 
    36 
    37     public static void main(String[] args) throws InterruptedException {
    38 
    39         final EventBus eventBus = new EventBus();
    40         final ListenerA a = new ListenerA();
    41         ListenerB b = new ListenerB();
    42         CountDownLatch countDownLatch = new CountDownLatch(6);
    43 
    44         Runnable r1 = ()-> {
    45             eventBus.register(a);
    46             countDownLatch.countDown();
    47         };
    48         Thread t1 = new Thread(r1);
    49         Thread t2 = new Thread(r1);
    50         Thread t3 = new Thread(r1);
    51 
    52         Runnable r2 = ()-> {
    53             eventBus.register(b);
    54             countDownLatch.countDown();
    55         };
    56         Thread t4 = new Thread(r2);
    57         Thread t5 = new Thread(r2);
    58         Thread t6 = new Thread(r2);
    59 
    60         t1.start();
    61         t2.start();
    62         t3.start();
    63         t4.start();
    64         t5.start();
    65         t6.start();
    66         countDownLatch.await();
    67         SetMultimap<Class<?>, EventSubscriber> subscribersByType = eventBus.getSubscribersByType();
    68         subscribersByType.asMap().forEach((k,v)-> {
    69             System.out.println("key:" + k);
    70             v.forEach(System.out::println);
    71         });
    72     }
    73 }

    输出结果如下:

     ok,没啥问题,接下来再修改下源码把使用读写锁的两行代码注释掉,再执行下代码

     

     输出结果如下:

    显然,ListenerA的注册结果被覆盖了,这里简要说下原因,subscribersByType,k-v结构简略表示为 K-event.class ,value-Set<Listener.class>,我们知道java中的hashset不重复的特性是基于hashmap实现的.同样的,这里的SetMultiMap实际是用的HashMultiMap,翻翻源码就知道了,内部存储数据的容器是hashmap,那么这个问题就转换成了hashmap的线程安全问题了,hashmap多线程put hash相同的元素会产生丢失问题,多线程下同时put get有可能导致get 出null.了解到这我们就知道为什么要加锁了,使用读写锁的版本一直持续到19.0,从20.0开始从开始使用并发容器代替读写锁,因为对于eventbus而言始终是读远大于写,基于cow机制实现的CopyOnWriteArrayList在读写同时进行时通过延迟更新的策略不阻塞线程,对于event的处理 而言是可以接受的,因为本次event在post时没有分发到对应的subsriber,下次同类型的event触发就ok了,事实上,这种场景极少,因为从使用经历来看,一般是项目启动时就注册,分发都是需要处理逻辑时才会触发,不阻塞与每次都需要加解读锁相比,显然不阻塞的性能更好了.老版本的分发流程不再赘述,因为确实没啥好分析的了,如果你能看懂上面分析的新版本的dispatcher,当你看老版本的时候就会感觉很简单了

    四.优势与缺陷

    1.进程内使用,无法实现跨进程处理,需要跨进程传递消息,还是老老实实的用消息队列吧

    2.和redis一样基于内存,天然的不可靠,redis好歹还有aof和rdb,可event bus没有任何持久化机制

    3.个人对新版的Subscriber实现方式有点看法,没必须要把线程池参数传递给Subscriber,因为Subscriber只是被执行者,16.0的版本线程池参数是AsyncEventBus持有

    4.优势:简单,开箱即用

    五.小结

    1.只分析了注册与分发流程,异常处理之类的没有涉及,用法的话,网上已经很多了,不再赘述

    2.event bus的代码很巧妙,细细品味还有很多巧妙之处,比如上面那个dispatching变量

    六.参考文档

    1.github https://github.com/google/guava/wiki/EventBusExplained#for-producers

  • 相关阅读:
    137. Single Number II (Bit)
    136. Single Number (Bit)
    89. Gray Code (Bit)
    57. Insert Interval (Array; Sort)
    56. Merge Intervals (Array; Sort)
    UNIX 网络编程笔记-CH3:套接字编程简介
    UNIX 网络编程笔记-CH2:TCP、UDP概貌
    TSP-旅行商问题
    Java 集合:迭代器(Iterator, Iterable)
    PAT 1029. Median
  • 原文地址:https://www.cnblogs.com/tele-share/p/14258352.html
Copyright © 2011-2022 走看看