zoukankan      html  css  js  c++  java
  • Spring Cloud Bus 消息总线 RabbitMQ

    Spring Cloud Bus将分布式系统中各节点通过轻量级消息代理连接起来。 从而实现例如广播状态改变(例如配置改变)或其他的管理指令。 目前唯一的实现是使用AMQP代理作为传输对象。

    Spring Cloud Bus又被称为消息总线,负责管理和传播所有分布式系统中的消息,通过集成MQ实现广播机制,目前常用的有Kafka和RabbitMQ。利用bus的机制可以做很多的事情,其中配置中心客户端刷新就是典型的应用场景。及利用消息总线,可以实现当配置中心的refresh命令触发后,通过广播发送到每一个配置使用端。 

    Spring已经在Config包中完成了大部分的工作,我们只需要在之前的配置中心的基础上,做如下修改:

    1.给config-server和config-client增加依赖:

    <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>

    2.给config-server和config-client配置文件中增加MQ

    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=lg
    spring.rabbitmq.password=lg

    3.依次启动RabbitMQ 、 Eureka 、config-server 、config-client1、config-client2

    4.在github上修改配置文件,刷新config-client查看配置,此时未更新

    然后执行 curl -X POST http://localhost:8011/bus/refresh 后,再次查看config-client配置,此时已经更新 

    5.登录RabbitMQ 管理页面,可查看到connecting,queue等信息。

    有时候可能需要部分刷新,即只刷新指定的config-client,spring同样支持,只需要在调用/bus/refresh接口时传递destination参数即可,如:

    /bus/refresh?destination=config-client:8031 或 /bus/refresh?destination=config-client:**

    自定义消息并通过消息总线发送

    在之前几个项目的基础上,我们做如下修改:

    1.config-server类增加Sender类

    @Component
    public class Sender {
    
        private static AtomicInteger count = new AtomicInteger(1);
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String msg = "msg" + count.getAndIncrement() + " " + new Date();
            System.out.println("Sender : " + msg);
            this.rabbitTemplate.convertAndSend("hello", msg);
        }
    }

    2.config-server类增加定时任务,给主类加注解@EnableScheduling,每3秒发一次消息:

    @Component
    public class ScheduledTasks {
    
        @Autowired
        private Sender sender;
    
        @Scheduled(initiaDelay=5000, fixedRate = 3000)
        public void reportCurrentTime() {
            sender.send();
        }
    
    }

    3.分别给config-client1和config-client2增加接受类Receiver

    @Component
    @RabbitListener(queues = "hello")
    public class Receiver {
    
        @RabbitHandler
        public void process(String msg) {
            System.out.println("Receiver : " + msg);
        }
    
    }

    4.依次启动 eureka config-server config-client

    5.观察控制台输出如下:

    config-server:

    Sender : msg1 Sun Aug 06 21:56:03 CST 2017
    Sender : msg2 Sun Aug 06 21:56:06 CST 2017
    Sender : msg3 Sun Aug 06 21:56:09 CST 2017
    Sender : msg4 Sun Aug 06 21:56:12 CST 2017
    Sender : msg5 Sun Aug 06 21:56:15 CST 2017
    Sender : msg6 Sun Aug 06 21:56:18 CST 2017
    Sender : msg7 Sun Aug 06 21:56:21 CST 2017
    Sender : msg8 Sun Aug 06 21:56:24 CST 2017
    Sender : msg9 Sun Aug 06 21:56:27 CST 2017

    config-client1:

    Receiver : msg2 Sun Aug 06 21:56:06 CST 2017
    Receiver : msg4 Sun Aug 06 21:56:12 CST 2017
    Receiver : msg6 Sun Aug 06 21:56:18 CST 2017
    Receiver : msg8 Sun Aug 06 21:56:24 CST 2017

    config-client2:

    Receiver : msg1 Sun Aug 06 21:56:03 CST 2017
    Receiver : msg3 Sun Aug 06 21:56:09 CST 2017
    Receiver : msg5 Sun Aug 06 21:56:15 CST 2017
    Receiver : msg7 Sun Aug 06 21:56:21 CST 2017
    Receiver : msg9 Sun Aug 06 21:56:27 CST 2017

    可见client从单个queue中依次取消息

    参考:http://projects.spring.io/spring-cloud/spring-cloud.html 

     

    end

  • 相关阅读:
    xgboost
    GBDT 梯度提升决策树简述
    minimal pairs
    Describe your hometown
    英语短句
    英汉翻译
    英语音译词
    power的读音
    英语口语(英语词根与单词的说文解字(李平武 2008版)读书笔记)
    Jar包转成Dll的方式(带嵌套的jar也能做)
  • 原文地址:https://www.cnblogs.com/luangeng/p/7291770.html
Copyright © 2011-2022 走看看