配置中心通过Bus消息总线刷新配置的详细流程
整体流程:
访问端点->容器发布事件->容器监听到事件->通过channel通道发送给bus->bus通过topic发布给各应用input通道->各应用监听到input通道消息->针对消息事件分类处理后发布事件到容器->容器中针对各事件的监听器监听到后进行相应处理
配置中心和其他服务都引入了bus,即在bus面前他们都是平等的,都是总线上的一个节点,都可以向bus发消息,也都可以从bus接收消息
端点的原理:服务引入bus后,会生成两个控制端点:RefreshBusEndPoint、EnvironmentBusEndPoint,这些端点有ApplicationEventPublisher属性,从而具备事件发布能力,如RefreshBusEndPoint
1 public class RefreshBusEndpoint extends AbstractBusEndpoint { 2 @RequestMapping(value = "refresh", method = RequestMethod.POST) 3 @ResponseBody 4 public void refresh( 5 @RequestParam(value = "destination", required = false) String destination) { 6 publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination)); 7 } 8 }
在消息总线的某一个节点上访问/bus/refresh就会发布一个RefreshRemoteApplicationEvent事件(继承RemoteApplicationEvent)到容器
容器中针对该事件的监听器是在bus的自动加载配置类BusAutoConfiguration中指定的,容器启动时会自动生成这个监听器来监听RemoteApplicationEvent事件(注意:这里只会监听自身容器发布的事件,比如通过监听输入通道发送过来的事件时,这个监听器是无效的,)
1 @EventListener(classes = RemoteApplicationEvent.class) 2 public void acceptLocal(RemoteApplicationEvent event) { 3 if (this.serviceMatcher.isFromSelf(event) 4 && !(event instanceof AckRemoteApplicationEvent)) { 5 this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); 6 } 7 }
1 cloudBusOutboundChannel是MessageChannel类的实例 2 Stream中有一个绑定器的概念,其他应用通过输入和输出通道与不同消息中间件的绑定器进行交互,这个绑定器负责与消息中间件的交互,比如kafka和rabbitMQ的绑定器实现是不一样的,但是应用不需要去关心这些,应用只需要向绑定器的通道里写和读就行了,如果中间件从kafka换成了RabbitMq,只需要更换绑定器就行了,服务仍然至于绑定器的通道打交道 3 MessageChannel类是应用通过通道向绑定器发送消息,MessageChannel.send(); 4 SubscribableChannel类是接受绑定器通过输出通道发送给应用的消息,SubscribableChannel.subscribe();
该节点的容器通过MessageChannel发送消息给bus的绑定器
Stream中有topic的概念,发送的消息中是一个RemoteApplicationEvent 事件,这个事件有需要接收通知的服务名
Stream中通过@StreamListener监听通道中有消息发送过来,它会将这个事件通过topic发送给所有监听这个topic的应用的输入通道
应用的bus自动加载配置类BusAutoConfiguration中指定了一个监听输入通道的监听器,这个监听器的工作就是:接收消息-》按消息事件分类-》发布不同事件到容器
1 @StreamListener(SpringCloudBusClient.INPUT) 2 public void acceptRemote(RemoteApplicationEvent event) { 3 if (event instanceof AckRemoteApplicationEvent) { 4 if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event) 5 && this.applicationEventPublisher != null) { 6 this.applicationEventPublisher.publishEvent(event); 7 } 8 // If it's an ACK we are finished processing at this point 9 return; 10 } 11 if (this.serviceMatcher.isForSelf(event) 12 && this.applicationEventPublisher != null) { 13 if (!this.serviceMatcher.isFromSelf(event)) { 14 this.applicationEventPublisher.publishEvent(event); 15 } 16 if (this.bus.getAck().isEnabled()) { 17 AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, 18 this.serviceMatcher.getServiceId(), 19 this.bus.getAck().getDestinationService(), 20 event.getDestinationService(), event.getId(), event.getClass()); 21 this.cloudBusOutboundChannel 22 .send(MessageBuilder.withPayload(ack).build()); 23 this.applicationEventPublisher.publishEvent(ack); 24 } 25 } 26 if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { 27 // We are set to register sent events so publish it for local consumption, 28 // irrespective of the origin 29 this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this, 30 event.getOriginService(), event.getDestinationService(), 31 event.getId(), event.getClass())); 32 } 33 }
容器中针对这各种事件又有各自监听器在监听,如RefreshListener监听RefreshRemoteApplicationEvent,EnvironmentChangeListener监听EnvironmentChangeRemoteApplicationEvent
当监听器监听到之后进对具体事件具体处理