zoukankan      html  css  js  c++  java
  • 从spring源码汲取营养:模仿spring事件发布机制,解耦业务代码

    前言

    最近在项目中做了一项优化,对业务代码进行解耦。我们部门做的是警用系统,通俗的说,可理解为110报警。一条警情,会先后经过接警员、处警调度员、一线警员,警情是需要记录每一步的日志,是要可追溯的,比如报警人张小三在2019-12-02 00:02:01时间报警,接警员A在1分钟后,将该警情记录完成,并分派给处警调度员B,调度员B在5分钟后,分派给一线警员C,C赶到现场后,花了1个小时处理完成。

    这中间,每一个接口,需要做的事情,可能就包括了:警情日志记录;警员当前任务数统计,包括待处置的任务和已经处置完成的任务;我们其实还有一个操作,就是发mq,去通知其他相关人,比如接警员A接警完成后,要发mq通知其主管。

    以前的代码可能是这样的:

    ## 接口1里, 接收警情service里完成以下操作
    void 接收警情(xxxReqVo reqVo){
      1:写库
      2:记录警情跟踪日志
      3:增加当前接警员的接警数
      4:发mq通知其他相关人
    }
    
    ##接口2里,分派警情的service里完成以下操作
    void 分派警情(xxxReqVo reqVo){
      1:写库
      2:记录警情跟踪日志
      3:增加当前处警调度警员的处警数
      4:发mq通知其他相关人
    }
    

    这样的问题是什么呢?

    1. 在每一个相关接口里,都要“显式”调用:记录跟踪日志的相关方法、统计相关的方法、发mq相关的方法;但凡有一个地方忘记了,都会导致问题,比如统计数量不准,mq忘发,跟踪日志遗漏等。
    2. 业务逻辑和这类通用业务揉在一起,假设下次又需要给报警人发个短信,岂不是又得去改核心代码?这不符合我们“对修改关闭,对扩展开放”的开闭原则啊;假设脑残的产品经理,这次说要给报警人发短信,过两天又不要了,难道每个接口,挨个挨个改吗,想想都想打死产品经理,但是这个又犯法,还是想想其他办法?

    这个问题,我们可以用类似mq的方法来解决,即,发送消息,各个消费者去消费。一般,mq的方式适用于微服务之间,而我们这里,将使用事件-发布机制来解决这个问题。
    源码地址(直接dubug跟一下,很简单,比看文章来得快):
    https://gitee.com/ckl111/spring-event-publish-demo

    先说说ApplicationListener

    spring boot之前的spring 时代,想必一些同学用过org.springframework.context.ApplicationListener,正好我手里有一个老项目,也用到了这个东西,我就拿这个举个例子:

    在我们的项目中,需要在启动后,初始化一些东西,比如预热缓存,最早的代码呢,可能是大家各自实现org.springframework.beans.factory.InitializingBean,但是这样呢,初始化代码散落在各个service中;还有一些直接使用@PostContruct注解,然后在对应方法里去完成一些初始化操作。但是总体来说,这些方式,在spring的启动过程中,被调用的时机比较靠前,有些候某些bean可能还没初始化完成,而导致一些奇怪的问题。

    所以,我们后来统一去掉了这些初始化代码,全部采用以下机制来实现:

    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    
    @Service
    public class InitRunner implements ApplicationListener<ContextRefreshedEvent> {
    
        @Autowired
        private InitService initService;
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            //root application context,因为是web项目,
            if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
                  initService.init();
            }
     }
    

    在这个类中,我们实现了org.springframework.context.ApplicationListener<ContextRefreshedEvent> ,这个 listener的定义如下:

    public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    
    	/**
    	 * Handle an application event.
    	 * @param event the event to respond to
    	 */
    	void onApplicationEvent(E event);
    
    }
    
    

    接口 EventListener 是 jdk 的一个 marker interface:

    package java.util;
    
    /**
    
    - A tagging interface that all event listener interfaces must extend.
    - @since JDK1.1
      */
      public interface EventListener {
      }
    	
    

    我们在实现listener时,指定了本listener感兴趣的事件:ContextRefreshedEvent,这个事件的类继承关系如下:

    那么,这个事件是什么意思呢?

    /**
     * Event raised when an {@code ApplicationContext} gets initialized or refreshed.
     *
     * @author Juergen Hoeller
     * @since 04.03.2003
     * @see ContextClosedEvent
     */
    @SuppressWarnings("serial")
    public class ContextRefreshedEvent extends ApplicationContextEvent {
    
    	/**
    	 * Create a new ContextRefreshedEvent.
    	 * @param source the {@code ApplicationContext} that has been initialized
    	 * or refreshed (must not be {@code null})
    	 */
    	public ContextRefreshedEvent(ApplicationContext source) {
    		super(source);
    	}
    
    }
    

    注释说:Event raised when an {@code ApplicationContext} gets initialized or refreshed.,那么意思就是,该事件,是在上下文初始化完成后被发布。

    这样的话,就能保证,在我们listener监听到这个事件的时候,整个应用上下文已经可以使用了。

    一览Spring事件监听机制

    我们再通过debug,来看看其真实的调用时机:

    上图红框处,对spring上下文进行refresh,refresh就是spring 最核心的部分了,基本上,看懂了这个函数,就懂了一半:

    // Prepare this context for refreshing.
    prepareRefresh();
    
    // Tell the subclass to refresh the internal bean factory.
    ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
    
    // Prepare the bean factory for use in this context. 
    // 实例化beanFactory
    prepareBeanFactory(beanFactory);
    
    try {
      // Allows post-processing of the bean factory in context subclasses.
      // 对beanFactory进行处理
      postProcessBeanFactory(beanFactory);
    
      // Invoke factory processors registered as beans in the context.
      // BeanFactoryPostProcessor开始作用的地方,这里会调用所有的beanFactory后置处理器
      invokeBeanFactoryPostProcessors(beanFactory);
    
      // Register bean processors that intercept bean creation.
      // 注册 bean的后置处理器到beanFactory,注意,截止目前,还没开始实例化bean(除了少数几个内部bean)
      registerBeanPostProcessors(beanFactory);
    
      // Initialize message source for this context. 
      // 注册国际化相关bean
      initMessageSource();
    
      // Initialize event multicaster for this context.
      // 注册事件发布器,这个和本文主题大有关系
      initApplicationEventMulticaster();
    
      // Initialize other special beans in specific context subclasses.
      //注意上面这行注释,这个类是交给子类覆盖的,比如,在 org.springframework.web.context.support.AbstractRefreshableWebApplicationContext中,实例化了	org.springframework.ui.context.ThemeSource
      onRefresh();
    
      // Check for listener beans and register them.
      // 从spring容器上下文中,查找ApplicationListener类型的监听器,添加到前两步,初始化的事件发布器中
      registerListeners();
    
      // Instantiate all remaining (non-lazy-init) singletons.
      //注意:截止到目前为止,beanFactory里面基本还是空空如也,没有bean,只有BeanDefinition,在这一步才会   //根据那些BeanDefinition来实例化那些:非lazy-init的bean
      finishBeanFactoryInitialization(beanFactory);
    
      // Last step: publish corresponding event.
      // 发布:容器完成初始化的事件
      finishRefresh();
    }
    

    上面基本都加了注释,比较容易懂,需要重点关注的是:

    1. 事件发布器初始化
    initApplicationEventMulticaster();
    

    这一步会生成一个org.springframework.context.event.ApplicationEventMulticaster,存储在org.springframework.context.support.AbstractApplicationContext#applicationEventMulticaster

    该事件发布器的接口主要有(去除了无关方法):

        /**
    	 * Add a listener to be notified of all events.
    	 * @param listener the listener to add
    	 */
    	void addApplicationListener(ApplicationListener<?> listener);
    
    	/**
    	 * Multicast the given application event to appropriate listeners.
    	 * <p>Consider using {@link #multicastEvent(ApplicationEvent, ResolvableType)}
    	 * if possible as it provides a better support for generics-based events.
    	 * @param event the event to multicast
    	 */
    	void multicastEvent(ApplicationEvent event);
    

    从上面可以看出,该接口主要是维护监听器ApplicationListener,以及进行事件发布。

    1. 注册监听器

      // 注册listeners
      protected void registerListeners() {
        // Register statically specified listeners first.
        for (ApplicationListener<?> listener : getApplicationListeners()) {
          getApplicationEventMulticaster().addApplicationListener(listener);
        }
      
        // Do not initialize FactoryBeans here: We need to leave all regular beans
        // uninitialized to let post-processors apply to them!
        //这里的这句注释也很魔性,哈哈,侧面说明了,截至目前,beanFactory都是没有bean实例存在的,bean还没   //有实例化
        String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
        for (String listenerBeanName : listenerBeanNames) {
          getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
        }
       
      
    2. beanFactory初始化完成后,发布事件

      protected void finishRefresh() {
      
        // Publish the final event.
        // 发布上下文refresh完毕的事件,通知listener
        publishEvent(new ContextRefreshedEvent(this));
      }
      

      这里,publishEvent实现如下:

      	protected void publishEvent(Object event, ResolvableType eventType) {
      		// Decorate event as an ApplicationEvent if necessary
      		ApplicationEvent applicationEvent;
      		if (event instanceof ApplicationEvent) {
      			applicationEvent = (ApplicationEvent) event;
      		}
      
      		getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
      	}
      

    方案1:参考spring,实现自己的事件监听机制,解耦业务代码

    项目源码地址:https://gitee.com/ckl111/spring-event-publish-demo.git

    项目结构如下:

    1. 定义与实现事件发布器

      package com.ceiec.base.event;
      
      import org.springframework.context.event.ApplicationEventMulticaster;
      
      /**
       * desc:
       * 参考spring的设计
       * {@link ApplicationEventMulticaster}
       * @author : ckl
       * creat_date: 2019/11/16 0016
       * creat_time: 10:40
       **/
      public interface ICommonApplicationEventMulticaster {
          /**
           * Add a listener to be notified of all events.
           * @param listener the listener to add
           */
          void addApplicationListener(ICommonApplicationEventListener<?> listener);
      
      
          /**
           * Multicast the given application event to appropriate listeners.
           * @param event the event to multicast
           */
          void multicastEvent(CommonApplicationEvent event);
      
      
      }
      
      
      package com.ceiec.base.event;
      
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.context.event.SimpleApplicationEventMulticaster;
      import org.springframework.stereotype.Component;
      
      import java.util.LinkedHashSet;
      import java.util.Set;
      
      /**
       * desc:
       * 参考spring
       * {@link SimpleApplicationEventMulticaster}
       *
       * @author : ckl
       * creat_date: 2019/11/16 0016
       * creat_time: 10:40
       **/
      @Slf4j
      @Component
      public class CommonApplicationEventMulticaster implements ICommonApplicationEventMulticaster {
          public final Set<ICommonApplicationEventListener<?>> applicationListeners = new LinkedHashSet<>();
      
          @Override
          public void addApplicationListener(ICommonApplicationEventListener<?> listener) {
              applicationListeners.add(listener);
          }
      
          @Override
          public void removeApplicationListener(ICommonApplicationEventListener<?> listener) {
              applicationListeners.remove(listener);
          }
      
          @Override
          public void removeAllListeners() {
              applicationListeners.clear();
          }
      
          @Override
          @SuppressWarnings({"rawtypes", "unchecked"})
          public void multicastEvent(CommonApplicationEvent event) {
              try {
                  for (ICommonApplicationEventListener applicationListener : applicationListeners) {
                      //判断listener是否支持处理该事件,如果支持,则丢给listener处理
                      if (applicationListener.supportsEventType(event)) {
                          applicationListener.onApplicationEvent(event);
                      }
                  }
              } catch (Exception e) {
                  log.error("{}",e);
              }
          }
      }
      
      
    2. 定义listener

      package com.ceiec.base.event;
      
      import org.springframework.context.ApplicationListener;
      
      import java.util.EventListener;
      
      /**
       * desc:
       * 参考spring
       * {@link ApplicationListener}
       * @author : ckl
       * creat_date: 2019/11/16 0016
       * creat_time: 10:45
       **/
      public interface ICommonApplicationEventListener<E extends CommonApplicationEvent> extends EventListener {
      
      
          boolean supportsEventType(E event );
      
      
          /**
           * Handle an application event.
           * @param event the event to respond to
           */
          void onApplicationEvent(E event);
      }
      
      
    3. 定义事件类

      package com.ceiec.base.event;
      
      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.experimental.Accessors;
      
      import java.util.EventObject;
      
      /**
       * desc:
       * 参考spring的设计
       * {@link org.springframework.context.ApplicationEvent}
       **/
      @Data
      @AllArgsConstructor
      @Accessors(chain = true)
      public  class  CommonApplicationEvent<T>{
          /**
           * 事件类型
           */
          private  IEventType iEventType;
      
          /**
           * 事件携带的数据
           */
          private T data;
      
      }
      
      
      
    4. listener的样例实现,下面的实现,用于警情的跟踪日志记录

      package com.ceiec.base.listener;
      
      import com.ceiec.base.applicationevent.SystemEventType;
      import com.ceiec.base.event.CommonApplicationEvent;
      import com.ceiec.base.event.ICommonApplicationEventListener;
      import com.ceiec.base.eventmsg.*;
      import com.ceiec.base.service.IIncidentTraceService;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      
      /**
       * desc:
       * 接警统计listener
       * @author : ckl
       * creat_date: 2019/11/16 0016
       * creat_time: 9:56
       **/
      @Component
      @Slf4j
      public class IncidentTraceListener implements ICommonApplicationEventListener{
          @Autowired
          private IIncidentTraceService iIncidentTraceService;
      
          @Override
          public boolean supportsEventType(CommonApplicationEvent event) {
              return true;
          }
      
          @Override
          public void onApplicationEvent(CommonApplicationEvent event) {
              log.info("{}",event);
              Object data = event.getData();
              if (event.getIEventType() == SystemEventType.FINISH_INCIDENT_APPEAL) {
                  FinishIncidentDisposalEventMsg msg = (FinishIncidentDisposalEventMsg) data;
                  iIncidentTraceService.finishIncidentDisposal(msg);
              }
          }
      }
      
      
    5. 启动程序时,注册listener到事件发布器

      package com.ceiec.base.init;
      
      import com.ceiec.base.event.CommonApplicationEventMulticaster;
      import com.ceiec.base.event.ICommonApplicationEventListener;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.beans.BeansException;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.CommandLineRunner;
      import org.springframework.context.ApplicationContext;
      import org.springframework.context.ApplicationContextAware;
      import org.springframework.stereotype.Component;
      
      import java.util.Collection;
      import java.util.Map;
      
      /**
       * desc:
       *
       * @author : ckl
       * creat_date: 2019/11/11 0011
       * creat_time: 15:46
       **/
      @Component
      @Slf4j
      public class InitRunner implements CommandLineRunner,ApplicationContextAware {
      
      
          private ApplicationContext applicationContext;
      
          @Autowired
          private CommonApplicationEventMulticaster commonApplicationEventMulticaster;
      
          @Override
          public void run(String... args) throws Exception {
              Map<String, ICommonApplicationEventListener> map = applicationContext.getBeansOfType(ICommonApplicationEventListener.class);
              Collection<ICommonApplicationEventListener> listeners = map.values();
              for (ICommonApplicationEventListener listener : listeners) {
                  /**
                   * 注册事件listener到事件发布器
                   */
                  log.info("register listener:{}",listener);
                  commonApplicationEventMulticaster.addApplicationListener(listener);
              }
      
          }
      
          @Override
          public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
              this.applicationContext = applicationContext;
          }
      }
      
      
    6. 定义endpoint,在service中进行事件发布

      controller:

      @Autowired
      private IIncidentService iIncidentService;
      
      @RequestMapping("/test.do")
      public String finishIncident() {
        iIncidentService.finishIncident();
        return "success";
      }
      

      service:

      @Slf4j
      @Service
      public class IIncidentServiceImpl implements IIncidentService {
      
          @Autowired
          private CommonApplicationEventMulticaster commonApplicationEventMulticaster;
      
      
          @Override
          public void finishIncident() {
              FinishIncidentDisposalEventMsg msg = new FinishIncidentDisposalEventMsg();
              msg.setIncidentInformationId(1111L);
              msg.setDesc("处置完成");
      
              CommonApplicationEvent event = new CommonApplicationEvent(SystemEventType.FINISH_INCIDENT_APPEAL,msg);
              commonApplicationEventMulticaster.multicastEvent(event);
          }
      }
      
    7. 效果展示

      启动时,注册listener:

      2019-12-03 16:49:47.477  INFO 493432 --- [           main] com.ceiec.base.BootStrap                 : Started BootStrap in 1.436 seconds (JVM running for 2.22)
      2019-12-03 16:49:47.478  INFO 493432 --- [           main] com.ceiec.base.init.InitRunner           : register listener:com.ceiec.base.listener.IncidentStatisticsListener@c6b2dd9
      2019-12-03 16:49:47.479  INFO 493432 --- [           main] com.ceiec.base.init.InitRunner           : register listener:com.ceiec.base.listener.IncidentTraceListener@3f985a86
      2019-12-03 16:49:47.479  INFO 493432 --- [           main] com.ceiec.base.init.InitRunner           : register listener:com.ceiec.base.listener.MqListener@57a2ed35
      

      浏览器中,请求http://localhost:8081/test.do,日志如下:

    方案2:直接使用spring内置的事件发布器,解耦业务代码

    源码:https://gitee.com/ckl111/spring-event-publish-demo/tree/master/spring-event-use-builtin-multicaster

    这部分,比上面的方案相比,少了很多东西,只包含如下部分:

    总的来说,listener直接继承spring的ApplicationListener,事件发布器直接使用spring的org.springframework.context.ApplicationEventPublisher

    核心代码:

    package com.ceiec.base.service.impl;
    
    import com.ceiec.base.applicationevent.SystemEventType;
    import com.ceiec.base.event.CommonApplicationEvent;
    import com.ceiec.base.eventmsg.FinishIncidentDisposalEventMsg;
    import com.ceiec.base.service.IIncidentService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationEvent;
    import org.springframework.context.ApplicationEventPublisher;
    import org.springframework.stereotype.Service;
    
    /**
     * desc:
     * 发布事件的业务代码示例
     * @author : ckl
     * creat_date: 2019/12/2 0002
     * creat_time: 14:27
     **/
    @Slf4j
    @Service
    public class IIncidentServiceImpl implements IIncidentService {
    
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;
    
    
        @Override
        public void finishIncident() {
            FinishIncidentDisposalEventMsg msg = new FinishIncidentDisposalEventMsg();
            msg.setIncidentInformationId(1111L);
            msg.setDesc("处置完成");
    
            CommonApplicationEvent event = new CommonApplicationEvent(SystemEventType.FINISH_INCIDENT_APPEAL,msg);
            applicationEventPublisher.publishEvent(event);
        }
    }
    
    
    package com.ceiec.base.listener;
    
    import com.ceiec.base.applicationevent.SystemEventType;
    import com.ceiec.base.event.CommonApplicationEvent;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.ApplicationListener;
    import org.springframework.stereotype.Component;
    
    /**
     * desc:
     * 这里,直接继承 spring 的listener
     * @author : ckl
     * creat_date: 2019/11/16 0016
     * creat_time: 9:56
     **/
    @Component
    @Slf4j
    public class IncidentStatisticsListener implements ApplicationListener<CommonApplicationEvent> {
    
    
    
        @Override
        public void onApplicationEvent(CommonApplicationEvent event) {
            log.info("receive event:{}",event);
        }
    }
    
    

    总结

    以上两种都可以用,一个是自己仿的,定制性强一点;一个直接用spring的。大家自由选择即可。

    通过这样的方式,我们的业务代码,可以做到解耦,大体和mq其实是类似的。

  • 相关阅读:
    chrome——关于chrome浏览器的奇葩问题
    vscode——配置终端集成bash和cmd
    AndroidStudio——Android SDK
    Navicat——如何导出所有的查询数据
    mpvue——实现点击数组内的某一元素进行置顶(排序第一)操作
    TP5.x——开启跨域访问
    TP5.x——聊天列表查询
    MarkDowm——语法篇
    写一个整数四则运算的解析器——语法分析部分
    写一个整数四则运算的解析器——词法分析部分
  • 原文地址:https://www.cnblogs.com/grey-wolf/p/11978552.html
Copyright © 2011-2022 走看看