zoukankan      html  css  js  c++  java
  • 【一起学设计模式】观察者模式实战:真实项目中屡试不爽的瓜娃EventBus到底如何实现观察者模式的?

    申明

    本文章首发自本人公众号:壹枝花算不算浪漫,如若转载请标明来源!

    感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

    22.jpg22.jpg

    前言

    之前出过一个设计模式的系列文章,这些文章和其他讲设计模式的文章 有些不同

    文章没有拘泥于讲解设计模式的原理,更多的是梳理工作中实际用到的一些设计模式,并提取出对应业务模型进行总结,回顾下之前的一些文章:

    【一起学设计模式】策略模式实战一:基于消息发送的策略模式实战

    【一起学习设计模式】策略模式实战二:配合注解 干掉业务代码中冗余的if else…

    【一起学设计模式】访问者模式实战:权限管理树删除节点操作

    【一起学设计模式】命令模式+模板方法+工厂方法实战: 如何优雅的更新商品库存…

    【一起学设计模式】状态模式+装饰器模式+简单工厂模式实战:(一)提交个订单我到底经历了什么鬼?

    【一起学设计模式】中介者模式+观察者模式+备忘录模式实战:(二)提交个订单我到底经历了什么鬼?

    所以:任何脱离实际业务的设计模式都是耍流氓

    image.pngimage.png

    业务梳理

    最近项目在对接神策埋点相关需求。
    有一个场景是:产品自定义了很多埋点事件,有些事件需要后端进行一定的业务处理,然后进行埋点。

    业务其实很简单,就是前端请求到后端,后端进行一定业务处理组装后将数据发送到神策后台。

    说到这里是不是还有小伙伴没听懂??那么就画张图吧:

    image.pngimage.png

    这里只是简单的举个栗子,说明下业务场景。

    针对这个业务场景,最开始的想法是尽量少的侵入原有业务方法,所以这里选择使用观察者模式。

    原有业务场景中加入发布事件的能力,然后订阅者自己消费进行埋点数据逻辑。做到尽可能的业务解耦。

    观察者模式

    这里还是要多啰嗦几句,说下观察者模式原理:

    所谓的观察者模式也称为发布订阅模式,这里肯定至少存在两种角色:发布者/订阅者

    接着看下UML图:

    image.pngimage.png

    所涉及到的角色如下:      

    • 抽象主题(Subject):提供接口,可以增加和剔除观察者对象。一般用抽象类或者接口实现。
    • 抽象观察者(Observer):提供接口,在得到主题的通知时更新自己。一般用抽象类或者接口实现。
    • 具体主题(ConcreteSubject):将有关状态存入具体观察者,在具体主题的内部状态发生变化时,给所有注册过的观察者发出通知。一般是具体子类实现。
    • 具体观察者(ConcreteObserver):存储与主题的状态自恰的状态。具体观察者角色实现抽象观察者角色所要求的更新接口,以便使本身的状态与主题的状态 像协调。如果需要,具体观察者角色可以保持一个指向具体主题对象的引用    

    在上述类图中,ConcreteSubject中有一个存储Observer的列表,这意味着ConcreteSubject并不需要知道引用了哪些ConcreteObserver,只要实现(继承)了Observer的对象都可以存到该列表中。在需要的时候调用Observer的update方法。

    话不多说,我们自己动手来模拟一个简单的观察者模式:

    /**
     * 观察者模式测试代码
     *
     * @author wangmeng
     * @date 2020/4/25 19:38
     */
    public class ObserverTest {

        public static void main(String[] args) {
            Subject subject = new Subject();
            Task1 task1 = new Task1();
            subject.addObserver(task1);

            Task2 task2 = new Task2();
            subject.addObserver(task2);

            subject.notifyObserver("xxxx");
        }
    }

    class Subject {
        // observer集合
        private List<Observer> observerList = Lists.newArrayList();

        // add
        public void addObserver(Observer observer) {
            observerList.add(observer);
        }

        // remove
        public void removeObserver(Observer observer) {
            observerList.remove(observer);
        }

        // 通知观察者
        public void notifyObserver(Object object) {
            for (Observer item : observerList) {
                item.update(object);
            }
        }
    }

    interface Observer {
        void update(Object object);
    }

    class Task1 implements Observer {
        @Override
        public void update(Object object) {
            System.out.println("task1 received: " + object);
        }
    }

    class Task2 implements Observer {
        @Override
        public void update(Object object) {
            System.out.println("task2 received: " + object);
        }
    }

    针对于观察者模式,JDK和Spring也有一些内置实现,具体可以参见:JDK中Observable,Spring中ApplicationListener

    这里就不再赘述了,想深入了解的小伙伴可执行谷歌,毕竟我们这次文章的重点还是Guava中观察者模式的使用实现原理。

    业务代码示例

    这里使用的是Guava中自带的EventBus组件,我们继续用取消订单业务场景做示例,这里抽离了部分代码,只展示核心的一些代码:

    1. 事件总线服务

    /**
     * 事件总线服务
     *
     * @author wangmeng
     * @date 2020/4/14
     */
    @Service
    public class EventBusService {
        /**
        * 订阅者异步执行器,如果同步可以使用EventBus
        **/
        @Autowired
        private AsyncEventBus asyncEventBus;
        /**
        * 订阅者集合,里面方法通过@Subscribe进行事件订阅
        **/
        @Autowired
        private EventListener eventListener;

        /**
        * 注册方法,启动的时候将所有的订阅者进行注册
        **/
        @PostConstruct
        public void register() {
            asyncEventBus.register(eventListener);
        }

        /**
         * 消息投递,根据入参自动投递到对应的方法中去消费。
         */
        public void post(Object object) {
            asyncEventBus.post(object);
        }
    }

    这里使用了异步的实现方式,如果使用同步的方式可以将AsyncEventBus改为EventBus

    2. 异步AsyncEventBus配置:

    /**
     * AsyncEventBus 线程池配置
     *
     * @author wangmeng
     * @date 2020/04/14
     */
    @Configuration
    public class EventBusConfiguration {

        /** Set the ThreadPoolExecutor's core pool size. */
        private int corePoolSize = 10;
        /** Set the ThreadPoolExecutor's maximum pool size. */
        private int maxPoolSize = 30;
        /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
        private int queueCapacity = 500;

        @Bean
        public AsyncEventBus asyncEventBus() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(queueCapacity);
            executor.setThreadNamePrefix("jv-mall-user-sensorsData:");
            executor.initialize();
            return new AsyncEventBus(executor);
        }
    }

    线程池数据大家可以随意配置,这里只是参考。

    3. 观察者实现

    /**
     * 观察者代码
     *
     * @author wangmeng
     * @date 2020/4/14
     */
    @Service
    @Slf4j
    public class EventListener {

        @Autowired
        private SensorsDataManager sensorsDataManager;

        /**
         * 观察者处理数据埋点方法
         */
        @Subscribe
        @AllowConcurrentEvents
        public void handleCancelOrderEvent(TrackCancelOrderDTO cancelOrderDTO) {
            Map<String, Object> propertyMap = this.buildBasicProperties(cancelOrderDTO);
            propertyMap.put(SensorsDataConstants.ORDER_ID, registerDTO.getOrderId());
            // 各种属性赋值,这里只截取一点
            propertyMap.put(SensorsDataConstants.PROPERTY_IS_SUCCESS, registerDTO.getIsSuccess());
            propertyMap.put(SensorsDataConstants.PROPERTY_FAIL_REASON, registerDTO.getFailReason());
            sensorsDataManager.send(registerDTO.getUserId(), SensorsEventConstants.EVENT_CANCEL_ORDER, propertyMap);
        }
    }

    这个EventLister 是我们在上面EventBusService中注册的类,观察者方法上面添加@Subscribe即可对发布者的数据进行订阅。

    @AllowConcurrentEvents注解字面意思是允许事件并发执行,这个原理后面会讲。

    PS:这里sensorsDataManager是封装生成埋点相关的类。

    发布者实现

    在业务逻辑中加入埋点数据发布的方法:

    @Autowired
    private EventBusService eventBusService;

    public void cancelOrder(Long orderId) {

        // 业务逻辑执行

        // 埋点数据
        TrackCancelOrderDTO trackCancelOrderDTO = trackBaseOrderInfoManager.buildTrackBaseOrderDTO(orderInfoDO, context.getOrderParentInfoDO(), TrackCancelOrderDTO.class);
        trackCancelOrderDTO.setCancelReason(orderInfoDO.getCancelReason());
        trackCancelOrderDTO.setCancelTime(orderInfoDO.getCancelTime());
        trackCancelOrderDTO.setPlatformName(SensorsDataConstants.PLATFORM_APP);
        trackCancelOrderDTO.setUserId(orderInfoDO.getUserId().toString());
        eventBusService.post(trackCancelOrderDTO);
    }

    到了这里所有的如何使用EventBus的代码都已经贴出来了,下面就看看具体的源码实现吧

    源码剖析

    事件总线订阅源码实现

    com.google.common.eventbus.SubscriberRegistry#register:

    void register(Object listener) {
        //查找所有订阅者,维护了一个key是事件类型,value是定订阅这个事件类型的订阅者集合的一个map
        Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

        for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
          //获取事件类型
          Class<?> eventType = entry.getKey();
          //获取这个事件类型的订阅者集合
          Collection<Subscriber> eventMethodsInListener = entry.getValue();

          //从缓存中按事件类型查找订阅者集合
          CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

          if (eventSubscribers == null) {
            //从缓存中取不到,更新缓存
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
            eventSubscribers =
                MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
          }

          eventSubscribers.addAll(eventMethodsInListener);
        }
      }

    事件和订阅事件的订阅者集合是在com.google.common.eventbus.SubscriberRegistry这里维护的:

    private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
        Maps.newConcurrentMap();

    到这里,订阅者已经准备好了,准备接受事件了。通过debug 看下subscribers中数据:

    image.pngimage.png

    发布事件源码实现

    com.google.common.eventbus.EventBus#post

    public void post(Object event) {
        //获取事件的订阅者集合
        Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
        if (eventSubscribers.hasNext()) {
          //转发事件
          dispatcher.dispatch(event, eventSubscribers);
          //如果不是死亡事件,重新包装成死亡事件重新发布
        } else if (!(event instanceof DeadEvent)) {
          // the event had no subscribers and was not itself a DeadEvent
          post(new DeadEvent(this, event));
        }
    }

    Iterator<Subscriber> getSubscribers(Object event) {
        //获取事件类型类的超类集合
        ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

        List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size());

        for (Class<?> eventType : eventTypes) {
          //获取事件类型的订阅者集合
          CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
          if (eventSubscribers != null) {
            // eager no-copy snapshot
            subscriberIterators.add(eventSubscribers.iterator());
          }
        }

        return Iterators.concat(subscriberIterators.iterator());
      }

    事件转发器有三种实现:

    image.pngimage.png

    第一种是立即转发,实时性比较高,其他两种都是队列实现。

    我们使用的是AsyncEventBus,其中指定的事件转发器是:LegacyAsyncDispatcher,接着看看其中的dispatch()方法的实现:

    com.google.common.eventbus.Dispatcher.LegacyAsyncDispatcher

    private static final class LegacyAsyncDispatcher extends Dispatcher {

        private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
                Queues.newConcurrentLinkedQueue();

        @Override
        void dispatch(Object event, Iterator<Subscriber> subscribers) {
          checkNotNull(event);
          while (subscribers.hasNext()) {
            // 先将所有发布的事件放入队列中
            queue.add(new EventWithSubscriber(event, subscribers.next()));
          }

          EventWithSubscriber e;
          while ((e = queue.poll()) != null) {
            // 消费队列中的消息
            e.subscriber.dispatchEvent(e.event);
          }
        }
    }

    接着看subscriber.dispatchEvent()方法实现:

    final void dispatchEvent(final Object event) {
        executor.execute(
            new Runnable() {
              @Override
              public void run() {
                try {
                  invokeSubscriberMethod(event);
                } catch (InvocationTargetException e) {
                  bus.handleSubscriberException(e.getCause(), context(event));
                }
              }
            });
    }

    执行订阅方法都是异步实现,我们在上面初始化AsyncEventBus的时候有为其构造线程池,就是在这里使用的。

    在看invokeSubscriberMethod()具体代码之前,我们先来看看@AllowConcurrentEvents,我们在订阅方法上有加这个注解,来看看这个注解的作用吧:

    image.pngimage.png

    在我们执行register()方法的时候,会为每一个订阅者构造一个Subscriber对象,如果配置了@AllowConcurrentEvents注解,就会为它配置一个允许并发的Subscriber对象。

    class Subscriber {

      /**
       * Creates a {@code Subscriber} for {@code method} on {@code listener}.
       */
      static Subscriber create(EventBus bus, Object listener, Method method) {
        return isDeclaredThreadSafe(method)
            ? new Subscriber(bus, listener, method)
            : new SynchronizedSubscriber(bus, listener, method);
      }

      private static boolean isDeclaredThreadSafe(Method method) {
        // 如果有AllowConcurrentEvents注解,则返回true
        return method.getAnnotation(AllowConcurrentEvents.class) != null;
      }

      @VisibleForTesting
      void invokeSubscriberMethod(Object event) throws InvocationTargetException {
        try {
          // 通过反射直接执行订阅者中方法
          method.invoke(target, checkNotNull(event));
        } catch (IllegalArgumentException e) {
          throw new Error("Method rejected target/argument: " + event, e);
        } catch (IllegalAccessException e) {
          throw new Error("Method became inaccessible: " + event, e);
        } catch (InvocationTargetException e) {
          if (e.getCause() instanceof Error) {
            throw (Error) e.getCause();
          }
          throw e;
        }
      }

      @VisibleForTesting
      static final class SynchronizedSubscriber extends Subscriber {

        private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
          super(bus, target, method);
        }

        @Override
        void invokeSubscriberMethod(Object event) throws InvocationTargetException {
          // SynchronizedSubscriber不支持并发,这里用synchronized修饰,所有执行都串行化执行
          synchronized (this) {
            super.invokeSubscriberMethod(event);
          }
        }
      }
    }

    这里面包含了invokeSubscriberMethod()方法的实现原理,其实就是通过反射去执行订阅者中的方法。

    还有就是如果没有添加注解,就会走SynchronizedSubscriberinvokeSubscriberMethod()逻辑,添加了synchronized关键字,不支持并发执行。

    总结

    这里主要是整理了guava 中实现观察者模式的使用及原理。

    大家如果有类似的业务场景也可以使用到自己项目中。

  • 相关阅读:
    Codeforces Round 546 (Div. 2)
    Codeforces Round 545 (Div. 2)
    Codeforces Round 544(Div. 3)
    牛客小白月赛12
    Codeforces Round 261(Div. 2)
    Codeforces Round 260(Div. 2)
    Codeforces Round 259(Div. 2)
    Codeforces Round 258(Div. 2)
    Codeforces Round 257 (Div. 2)
    《A First Course in Probability》-chaper5-连续型随机变量-随机变量函数的分布
  • 原文地址:https://www.cnblogs.com/wang-meng/p/12777546.html
Copyright © 2011-2022 走看看