zoukankan      html  css  js  c++  java
  • Stream、Bus

    配置中心通过Bus消息总线刷新配置的详细流程

    整体流程:

    访问端点->容器发布事件->容器监听到事件->通过channel通道发送给bus->bus通过topic发布给各应用input通道->各应用监听到input通道消息->针对消息事件分类处理后发布事件到容器->容器中针对各事件的监听器监听到后进行相应处理

    配置中心和其他服务都引入了bus,即在bus面前他们都是平等的,都是总线上的一个节点,都可以向bus发消息,也都可以从bus接收消息

    端点的原理:服务引入bus后,会生成两个控制端点:RefreshBusEndPoint、EnvironmentBusEndPoint,这些端点有ApplicationEventPublisher属性,从而具备事件发布能力,如RefreshBusEndPoint

    1 public class RefreshBusEndpoint extends AbstractBusEndpoint {
    2     @RequestMapping(value = "refresh", method = RequestMethod.POST)
    3     @ResponseBody
    4     public void refresh(
    5             @RequestParam(value = "destination", required = false) String destination) {
    6         publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
    7     }
    8 }

    在消息总线的某一个节点上访问/bus/refresh就会发布一个RefreshRemoteApplicationEvent事件(继承RemoteApplicationEvent)到容器

    容器中针对该事件的监听器是在bus的自动加载配置类BusAutoConfiguration中指定的,容器启动时会自动生成这个监听器来监听RemoteApplicationEvent事件(注意:这里只会监听自身容器发布的事件,比如通过监听输入通道发送过来的事件时,这个监听器是无效的,)

    1     @EventListener(classes = RemoteApplicationEvent.class)
    2     public void acceptLocal(RemoteApplicationEvent event) {
    3         if (this.serviceMatcher.isFromSelf(event)
    4                 && !(event instanceof AckRemoteApplicationEvent)) {
    5             this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
    6         }
    7     }
    1 cloudBusOutboundChannel是MessageChannel类的实例
    2 Stream中有一个绑定器的概念,其他应用通过输入和输出通道与不同消息中间件的绑定器进行交互,这个绑定器负责与消息中间件的交互,比如kafka和rabbitMQ的绑定器实现是不一样的,但是应用不需要去关心这些,应用只需要向绑定器的通道里写和读就行了,如果中间件从kafka换成了RabbitMq,只需要更换绑定器就行了,服务仍然至于绑定器的通道打交道
    3 MessageChannel类是应用通过通道向绑定器发送消息,MessageChannel.send();
    4 SubscribableChannel类是接受绑定器通过输出通道发送给应用的消息,SubscribableChannel.subscribe();

    该节点的容器通过MessageChannel发送消息给bus的绑定器

    Stream中有topic的概念,发送的消息中是一个RemoteApplicationEvent 事件,这个事件有需要接收通知的服务名

    Stream中通过@StreamListener监听通道中有消息发送过来,它会将这个事件通过topic发送给所有监听这个topic的应用的输入通道

    应用的bus自动加载配置类BusAutoConfiguration中指定了一个监听输入通道的监听器,这个监听器的工作就是:接收消息-》按消息事件分类-》发布不同事件到容器

     1     @StreamListener(SpringCloudBusClient.INPUT)
     2     public void acceptRemote(RemoteApplicationEvent event) {
     3         if (event instanceof AckRemoteApplicationEvent) {
     4             if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
     5                     && this.applicationEventPublisher != null) {
     6                 this.applicationEventPublisher.publishEvent(event);
     7             }
     8             // If it's an ACK we are finished processing at this point
     9             return;
    10         }
    11         if (this.serviceMatcher.isForSelf(event)
    12                 && this.applicationEventPublisher != null) {
    13             if (!this.serviceMatcher.isFromSelf(event)) {
    14                 this.applicationEventPublisher.publishEvent(event);
    15             }
    16             if (this.bus.getAck().isEnabled()) {
    17                 AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
    18                         this.serviceMatcher.getServiceId(),
    19                         this.bus.getAck().getDestinationService(),
    20                         event.getDestinationService(), event.getId(), event.getClass());
    21                 this.cloudBusOutboundChannel
    22                         .send(MessageBuilder.withPayload(ack).build());
    23                 this.applicationEventPublisher.publishEvent(ack);
    24             }
    25         }
    26         if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
    27             // We are set to register sent events so publish it for local consumption,
    28             // irrespective of the origin
    29             this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
    30                     event.getOriginService(), event.getDestinationService(),
    31                     event.getId(), event.getClass()));
    32         }
    33     }
    通道监听器

    容器中针对这各种事件又有各自监听器在监听,如RefreshListener监听RefreshRemoteApplicationEvent,EnvironmentChangeListener监听EnvironmentChangeRemoteApplicationEvent

    当监听器监听到之后进对具体事件具体处理

    
    
    
  • 相关阅读:
    [AngularJS] angular-schema-form -- 1
    [MODx] 10. Using Babel for Muti-languages support
    [AngularJS] Using AngularJS interceptors with $http
    [AngularJS] Best Practise
    [Node.js] Creating Demo APIs with json-server
    [HTML5] Input accepts only 6 number characters
    [Heroku] How to pull, push changes
    Runoob-Java-Maven:Maven 引入外部依赖
    Runoob-Java-Maven:Maven 构建 & 项目测试
    Runoob-Java-Maven:Maven 构建 Java 项目
  • 原文地址:https://www.cnblogs.com/yfzhou528/p/13545892.html
Copyright © 2011-2022 走看看