zoukankan      html  css  js  c++  java
  • 【一起学设计模式】观察者模式实战:真实项目中屡试不爽的瓜娃EventBus到底如何实现观察者模式的?

    申明

    本文章首发自本人公众号:壹枝花算不算浪漫,如若转载请标明来源!

    感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

    22.jpg22.jpg

    前言

    之前出过一个设计模式的系列文章,这些文章和其他讲设计模式的文章 有些不同

    文章没有拘泥于讲解设计模式的原理,更多的是梳理工作中实际用到的一些设计模式,并提取出对应业务模型进行总结,回顾下之前的一些文章:

    【一起学设计模式】策略模式实战一:基于消息发送的策略模式实战

    【一起学习设计模式】策略模式实战二:配合注解 干掉业务代码中冗余的if else…

    【一起学设计模式】访问者模式实战:权限管理树删除节点操作

    【一起学设计模式】命令模式+模板方法+工厂方法实战: 如何优雅的更新商品库存…

    【一起学设计模式】状态模式+装饰器模式+简单工厂模式实战:(一)提交个订单我到底经历了什么鬼?

    【一起学设计模式】中介者模式+观察者模式+备忘录模式实战:(二)提交个订单我到底经历了什么鬼?

    所以:任何脱离实际业务的设计模式都是耍流氓

    image.pngimage.png

    业务梳理

    最近项目在对接神策埋点相关需求。
    有一个场景是:产品自定义了很多埋点事件,有些事件需要后端进行一定的业务处理,然后进行埋点。

    业务其实很简单,就是前端请求到后端,后端进行一定业务处理组装后将数据发送到神策后台。

    说到这里是不是还有小伙伴没听懂??那么就画张图吧:

    image.pngimage.png

    这里只是简单的举个栗子,说明下业务场景。

    针对这个业务场景,最开始的想法是尽量少的侵入原有业务方法,所以这里选择使用观察者模式。

    原有业务场景中加入发布事件的能力,然后订阅者自己消费进行埋点数据逻辑。做到尽可能的业务解耦。

    观察者模式

    这里还是要多啰嗦几句,说下观察者模式原理:

    所谓的观察者模式也称为发布订阅模式,这里肯定至少存在两种角色:发布者/订阅者

    接着看下UML图:

    image.pngimage.png

    所涉及到的角色如下:      

    • 抽象主题(Subject):提供接口,可以增加和剔除观察者对象。一般用抽象类或者接口实现。
    • 抽象观察者(Observer):提供接口,在得到主题的通知时更新自己。一般用抽象类或者接口实现。
    • 具体主题(ConcreteSubject):将有关状态存入具体观察者,在具体主题的内部状态发生变化时,给所有注册过的观察者发出通知。一般是具体子类实现。
    • 具体观察者(ConcreteObserver):存储与主题的状态自恰的状态。具体观察者角色实现抽象观察者角色所要求的更新接口,以便使本身的状态与主题的状态 像协调。如果需要,具体观察者角色可以保持一个指向具体主题对象的引用    

    在上述类图中,ConcreteSubject中有一个存储Observer的列表,这意味着ConcreteSubject并不需要知道引用了哪些ConcreteObserver,只要实现(继承)了Observer的对象都可以存到该列表中。在需要的时候调用Observer的update方法。

    话不多说,我们自己动手来模拟一个简单的观察者模式:

    /**
     * 观察者模式测试代码
     *
     * @author wangmeng
     * @date 2020/4/25 19:38
     */
    public class ObserverTest {

        public static void main(String[] args) {
            Subject subject = new Subject();
            Task1 task1 = new Task1();
            subject.addObserver(task1);

            Task2 task2 = new Task2();
            subject.addObserver(task2);

            subject.notifyObserver("xxxx");
        }
    }

    class Subject {
        // observer集合
        private List<Observer> observerList = Lists.newArrayList();

        // add
        public void addObserver(Observer observer) {
            observerList.add(observer);
        }

        // remove
        public void removeObserver(Observer observer) {
            observerList.remove(observer);
        }

        // 通知观察者
        public void notifyObserver(Object object) {
            for (Observer item : observerList) {
                item.update(object);
            }
        }
    }

    interface Observer {
        void update(Object object);
    }

    class Task1 implements Observer {
        @Override
        public void update(Object object) {
            System.out.println("task1 received: " + object);
        }
    }

    class Task2 implements Observer {
        @Override
        public void update(Object object) {
            System.out.println("task2 received: " + object);
        }
    }

    针对于观察者模式,JDK和Spring也有一些内置实现,具体可以参见:JDK中Observable,Spring中ApplicationListener

    这里就不再赘述了,想深入了解的小伙伴可执行谷歌,毕竟我们这次文章的重点还是Guava中观察者模式的使用实现原理。

    业务代码示例

    这里使用的是Guava中自带的EventBus组件,我们继续用取消订单业务场景做示例,这里抽离了部分代码,只展示核心的一些代码:

    1. 事件总线服务

    /**
     * 事件总线服务
     *
     * @author wangmeng
     * @date 2020/4/14
     */
    @Service
    public class EventBusService {
        /**
        * 订阅者异步执行器,如果同步可以使用EventBus
        **/
        @Autowired
        private AsyncEventBus asyncEventBus;
        /**
        * 订阅者集合,里面方法通过@Subscribe进行事件订阅
        **/
        @Autowired
        private EventListener eventListener;

        /**
        * 注册方法,启动的时候将所有的订阅者进行注册
        **/
        @PostConstruct
        public void register() {
            asyncEventBus.register(eventListener);
        }

        /**
         * 消息投递,根据入参自动投递到对应的方法中去消费。
         */
        public void post(Object object) {
            asyncEventBus.post(object);
        }
    }

    这里使用了异步的实现方式,如果使用同步的方式可以将AsyncEventBus改为EventBus

    2. 异步AsyncEventBus配置:

    /**
     * AsyncEventBus 线程池配置
     *
     * @author wangmeng
     * @date 2020/04/14
     */
    @Configuration
    public class EventBusConfiguration {

        /** Set the ThreadPoolExecutor's core pool size. */
        private int corePoolSize = 10;
        /** Set the ThreadPoolExecutor's maximum pool size. */
        private int maxPoolSize = 30;
        /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
        private int queueCapacity = 500;

        @Bean
        public AsyncEventBus asyncEventBus() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(queueCapacity);
            executor.setThreadNamePrefix("jv-mall-user-sensorsData:");
            executor.initialize();
            return new AsyncEventBus(executor);
        }
    }

    线程池数据大家可以随意配置,这里只是参考。

    3. 观察者实现

    /**
     * 观察者代码
     *
     * @author wangmeng
     * @date 2020/4/14
     */
    @Service
    @Slf4j
    public class EventListener {

        @Autowired
        private SensorsDataManager sensorsDataManager;

        /**
         * 观察者处理数据埋点方法
         */
        @Subscribe
        @AllowConcurrentEvents
        public void handleCancelOrderEvent(TrackCancelOrderDTO cancelOrderDTO) {
            Map<String, Object> propertyMap = this.buildBasicProperties(cancelOrderDTO);
            propertyMap.put(SensorsDataConstants.ORDER_ID, registerDTO.getOrderId());
            // 各种属性赋值,这里只截取一点
            propertyMap.put(SensorsDataConstants.PROPERTY_IS_SUCCESS, registerDTO.getIsSuccess());
            propertyMap.put(SensorsDataConstants.PROPERTY_FAIL_REASON, registerDTO.getFailReason());
            sensorsDataManager.send(registerDTO.getUserId(), SensorsEventConstants.EVENT_CANCEL_ORDER, propertyMap);
        }
    }

    这个EventLister 是我们在上面EventBusService中注册的类,观察者方法上面添加@Subscribe即可对发布者的数据进行订阅。

    @AllowConcurrentEvents注解字面意思是允许事件并发执行,这个原理后面会讲。

    PS:这里sensorsDataManager是封装生成埋点相关的类。

    发布者实现

    在业务逻辑中加入埋点数据发布的方法:

    @Autowired
    private EventBusService eventBusService;

    public void cancelOrder(Long orderId) {

        // 业务逻辑执行

        // 埋点数据
        TrackCancelOrderDTO trackCancelOrderDTO = trackBaseOrderInfoManager.buildTrackBaseOrderDTO(orderInfoDO, context.getOrderParentInfoDO(), TrackCancelOrderDTO.class);
        trackCancelOrderDTO.setCancelReason(orderInfoDO.getCancelReason());
        trackCancelOrderDTO.setCancelTime(orderInfoDO.getCancelTime());
        trackCancelOrderDTO.setPlatformName(SensorsDataConstants.PLATFORM_APP);
        trackCancelOrderDTO.setUserId(orderInfoDO.getUserId().toString());
        eventBusService.post(trackCancelOrderDTO);
    }

    到了这里所有的如何使用EventBus的代码都已经贴出来了,下面就看看具体的源码实现吧

    源码剖析

    事件总线订阅源码实现

    com.google.common.eventbus.SubscriberRegistry#register:

    void register(Object listener) {
        //查找所有订阅者,维护了一个key是事件类型,value是定订阅这个事件类型的订阅者集合的一个map
        Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

        for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
          //获取事件类型
          Class<?> eventType = entry.getKey();
          //获取这个事件类型的订阅者集合
          Collection<Subscriber> eventMethodsInListener = entry.getValue();

          //从缓存中按事件类型查找订阅者集合
          CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

          if (eventSubscribers == null) {
            //从缓存中取不到,更新缓存
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
            eventSubscribers =
                MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
          }

          eventSubscribers.addAll(eventMethodsInListener);
        }
      }

    事件和订阅事件的订阅者集合是在com.google.common.eventbus.SubscriberRegistry这里维护的:

    private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
        Maps.newConcurrentMap();

    到这里,订阅者已经准备好了,准备接受事件了。通过debug 看下subscribers中数据:

    image.pngimage.png

    发布事件源码实现

    com.google.common.eventbus.EventBus#post

    public void post(Object event) {
        //获取事件的订阅者集合
        Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
        if (eventSubscribers.hasNext()) {
          //转发事件
          dispatcher.dispatch(event, eventSubscribers);
          //如果不是死亡事件,重新包装成死亡事件重新发布
        } else if (!(event instanceof DeadEvent)) {
          // the event had no subscribers and was not itself a DeadEvent
          post(new DeadEvent(this, event));
        }
    }

    Iterator<Subscriber> getSubscribers(Object event) {
        //获取事件类型类的超类集合
        ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

        List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size());

        for (Class<?> eventType : eventTypes) {
          //获取事件类型的订阅者集合
          CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
          if (eventSubscribers != null) {
            // eager no-copy snapshot
            subscriberIterators.add(eventSubscribers.iterator());
          }
        }

        return Iterators.concat(subscriberIterators.iterator());
      }

    事件转发器有三种实现:

    image.pngimage.png

    第一种是立即转发,实时性比较高,其他两种都是队列实现。

    我们使用的是AsyncEventBus,其中指定的事件转发器是:LegacyAsyncDispatcher,接着看看其中的dispatch()方法的实现:

    com.google.common.eventbus.Dispatcher.LegacyAsyncDispatcher

    private static final class LegacyAsyncDispatcher extends Dispatcher {

        private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
                Queues.newConcurrentLinkedQueue();

        @Override
        void dispatch(Object event, Iterator<Subscriber> subscribers) {
          checkNotNull(event);
          while (subscribers.hasNext()) {
            // 先将所有发布的事件放入队列中
            queue.add(new EventWithSubscriber(event, subscribers.next()));
          }

          EventWithSubscriber e;
          while ((e = queue.poll()) != null) {
            // 消费队列中的消息
            e.subscriber.dispatchEvent(e.event);
          }
        }
    }

    接着看subscriber.dispatchEvent()方法实现:

    final void dispatchEvent(final Object event) {
        executor.execute(
            new Runnable() {
              @Override
              public void run() {
                try {
                  invokeSubscriberMethod(event);
                } catch (InvocationTargetException e) {
                  bus.handleSubscriberException(e.getCause(), context(event));
                }
              }
            });
    }

    执行订阅方法都是异步实现,我们在上面初始化AsyncEventBus的时候有为其构造线程池,就是在这里使用的。

    在看invokeSubscriberMethod()具体代码之前,我们先来看看@AllowConcurrentEvents,我们在订阅方法上有加这个注解,来看看这个注解的作用吧:

    image.pngimage.png

    在我们执行register()方法的时候,会为每一个订阅者构造一个Subscriber对象,如果配置了@AllowConcurrentEvents注解,就会为它配置一个允许并发的Subscriber对象。

    class Subscriber {

      /**
       * Creates a {@code Subscriber} for {@code method} on {@code listener}.
       */
      static Subscriber create(EventBus bus, Object listener, Method method) {
        return isDeclaredThreadSafe(method)
            ? new Subscriber(bus, listener, method)
            : new SynchronizedSubscriber(bus, listener, method);
      }

      private static boolean isDeclaredThreadSafe(Method method) {
        // 如果有AllowConcurrentEvents注解,则返回true
        return method.getAnnotation(AllowConcurrentEvents.class) != null;
      }

      @VisibleForTesting
      void invokeSubscriberMethod(Object event) throws InvocationTargetException {
        try {
          // 通过反射直接执行订阅者中方法
          method.invoke(target, checkNotNull(event));
        } catch (IllegalArgumentException e) {
          throw new Error("Method rejected target/argument: " + event, e);
        } catch (IllegalAccessException e) {
          throw new Error("Method became inaccessible: " + event, e);
        } catch (InvocationTargetException e) {
          if (e.getCause() instanceof Error) {
            throw (Error) e.getCause();
          }
          throw e;
        }
      }

      @VisibleForTesting
      static final class SynchronizedSubscriber extends Subscriber {

        private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
          super(bus, target, method);
        }

        @Override
        void invokeSubscriberMethod(Object event) throws InvocationTargetException {
          // SynchronizedSubscriber不支持并发,这里用synchronized修饰,所有执行都串行化执行
          synchronized (this) {
            super.invokeSubscriberMethod(event);
          }
        }
      }
    }

    这里面包含了invokeSubscriberMethod()方法的实现原理,其实就是通过反射去执行订阅者中的方法。

    还有就是如果没有添加注解,就会走SynchronizedSubscriberinvokeSubscriberMethod()逻辑,添加了synchronized关键字,不支持并发执行。

    总结

    这里主要是整理了guava 中实现观察者模式的使用及原理。

    大家如果有类似的业务场景也可以使用到自己项目中。

  • 相关阅读:
    python 中多个装饰器的执行顺序:
    Python基础思维导图
    怎样写出靠谱的RESUTful API接口?
    python中yield()的用法详解
    Flask思维导图
    Django的设计模式
    MySQL
    MySQL
    Linux
    zsh oh-my-zsh 插件推荐
  • 原文地址:https://www.cnblogs.com/wang-meng/p/12777546.html
Copyright © 2011-2022 走看看