zoukankan      html  css  js  c++  java
  • Java EventBus

    1.EventBus是什么?

      EventBus是guava中的一个工具,官方解释如下:

     EventBus允许组件之间通过发布-订阅进行通信,而不需要组件之间显示的注册。它专门设计为了代替使用显示注册的传统的Java进程内事件分发。它不是通用的发布-订阅系统,也不是用于进程间通信的。

    Event可能发布到总线的任何对象。
    Subscribing 向EventBus注册侦听器的行为,以便其处理程序方法将接收事件。
    Listener 通过公开处理程序方法希望接收事件的对象。
    Handler method EventBus用于传递已发布事件的公共方法。处理程序方法由@Subscribe批注标记。
    Posting an event 通过EventBus将事件提供给所有侦听器。

    优点:简化组件之间的通信。是发布者和订阅之间解耦,同时避免了复杂且容易出错的依赖性和生命周期问题。使代码更加简洁

    2.使用

    //Example
    //
    Class is typically registered by the container. class EventBusChangeRecorder {
     //订阅事件 @Subscribe
    public void recordCustomerChange(ChangeEvent e) { recordChange(e.getChange()); } } //注册监听类 eventBus.register(new EventBusChangeRecorder()); // much later public void changeCustomer() ChangeEvent event = getChangeEvent();
    //发布事件 eventBus.post(event); }

    3.源码解析

    以下源码来源Guava版本-20.0

    3.1事件总线

    @Beta
    public class EventBus {
    
      private static final Logger logger = Logger.getLogger(EventBus.class.getName());
    
      private final String identifier;
      private final Executor executor;
      private final SubscriberExceptionHandler exceptionHandler;
    
      private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
      private final Dispatcher dispatcher;
    
      /**
       * Creates a new EventBus named "default".
       */
      public EventBus() {
        this("default");
      }
    
      /**
       * Creates a new EventBus with the given {@code identifier}.
       */
      public EventBus(String identifier) {
        this(
            identifier,
            MoreExecutors.directExecutor(),
            Dispatcher.perThreadDispatchQueue(),
            LoggingHandler.INSTANCE);
      }
    
      /**
       * Creates a new EventBus with the given {@link SubscriberExceptionHandler}.
       */
      public EventBus(SubscriberExceptionHandler exceptionHandler) {
        this(
            "default",
            MoreExecutors.directExecutor(),
            Dispatcher.perThreadDispatchQueue(),
            exceptionHandler);
      }
    
      EventBus(
          String identifier,
          Executor executor,
          Dispatcher dispatcher,
          SubscriberExceptionHandler exceptionHandler) {
        this.identifier = checkNotNull(identifier);
        this.executor = checkNotNull(executor);
        this.dispatcher = checkNotNull(dispatcher);
        this.exceptionHandler = checkNotNull(exceptionHandler);
      }
    
      /**
       * Returns the identifier for this event bus.
       */
      public final String identifier() {
        return identifier;
      }
    
      /**
       * Returns the default executor this event bus uses for dispatching events to subscribers.
       */
      final Executor executor() {
        return executor;
      }
    
      /**
       * 处理上下文订阅者抛出的异常
       */
      void handleSubscriberException(Throwable e, SubscriberExceptionContext context) {
        checkNotNull(e);
        checkNotNull(context);
        try {
          exceptionHandler.handleException(e, context);
        } catch (Throwable e2) {
          // if the handler threw an exception... well, just log it
          logger.log(
              Level.SEVERE,
              String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, e),
              e2);
        }
      }
    
      /**
       * 注册所有的订阅者为了接收事件
       */
      public void register(Object object) {
        subscribers.register(object);
      }
    
      /**
       * 注销所有已注册的订阅方法
       * 要注册的对象,如果之前没有该对象没有注册过抛出IllegalArgumentException
       */
      public void unregister(Object object) {
        subscribers.unregister(object);
      }
    
      /**
       * 发布事件给所有订阅者,发送完成就返回成功,不管订阅者有没有处理成功。如果没有订阅者,该事件将成为一个死亡事件,它将包装在死亡事件中并重新发布 
       */
      public void post(Object event) {
        //找到事件的所有订阅者
        Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
        if (eventSubscribers.hasNext()) {
          //事件转发器,把事件转发给订阅者
          dispatcher.dispatch(event, eventSubscribers);
        } else if (!(event instanceof DeadEvent)) {
          // 如果该事件即没有订阅者,也没事。那么封装成DeadEvent并重新发布
          post(new DeadEvent(this, event));
        }
      }
    
      @Override
      public String toString() {
        return MoreObjects.toStringHelper(this).addValue(identifier).toString();
      }
      ...省略异常日志处理方法
      }
    }
    • subscribers是SubscriberRegistry类型的,实际上EventBus在添加、移除和遍历观察者的时候都会使用该实例的方法,所有的观察者信息也都维护在该实例中.
    • executor是事件分发过程中使用到的线程池,可以自己实现; dispatcher是Dispatcher类型的子类,用来在发布事件的时候分发消息给监听者,它有几个默认的实现,分别针对不同的分发方式;
    • exceptionHandler是SubscriberExceptionHandler类型的,它用来处理异常信息,在默认的EventBus实现中,会在出现异常的时候打印出log,当然我们也可以定义自己的异常处理策咯。

      通过SubscriberRegistry了解如何注册和取消注册以及遍历。我们需要在EventBus中维护几个映射,以便在发布事件的时候找到并通知所有的监听者,首先是事件类型->观察者列表的映射。

    EventBus中发布事件是针对各个方法的,我们将一个事件对应的类型信息和方法信息等都维护在一个对象中,在EventBus中就是观察者Subscriber. 然后,通过事件类型映射到观察者列表,当发布事件的时候,只要根据事件类型到列表中寻找所有的观察者并触发监听方法即可。 在SubscriberRegistry中通过如下数据结构来完成这一映射:

    /**
       * 通过事件类型索引所有的注册订阅者
       * CopyOnWriteArraySet值使获取事件的所有当前订阅者的不可变快照变得容易且相对轻便,而没有任何锁定。
       */
      private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();
      从上面的定义形式中我们可以看出,这里使用的是事件的Class类型映射到Subscriber列表的。这里的Subscriber列表使用的是Java中的CopyOnWriteArraySet集合,
    它底层使用了CopyOnWriteArrayList,并对其进行了封装,也就是在基本的集合上面增加了去重的操作。这是一种适用于读多写少场景的集合,在读取数据的时候不会加锁,
    写入数据的时候进行加锁,并且会进行一次数组拷贝。
      在分析register()方法之前,我们先看下SubscriberRegistry内部经常使用的几个方法,它们的原理与我们上面提出的问题息息相关。首先是findAllSubscribers()方法,它用来获取指定监听者对应的全部观察者集合。下面是它的代码:
      /**
       * 返回给定监听器的所有订阅者(按其订阅的事件类型分组)。
       */
      private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
        // 创建一个哈希表
        Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
        //获取监听者的类型
        Class<?> clazz = listener.getClass();
        //遍历上述方法,并且根据方法和类型参数创建观察者并将其插入到映射表中
        for (Method method : getAnnotatedMethods(clazz)) {
          Class<?>[] parameterTypes = method.getParameterTypes();
          //事件的类型
          Class<?> eventType = parameterTypes[0];
          methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
        }
        return methodsInListener;
      }

      这里注意一下Multimap数据结构,它是Guava中提供的集合结构,与普通的哈希表不同的地方在于,它可以完成一对多操作。这里用来存储事件类型到观察者的一对多映射。

    • 调用SubscriberRegistry的register(listener)来执行注册监听器。
    • register步骤如下:
      EventBus->SubscriberRegistry->ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers 用以维护事件和订阅者的映射。

      当新注册监听者的时候,getAnnotatedMethods用反射获取全部的订阅者,为了避免浪费性能,会通过subscriberMethodsCache从缓存中加载。

     private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
        return subscriberMethodsCache.getUnchecked(clazz);
      }
    //subscriberMethodsCache的定义是:
    private
    static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache = CacheBuilder.newBuilder() .weakKeys() .build( new CacheLoader<Class<?>, ImmutableList<Method>>() { @Override public ImmutableList<Method> load(Class<?> concreteClass) throws Exception { return getAnnotatedMethodsNotCached(concreteClass); //2 } });
      这里的作用机制是:当使用subscriberMethodsCache.getUnchecked(clazz)获取指定监听者中的方法的时候会先尝试从缓存中进行获取,如果缓存中不存在就会执行2处的代码,
    调用SubscriberRegistry中的getAnnotatedMethodsNotCached()方法获取这些监听方法。其实就是使用反射并完成一些校验,并不复杂。
    //获取超类class集合
    private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
        Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
        Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
        ////遍历超类
        for (Class<?> supertype : supertypes) {
          ////遍历超类中的所有定义的方法
          for (Method method : supertype.getDeclaredMethods()) {
            ///如果方法上有@Subscribe注解
            if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
              //方法的参数类型数组
              Class<?>[] parameterTypes = method.getParameterTypes();
              //校验:事件订阅方法必须只能有一个参数,即事件类
              checkArgument(
                  parameterTypes.length == 1,
                  "Method %s has @Subscribe annotation but has %s parameters."
                      + "Subscriber methods must have exactly 1 parameter.",
                  method,
                  parameterTypes.length);
              //封装方法定义对象
              MethodIdentifier ident = new MethodIdentifier(method);
              //去重并添加进map
              if (!identifiers.containsKey(ident)) {
                identifiers.put(ident, method);
              }
            }
          }
        }
        return ImmutableList.copyOf(identifiers.values());
      }
    这样,我们就分析完了findAllSubscribers()方法,整理一下:当注册监听者的时候,首先会拿到该监听者的类型,然后从缓存中尝试获取该监听者对应的所有监听方法,如果没有的话就遍历该类的方法进行获取,并添加到缓存中;
    然后,会遍历上述拿到的方法集合,根据事件的类型(从方法参数得知)和监听者等信息创建一个观察者,并将事件类型-观察者键值对插入到一个一对多映射表中并返回。
    20.0版本register()在EventBus中没有具体的实现,所以我们看下SubscriberRegistry中的register():
     /**
       * 在给定的监听器对象上注册所有订阅者方法。
       */
      void register(Object listener) {
        //获取事件类型-观察者映射表
        Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
        //遍历上述映射表并将新注册的观察者映射表添加到全局的subscribers中
        for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
          Class<?> eventType = entry.getKey();
          Collection<Subscriber> eventMethodsInListener = entry.getValue();
    
          CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
          //如果指定事件对应的观察者列表不存在就创建一个新的
          if (eventSubscribers == null) {
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
            eventSubscribers =
                MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
          }
    
          eventSubscribers.addAll(eventMethodsInListener);
        }
      }

    post()方法如下:

    public void post(Object event) {
        // 调用SubscriberRegistry的getSubscribers方法获取该事件对应的全部观察者
        Iterator<Subscriber> eventSubscribers = this.subscribers.getSubscribers(event);
        if (eventSubscribers.hasNext()) {
            // 使用Dispatcher对事件进行分发
            this.dispatcher.dispatch(event, eventSubscribers);
        } else if (!(event instanceof DeadEvent)) {
            this.post(new DeadEvent(this, event));
        }
    }

    post()方法中还是调用SubscriberRegistry中的方法

     /**
       * 获取一个迭代器,该迭代器表示在调用此方法时给定事件的所有订阅者的不变快照。
       */
      Iterator<Subscriber> getSubscribers(Object event) {
        // 获取事件类型的所有父类型和自身构成的集合
        ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); //3
    
        List<Iterator<Subscriber>> subscriberIterators =
            Lists.newArrayListWithCapacity(eventTypes.size());
    
        // 遍历上述事件类型,并从subscribers中获取所有的观察者列表
        for (Class<?> eventType : eventTypes) {
          CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
          if (eventSubscribers != null) {
            // eager no-copy snapshot
            subscriberIterators.add(eventSubscribers.iterator());
          }
        }
        return Iterators.concat(subscriberIterators.iterator());
      }

      从EventBus.post()方法可以看出,当我们使用Dispatcher进行事件分发的时候,需要将当前的事件和所有的观察者作为参数传入到方法中。然后,在方法的内部进行分发操作。最终某个监听者的监听方法是使用反射进行触发的,这部分逻辑在Subscriber内部,而Dispatcher是事件分发的方式的策略接口。EventBus中提供了3个默认的Dispatcher实现,分别用于不同场景的事件分发:

    • ImmediateDispatcher:直接在当前线程中遍历所有的观察者并进行事件分发;
    • LegacyAsyncDispatcher:异步方法,存在两个循环,一先一后,前者用于不断往全局的队列中塞入封装的观察者对象,后者用于不断从队列中取出观察者对象进行事件分发;实际上,EventBus有个字类AsyncEventBus就是用该分发器进行事件分发的。
    • PerThreadQueuedDispatcher:这种分发器使用了两个线程局部变量进行控制,当dispatch()方法被调用的时候,会先获取当前线程的观察者队列,并将传入的观察者列表传入到该队列中;然后通过一个布尔类型的线程局部变量,判断当前线程是否正在进行分发操作,如果没有在进行分发操作,就通过遍历上述队列进行事件分发。
    上述三个分发器内部最终都会调用Subscriber的dispatchEvent()方法进行事件分发:
      final void dispatchEvent(final Object event) {
        //使用指定的执行器执行任务
        executor.execute(()->{
                try {
                  //使用反射触发监听方法
                  invokeSubscriberMethod(event);
                } catch (InvocationTargetException e) {
                  //使用EventBus内部的SubscriberExceptionHandler处理异常
                  bus.handleSubscriberException(e.getCause(), context(event));
                }
              }
            });
      }
    上述方法中的executor是执行器,它是通过EventBus获取到的;处理异常的SubscriberExceptionHandler类型也是通过EventBus获取到的。(原来EventBus中的构造方法中的字段是在这里用到的!
    参考:https://www.jianshu.com/p/4bddd45a8e7a
  • 相关阅读:
    SpringBoot之旅第三篇-日志
    SpringBoot之旅第二篇-配置
    SpringBoot之旅第一篇-初探
    394. 字符串解码
    1190. 反转每对括号间的子串
    921. 使括号有效的最少添加
    Leetcode 1171. 从链表中删去总和值为零的连续节点
    设计模式之过滤器模式——Java语言描述
    MySQL查询执行的基础
    设计模式之桥接模式——Java语言描述
  • 原文地址:https://www.cnblogs.com/shuaixiaobing/p/13577292.html
Copyright © 2011-2022 走看看