EventBus 不是通用的消息系统,也不是用来做进程间的通信的,而是在进程内,用于解耦两段直接调用的业务逻辑;
1、代码结构
- event:eventbus中流转的事件(消息),包结构按照业务模块在细分(比如应用部署模块就是deployment);
- subscriber:消费者,和event 是一一对应的,一个event 对应一个消费者,包结构按照业务模块在细分(比如应用部署模块就是deployment);
- poster:生产者,这边把生产者单独出来是为了收敛入口,这样可以方便的知道有哪些地方在生产消息,按照业务模块分为不同的类(因为生产消息的功能比较单薄);
2、代码实现
在applicationContext.xml 中定义好EventBus
asyncEventBus
< bean id = "taskExecutor" class = "org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" lazy-init = "true" > < property name = "corePoolSize" value = "10" /> < property name = "maxPoolSize" value = "50" /> < property name = "queueCapacity" value = "10000" /> < property name = "keepAliveSeconds" value = "300" /> < property name = "rejectedExecutionHandler" > < bean class = "java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </ property > </ bean > < bean id = "asyncEventBus" class = "com.google.common.eventbus.AsyncEventBus" > < constructor-arg name = "executor" ref = "taskExecutor" /> </ bean > |
2.1、标准化subscriber
所有的subscriber都要实现 BaseSubscriber这个 interface
BaseSubscriber
public interface BaseSubscriber<E> { /** * event 处理逻辑入口 **/ void subscribe(E event); } |
所有的subscriber在类上加上EventBusRegister 这个annotation
EventBusRegister
@Target ({ElementType.TYPE}) @Retention (RetentionPolicy.RUNTIME) @Documented public @interface EventBusRegister { } |
实现EventBusAdapter用于自动注册subscriber
EventBusAdapter
@Component public class EventBusAdapter implements ApplicationContextAware, InitializingBean { @Autowired private AsyncEventBus asyncEventBus; private ApplicationContext applicationContext; @Override public void afterPropertiesSet() throws Exception { this .applicationContext.getBeansWithAnnotation(EventBusRegister. class ).forEach((name, bean) -> { asyncEventBus.register(bean); }); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this .applicationContext = applicationContext; } } |
举个例子
BuildUpdateSubscriber
@Component @EventBusRegister public class BuildUpdateSubscriber implements BaseSubscriber<BuildUpdateEvent> { @Autowired private BuildService buildService; @Subscribe @Override public void subscribe(BuildUpdateEvent event) { switch (event.getEventType()) { case BUILD_CONNECTED: List<BuildVo> buildVos = (List<BuildVo>) event.getData(); buildService.addBuildVosAndTriggerConnectEvent(buildVos); break ; case BUILD_ADD: BuildVo addedBuildVo = (BuildVo) event.getData(); buildService.addBuildVoAndTriggerClientEvent(addedBuildVo); break ; case BUILD_MODIFY: BuildVo modifiedBuildVo = (BuildVo) event.getData(); buildService.modifyBuildVoAndTriggerEvent(modifiedBuildVo); break ; case BUILD_DELETE: BuildVo deletedBuildVo = (BuildVo) event.getData(); buildService.deleteBuildVoAndTriggerClientEvent(deletedBuildVo); break ; default : // ignore break ; } } } |
3、代码实现改进
前面通过规范代码的包结构、加了一些trick使得我们可以方便的使用eventbus解耦我们的业务逻辑,但是有时候我们需要的bean被注册 的前后做一些业务逻辑,所以我们在bean 被注册到eventbus前后加了两个hook:AfterRegisterProcessor、BeforeRegisterProcessor;实现这两个interface并且实现对于的方法,会在bean 被注册前后被调用
bean 注册到eventbus前的hook
BeforeRegisterProcessor
public interface BeforeRegisterProcessor { void beforeRegister(); } |
bean 注册到eventbus后的hook
AfterRegisterProcessor
public interface AfterRegisterProcessor { void afterRegister(); } |
实现:保证在 client.watch 之前,注册已经完成,这样watch产生的消息就能够保证被成功消费
GlueService
@Service public class GlueService implements AfterRegisterProcessor { @Autowired private PodListener podListener; @Autowired private RouteListener routerListener; @Autowired private BuildListener buildListener; @Autowired private DeploymentListener deploymentListener; @Autowired private OpenShiftClient openShiftClient; @Override public void afterRegister() { IClient client = openShiftClient.getClient(); podWatch = client.watch(podListener, ResourceKind.POD); routeWatch = client.watch(routerListener, ResourceKind.ROUTE); buildWatch = client.watch(buildListener, ResourceKind.BUILD); deploymentWatch = client.watch(deploymentListener, ResourceKind.REPLICATION_CONTROLLER); } } |