zoukankan      html  css  js  c++  java
  • Guava EventBus集成spring

    EventBus 不是通用的消息系统,也不是用来做进程间的通信的,而是在进程内,用于解耦两段直接调用的业务逻辑;

    1、代码结构

    • event:eventbus中流转的事件(消息),包结构按照业务模块在细分(比如应用部署模块就是deployment);
    • subscriber:消费者,和event 是一一对应的,一个event 对应一个消费者,包结构按照业务模块在细分(比如应用部署模块就是deployment);
    • poster:生产者,这边把生产者单独出来是为了收敛入口,这样可以方便的知道有哪些地方在生产消息,按照业务模块分为不同的类(因为生产消息的功能比较单薄);

    2、代码实现

    在applicationContext.xml 中定义好EventBus

    asyncEventBus
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" lazy-init="true">
        <property name="corePoolSize" value="10"/>
        <property name="maxPoolSize" value="50"/>
        <property name="queueCapacity" value="10000"/>
        <property name="keepAliveSeconds" value="300"/>
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <bean id="asyncEventBus" class="com.google.common.eventbus.AsyncEventBus">
        <constructor-arg name="executor" ref="taskExecutor"/>
    </bean>

    2.1、标准化subscriber

    所有的subscriber都要实现 BaseSubscriber这个 interface

    BaseSubscriber
    public interface BaseSubscriber<E> {
     
        /**
         * event 处理逻辑入口
         **/
        void subscribe(E event);
    }

    所有的subscriber在类上加上EventBusRegister 这个annotation

    EventBusRegister
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface EventBusRegister {
    }

    实现EventBusAdapter用于自动注册subscriber

    EventBusAdapter
    @Component
    public class EventBusAdapter implements ApplicationContextAware, InitializingBean {
        @Autowired
        private AsyncEventBus asyncEventBus;
     
        private ApplicationContext applicationContext;
     
        @Override
        public void afterPropertiesSet() throws Exception {
            this.applicationContext.getBeansWithAnnotation(EventBusRegister.class).forEach((name, bean) -> {
                asyncEventBus.register(bean);
            });
        }
     
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }

    举个例子

    BuildUpdateSubscriber
    @Component
    @EventBusRegister
    public class BuildUpdateSubscriber implements BaseSubscriber<BuildUpdateEvent> {
        @Autowired
        private BuildService buildService;
     
        @Subscribe
        @Override
        public void subscribe(BuildUpdateEvent event) {
            switch (event.getEventType()) {
                case BUILD_CONNECTED:
                    List<BuildVo> buildVos = (List<BuildVo>) event.getData();
                    buildService.addBuildVosAndTriggerConnectEvent(buildVos);
                    break;
                case BUILD_ADD:
                    BuildVo addedBuildVo = (BuildVo) event.getData();
                    buildService.addBuildVoAndTriggerClientEvent(addedBuildVo);
                    break;
                case BUILD_MODIFY:
                    BuildVo modifiedBuildVo = (BuildVo) event.getData();
                    buildService.modifyBuildVoAndTriggerEvent(modifiedBuildVo);
                    break;
                case BUILD_DELETE:
                    BuildVo deletedBuildVo = (BuildVo) event.getData();
                    buildService.deleteBuildVoAndTriggerClientEvent(deletedBuildVo);
                    break;
                default:
                    // ignore
                    break;
            }
        }
    }

    3、代码实现改进

    前面通过规范代码的包结构、加了一些trick使得我们可以方便的使用eventbus解耦我们的业务逻辑,但是有时候我们需要的bean被注册 的前后做一些业务逻辑,所以我们在bean 被注册到eventbus前后加了两个hook:AfterRegisterProcessor、BeforeRegisterProcessor;实现这两个interface并且实现对于的方法,会在bean 被注册前后被调用

    bean 注册到eventbus前的hook

    BeforeRegisterProcessor
    public interface BeforeRegisterProcessor {
        void beforeRegister();
    }

    bean 注册到eventbus后的hook

    AfterRegisterProcessor
    public interface AfterRegisterProcessor {
        void afterRegister();
    }

    实现:保证在 client.watch 之前,注册已经完成,这样watch产生的消息就能够保证被成功消费

    GlueService
    @Service
    public class GlueService implements AfterRegisterProcessor {
        @Autowired
        private PodListener podListener;
     
        @Autowired
        private RouteListener routerListener;
     
        @Autowired
        private BuildListener buildListener;
     
        @Autowired
        private DeploymentListener deploymentListener;
     
        @Autowired
        private OpenShiftClient openShiftClient;
     
        @Override
        public void afterRegister() {
            IClient client = openShiftClient.getClient();
            podWatch = client.watch(podListener, ResourceKind.POD);
            routeWatch = client.watch(routerListener, ResourceKind.ROUTE);
            buildWatch = client.watch(buildListener, ResourceKind.BUILD);
            deploymentWatch = client.watch(deploymentListener, ResourceKind.REPLICATION_CONTROLLER);
        }
    }
  • 相关阅读:
    不会写研发部门OKR?来这里看看吧
    HR 必须了解的绩效考核
    成功的OKR复盘会需要注意什么
    effective解读-第一条 静态工厂创建对象代替构造器
    灾难恢复:邮箱数据库操作总结:整理 查询邮箱数据库大小和空白数据大小(重要文档)
    Exchange传输队列queue数据库mail.que文件越来越大(重要文档)
    Exchange2016邮件流(重要文档)
    Exchange2016服务器使用到的9个端口
    VMware EXSI 启用vMotion
    Exchange2016DAG的配置
  • 原文地址:https://www.cnblogs.com/clovejava/p/9114895.html
Copyright © 2011-2022 走看看