zoukankan      html  css  js  c++  java
  • Guava中EventBus分析

    EventBus

    1. 什么是EventBus

      总线(Bus)一般指计算机各种功能部件之间传送信息的公共通信干线,而EventBus则是事件源(publisher)向订阅方(subscriber)发送订阅事件的总线,它解耦了观察者模式中订阅方和事件源之间的强依赖关系。
    

    图片来源:

    2. guava EventBus的构成

      下面以guava 19版本的EventBus的源码进行分析。EventBus有三个操作:注册Listener--register(Object Listener),注销Listener--unregister(Object Listener),发布Event--post(Object event)。在19版本之前,listener的注册和注销,事件的发布的工作都是由EventBus完成,在18版本之后,EventBus把Listener的注册和注销的工作委托给SubscriberRegistry, 把事件发布的工作委托给Dispatcher来完成,这样的修改职责分明,代码结构更加清晰了。在并发方面也有大的修改,以前对维护某类事件和其感兴趣的Subscriber的操作用了读写锁来控制对容器的操作,19版本及之后用了并发容器ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>>避免了对锁的使用。
    

    3. SubscriberRegistry

       在观察者模式中,事件源中会维护一个Listener的列表,而且向这个事件源注册的Listener一般只会收到一类事件的通知,如果Listener对多个不同类的事件感兴趣,则需要向多个事件源注册。EventBus是怎样实现Listener一次注册,能够知道Listener对那些事件感兴趣的,进而在有某类事件发生时通知到Listener的呢?答案在SubscriberRegistry这个类中。在SubscriberRegister有一个实例属性ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers,它维护了某个事件类型和对其感兴趣的Subscriber的列表。
    

       register(Object listener)的工作就是找出这个Listener对哪些事件感兴趣,然后把这种事件类型和该Listener构建成的Subscriber加到subscribers中。unregister的过程和register类似,只从subscribers删掉Listener感兴趣的事件。下面我们分别看看18版本和19版本register,主要的区别就是一个是加锁的版本,一个用的是并发容器。
    

    18版本:

    /**
       * Registers all subscriber methods on {@code object} to receive events.
       * Subscriber methods are selected and classified using this EventBus's
       * {@link SubscriberFindingStrategy}; the default strategy is the
       * {@link AnnotatedSubscriberFinder}.
       *
       * @param object  object whose subscriber methods should be registered.
       */
      public void register(Object object) {
        Multimap<Class<?>, EventSubscriber> methodsInListener =
            finder.findAllSubscribers(object);
        subscribersByTypeLock.writeLock().lock();
        try {
          subscribersByType.putAll(methodsInListener);
        } finally {
          subscribersByTypeLock.writeLock().unlock();
        }
      }
    

    19版本:

      /**
       * Registers all subscriber methods on the given listener object.
       */
      void register(Object listener) {
        Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
    
        for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
          Class<?> eventType = entry.getKey();
          Collection<Subscriber> eventMethodsInListener = entry.getValue();
    
          CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
    
          if (eventSubscribers == null) {
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
            eventSubscribers = MoreObjects.firstNonNull(
                subscribers.putIfAbsent(eventType, newSet), newSet);
          }
    
          eventSubscribers.addAll(eventMethodsInListener);
        }
      }
    
      SubscriberRegister通过reflection找出该Listener对象(包括其父类)哪些Method用@Subscriber注解了,用@Subscriber注解的方法表示当某件事件发生时,希望收到事件通知。在@Subscriber注解的方法中只能包含一个参数那就是Event,否则会出错。在reflection的时候用LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache缓存了该Listener Class和对应的Method,加快了后面对同一类Listener进行register的效率。用MethodIdentifier作为Map的key来判别Method的是否相等。
    
    private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
        Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
        Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
        for (Class<?> supertype : supertypes) {
          for (Method method : supertype.getDeclaredMethods()) {
            if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
              // TODO(cgdecker): Should check for a generic parameter type and error out
              Class<?>[] parameterTypes = method.getParameterTypes();
              checkArgument(parameterTypes.length == 1,
                  "Method %s has @Subscribe annotation but has %s parameters."
                      + "Subscriber methods must have exactly 1 parameter.",
                  method, parameterTypes.length);
    
              MethodIdentifier ident = new MethodIdentifier(method);
              if (!identifiers.containsKey(ident)) {
                identifiers.put(ident, method);
              }
            }
          }
        }
        return ImmutableList.copyOf(identifiers.values());
      }
    

    在构建Subscriber的时候根据方法是否有@AllowConcurrentEvents,分为同步和并发执行method。

    4. Dispatcher

       Dispatcher发布事件的时候有三种模式:
      1. ImmediateDispatcher,来了一个事件则通知对这个事件感兴趣的订阅者。
      2. LegacyAsyncDispatcher,会有一个全局的队列ConcurrentLinkedQueue<EventWithSubscriber> queue保存EventWithSubscriber(事件和subscriber),如果被不同的线程poll 不能保证在queue队列中的event是有序发布的。
      3. PerThreadQueuedDispatcher,在同一个线程post的Event,执行的顺序是有序的。用ThreadLocal<Queue<Event>> queue来实现每个线程post的Event是有序的,在把事件添加到queue后会有一个ThreadLocal<Boolean> dispatching来判断当前线程是否正在分发,如果正在分发,则这次添加的event不会马上进行分发而是等到dispatching的值为false才进行。这样做的原因是为了防止同一个事件被重复分发。我的理解是这样的:如果没有dispatching这个状态变量,在ThreadA中EventA发布了,ListenerA收到了,ListenerA进行处理,在处理的过程中如果又触发了EventA的发布,如果该线程不结束则会陷入到一种循环中去。
    

    5.EventBus的使用

    一般的应用的场景就是在用观察者模式的地方就可以用EventBus进行替代。结合Spring的使用过程如下:

    5.1定义Listener

    5.2向EventBus注册Listener

    5.3发送事件

    每天进步一点点
  • 相关阅读:
    css3
    如何去把数据渲染到页面
    js中的正则
    12.4
    react-router HashRouter和BrowserHistory的区别
    react 路由使用react-router-dom
    react 中的 三大重要属性state refs props
    在Vue中如何快速实现小球动画
    模块化 require.js 入门教学(前端必看系列)
    如何把设计稿中px值转化为想要的rem值
  • 原文地址:https://www.cnblogs.com/wagne/p/10188091.html
Copyright © 2011-2022 走看看