zoukankan      html  css  js  c++  java
  • Google的Guava包下的EventBus源码解析

    EventBus解析

    1、EventBus的构造方法

    • 使用EventBus作为具体实现类
    • 使用AsyncEventBus作为实现类

    (1)使用EventBus作为实现类,其构造方法有:

    public EventBus() {
            this("default");
        }
    
    public EventBus(String identifier) {
            this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE);
    }
    
    public EventBus(SubscriberExceptionHandler exceptionHandler) {
            this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
    }
    

    (2)使用AsyncEventBus作为实现类,其构造方法为:

    public AsyncEventBus(String identifier, Executor executor) {
            super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
        }
        public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
            super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
        }
        public AsyncEventBus(Executor executor) {
            super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
        }
    

    统一调用的构造方法为:

    EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
            this.subscribers = new SubscriberRegistry(this);
            this.identifier = (String)Preconditions.checkNotNull(identifier);
            this.executor = (Executor)Preconditions.checkNotNull(executor);
            this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
            this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
        }
    

    参数的意义分别是:

    identifier:类似当前EventBus对象的别名,可以描述该EventBus的用途,默认为“default”

    executor:使用异步执行时传入的自定义线程池

    dispatcher:指定分发消息的模式

    exceptionHandler:处理订阅消息异常的方法

    subscribers:注册订阅者的类

    2、两种实现方式创建对象时的区别

    (1)EventBus实现:

    • 默认使用default作为identifier,
    • 执行器使用MoreExecutors.directExecutor()
    • dispatcher:使用Dispatcher.perThreadDispatchQueue()队列
    • exceptionHandler:默认使用EventBus提供的LoggingHandler.INSTANCE,如果有传入参数,就是用参数

    (2)AsyncEventBus实现:

    • 默认使用default作为identifier
    • 执行器使用自定义的对象
    • dispatcher:使用Dispatcher.legacyAsync()类型
    • exceptionHandler:默认使用LoggingHandler.INSTANCE,如果有传入参数,就是用参数

    (3)Dispatcher调度器:

    eventbus包下的Dispatcher类提供了三种类型的调度器,分别为:

    • PerThreadQueuedDispatcher
    • LegacyAsyncDispatcher
    • ImmediateDispatcher

    (4)Executor执行器

    • EventBus默认提供的是DirectExecutor,单线程的执行器

    image-20210423180051368

    3、注册对象到EventBus

    3.1、注册对象

    image-20210422143552452

    使用this.subscribers对象的register方法注册,此处的subscribers对象为SubscriberRegistry类。

    3.2、进入SubscriberRegistry类,register方法

    image-20210422143928515

    3.2.1、查找所有订阅的方法

    使用Muitmap集合存储一个对象下有哪些方法订阅了,具体实现findAllSubscribers方法,该方法内部如下:

    image-20210422151612206

    (1)该方法内部使用getAnnotatedMethods方法获取clazz及其多级父类以及实现的接口中所有方法上有@Subscribe注解的方法。方法具体实现如下:

    image-20210422151922491

    private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
        //获取到传递的class对象的类以及父类以及实现的接口
        Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
        //创建一个Map集合
        Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
        //遍历得到的class对象
        for (Class<?> supertype : supertypes) {
            //获取class对象的所有方法
          for (Method method : supertype.getDeclaredMethods()) {
              //如果方法上有Subscribe注解,并且isSynthetic表示方法不是由java编译器生成的
            if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
              // 获取该方法的参数类型
              Class<?>[] parameterTypes = method.getParameterTypes();
               //检查方法的参数只能是1个
              checkArgument(
                  parameterTypes.length == 1,
                  "Method %s has @Subscribe annotation but has %s parameters."
                      + "Subscriber methods must have exactly 1 parameter.",
                  method,
                  parameterTypes.length);
    		//根据方法创建MethodIdentifier对象,其中包含方法名、方法的参数类型
              MethodIdentifier ident = new MethodIdentifier(method);
                //如果map集合中不包含该对象,就将ident和method对象存储到identifiers的map集合中
              if (!identifiers.containsKey(ident)) {
                identifiers.put(ident, method);
              }
            }
          }
        }
        //返回map集合中的方法
        return ImmutableList.copyOf(identifiers.values());
      }
    

    (2)遍历该对象中添加了@Subscribe注解的方法集合

    image-20210422151612206

    (3)获取该方法的参数类型,将第0个参数类型赋值给eventType

    再获取eventType时,会查找监听对象的父类以及接口,查看有没有订阅方法。

    (4)将参数类型作为key,订阅者作为value,存储到Multimap集合中。value中包含被监听的bus,被监听的bus中的执行器,监听该bus的对象listener,以及监听参数类型的方法。

    image-20210423111538593

    会去检查订阅方法上有没有注解AllowConcurrentEvents,如果有该注解,在使用create方法创建订阅者对象时,订阅者对象使用Subcriber,如果没有注解使用SynchronizedSubscriber对象。这两种对象在真正分发事件时区别才会体现出来。

    示例如下:说明objA中的三个方法都有@Subcribe注解

    订阅图示

    对象继承情况:

    image-20210422172305077

    对象实现接口,接口内使用JDK8提供的default来实现方法,并添加注解,也可以订阅。

    image-20210422174948729

    (5)获取到该对象的所偶遇订阅者后,返回Multimap集合

    3.2.2、将获取到的订阅方法进行缓存

    image-20210422155617555

    (1)遍历得到的Multimap集合

    • 获取Key值,也就是该对象内部订阅方法的参数类型
    • 获取订阅者
    • 根据参数类型,获取全局变量subscribers中已有的所有订阅者

    全局变量:

    image-20210422160251928

    • 判断订阅者set集合如果为空,创建CopyOnWriteArraySet集合,然后使用subscribers.putIfAbsent将订阅类型(参数类型)和set集合存储进去。使用MoreObjects的方法校验第一个参数返回如果是null,就是用第二个参数,如果第二个newSet参数还是null,就会报空指针异常,会返回一个空的set集合。
    • 将该订阅类型对应的订阅者保存到eventSubscribers中,也就是保存到了全局变量subscribers中。

    4、取消对象注册到EventBus

    4.1、取消注册对象

    image-20210422163915014

    当前EventBus对象中的subscribers是SubscriberRegistry类的对象,执行该类中的unregister方法

    4.2、进入SubscriberRegistry类,unregister方法

    image-20210422164419672

    4.2.1、获取该对象中的订阅方法

    使用findAllSubscribers方法,与注册对象中的方法相同,都是查找该类以及其多级父类中的订阅类型和方法。

    4.2.2、遍历获取到的订阅方法

    (1)获取到集合中的key,也就是该对象中订阅的类型(方法上的参数类型)

    (2)获取到集合中key对应的value,也就是该对象中的订阅方法。

    (3)获取到全局变量subscribers中缓存的数据,赋值给currentSubscribers

    (4)如果currentSubscribers为null,则抛出异常,如果移除该对象中的所有订阅者返回结果为false,也抛出异常,如果为true,则正常移除。

    5、发送消息

    5.1、EventBus中的post方法

    image-20210422165443897

    5.1.1、获取该消息关联的类型所有订阅者

    通过subscribers(SubscriberRegistry类)中的全局变量获取订阅该事件以及该事件父类和接口的所有方法。

    image-20210422170200851

    获取传递的event事件的类、父类以及实现的接口。

    例如:如果发送的是MsgA这个消息,那么就会找到Msg类,获取到的集合中就包含了MsgA类型和Msg类型,所有订阅了这两个类型的都会接收到该消息。接口也是一样的(MsgA和MsgB都实现了Msg接口)。

    image-20210422181758196

    下图中就描述了发送的消息在实现了接口的情况。

    image-20210422180217912

    如图中所示,MsgA和MsgB实现了Msg接口,如果有订阅者订阅的类型(参数类型)是Msg,那么发送的时候不管发送MsgA还是MsgB,订阅Msg的方法都可以接收到。

    下图中表示发送的消息有父类的情况:

    image-20210422181052932

    如图中所示,MsgA和MsgB继承了Msg类,如果有订阅者订阅的类型(参数类型)是Msg,那么发送的时候不管发送MsgA还是MsgB,订阅Msg的方法都可以接收到。如果发送的是Msg类,那就只有订阅了Msg类的方法可以接收到。

    总结:发送消息的时候,会查找该消息的父类以及接口,查看有没有订阅的方法,如果有就会都发送一次。

    5.1.2、使用dispacther向订阅者发送

    image-20210422182623424

    根据构造方法调度器使用了两种类型,分别是:

    • PerThreadQueuedDispatcher
    • LegacyAsyncDispatcher
    (1)默认PerThreadQueuedDispatcher调度器的实现

    image-20210423100835082

    • 获取ThreadLocal中创建的Queue队列,如果已创建,就会获取当前线程对应的Queue队列,初始化一个ArrayDeque队列,ArrayDeque是一个双端队列,即可以实现队列的先进先出,也可以实现栈的先进后出。它是线程不安全的,而且不允许有null值。它是可以自动扩容的循环数组,每次扩容都是2的n次方,初始大小为16.

    image-20210423101456082

    • 通过offer将事件以及订阅者存储到队列尾部
    • 如果dispatching返回为false,说明没有在分发事件,将dispatching设置为true,表示正在分发事件
    • 循环获取队列头部的事件(先进先出),然后再循环获取事件对应的订阅者,通过订阅者Subscriber对象的dispatchEvent方法发送event事件

    image-20210423103414482

    • 使用executor执行器通过反射去执行监听的方法。默认执行器为MoreExecutors.directExecutor(),直接发送。
    • 订阅者对象中包含了(注册的对象实例target,对象实例中监听的方法method),通过反射去invoke,因为订阅者对象根据订阅方法上有没有添加AllowConcurrentEvents注解分为两种,SynchronizedSubscriber和Subscriber,前者是后者的子类,重写了方法invokeSubscriberMethod方法。
    • 在执行时,如果订阅方法标注了AllowConcurrentEvents注解,使用Subscriber中的方法,如果没有标注注解,则使用SynchronizedSubscriber中的invokeSubscriberMethod。下图为Subscriber的方法

    image-20210423103645944

    下图为SynchronizedSubscriber中的方法,在原有基础上,使用synchronized锁住该对象,然后去执行。

    image-20210423113252073

    两者的区别就是:如果添加了注解,那就直接使用Subscriber中的方法,如果没有添加注解,则使用加锁的方法去执行。

    (2)LegacyAsyncDispatcher调度器的实现

    image-20210423105344770

    • 在初始化时,创建了一个ConcurrentLinkedQueue队列,内部存储EventWithSubscriber对象。
    • 在dispatch方法中,遍历传递的订阅者,使用event和订阅者构建EventWithSubscriber对象,存储到集合queue的尾部。
    • 循环取队列头部的对象,使用订阅者的dispatchEvent方法发送事件event。
    • 发送事件时,根据注册时检测到的是否添加注解,分为加锁执行方法和不加锁执行方法,内部逻辑与PerThreadQueuedDispatcher相同。

    5.1.3、post->DeadEvent

    在缓存中找不到订阅者并且它本身不是一个DeadEvent事件时,就会发送一个DeadEvent。如果找不到DeadEvent事件的订阅者,就会不进行处理。

    6、流程总结

    (1)两种创建方式对比

    类型 EventBus AsyncEventBus
    identifier 默认“default” 默认“default”
    executor DirectExecutor 自定义
    dispatcher PerThreadQueuedDispatcher LegacyAsyncDispatcher
    subscribers SubscriberRegistry SubscriberRegistry
    exceptionHandler 默认LoggingHandler 默认LoggingHandler

    两种方式的相同点:

    • 注册订阅者和取消订阅者逻辑都是相同的
    • 根据订阅方法有没有添加注解决定执行订阅方法的方式(加锁或者不加锁)

    两种方式的区别:

    • EventBus使用默认的DirectExecutor,内部使用单线程去执行任务
    • AsyncEventBus使用传递的Executor,去执行,如果传递的是Single线程,那和EventBus就没什么区别。
    • EventBus的分发模式使用的是ArrayDeque双端队列,先存入然后再取出执行,不是线程安全的集合,通过ThredLocal来在线程内部维护ArrayDeque队列。
    • AsyncEventBus使用的是ConcurrentLinkedQueue,同样是先存入然后取出,支持多线程同时访问。

    (2)方法执行流程

    image-20210423142557966

  • 相关阅读:
    性能测试——Jmeter基本用法概述
    Postman-newman基础用法
    数据库基础总结
    性能测试概述
    pytest+allure生成测试报告
    CSS入门基础
    HTML基础知识总结
    robotframework:无法导入资源Not a valid file or directory to import
    robotframework及官方编辑器RIDE的安装与配置
    常用排序算法
  • 原文地址:https://www.cnblogs.com/YpfBolg/p/14695110.html
Copyright © 2011-2022 走看看