zoukankan      html  css  js  c++  java
  • springboot 集成 RabbitMQ

    springboot 集成 RabbitMQ

    一、简单使用

    添加依赖

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    添加配置

    ## RabbitMQ 配置
    spring.application.name=spirng-boot-rabbitmq
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    

    PS:Windows搭建RabbitMQ环境

    添加队列配置

    @Configuration
    public class RabbitConfig {
    
    	@Bean
    	public Queue helloQueue() {
        	return new Queue("hello");
    	}
    }
    

    发送者(生产者)

    @Component
    public class HelloSender {
    
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    
    	public void send() {
        	String context = "hello " + new Date();
        	this.rabbitTemplate.convertAndSend("hello", context);
    	}
    }
    

    接收者(消费者)

    @Component
    @RabbitListener(queues = "hello")
    public class HelloReceiver {
    
    	@RabbitHandler
    	public void process(String hello) {
        	System.out.println("Receiver  : " + hello);
    	}
    }
    

    测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest {
    
    	@Autowired
    	private HelloSender helloSender;
    
    	@Test
    	public void hello() throws Exception {
        	helloSender.send();
    	}
    }
    

    二、RabbitMQ介绍

    用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

    • 消息(message)

      消息。消息头可包括routing-key属性。

    • 交换机(Exchange)

      用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

    • 绑定(Binding)

      绑定交换机和队列。

    • 队列(Queue)

      消息队列。用来存储消息,等待消费者连接,将消息取走。

    • 生产者(Publisher)

      消息发送者。

    • 消费者(Consumer)

      消息接收者。

    Exchange 类型

    • direct exchange(默认实现方式)

      根据key全文匹配队列。

    • topic exchange

      可根据routing-key自由绑定不同的队列。

    • fanout exchange

      向所有绑定该交换机的队列发送消息。

    三、测试使用不同Exchange

    添加配置

    @Configuration
    public class RabbitConfig {
    
        //创建队列
        @Bean
        public Queue queueRed() {
            return new Queue("queue.color.red");
        }
    
        @Bean
        public Queue queueYellow() {
            return new Queue("queue.color.yellow");
        }
    
        @Bean
        public Queue queueBlue() {
            return new Queue("queue.color.blue");
        }
    
        @Bean
        public Queue queueGreen() {
            return new Queue("queue.color.green");
        }
    
        // 创建交换机
        // fanout exchange
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
        // topic exchange
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange("topicExchange");
        }
    
        // 创建绑定。配置交换机与队列的绑定关系。
        // fanout exchange
    	// 在交换机接收消息后,会向与自己绑定的所有队列发送消息。
        @Bean
        Binding bindingFanoutExchange1(Queue queueRed, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueRed).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingFanoutExchange2(Queue queueYellow, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueYellow).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingFanoutExchange3(Queue queueBlue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueBlue).to(fanoutExchange);
        }
    
        // topic exchange
    	// 除将队列与交换机绑定外,并规定绑定规则(binding-key)。在交换机接收消息后,会根据消息的路由键(routing-key)依照绑定规则(binding-key)向与自己绑定的队列发送消息。
        @Bean
        Binding bindingTopicExchange1(Queue queueRed, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueRed).to(topicExchange).with("queue.color.#");
        }
    
        @Bean
        Binding bindingTopicExchange2(Queue queueYellow, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueYellow).to(topicExchange).with("queue.color.#");
        }
    
        @Bean
        Binding bindingTopicExchange3(Queue queueBlue, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueBlue).to(topicExchange).with("queue.color.#");
        }
    
    }
    

    通配符:

    • # 表示0个或多个词
    • * 表示1个词

    添加消息发送者(生产者)

    @Component
    public class RabbitSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void defaultSender() {
            String context = "使用默认交换机(direct)发送消息!routing-key='queue.color.red'。";
            this.rabbitTemplate.convertAndSend("queue.color.red", context);
        }
    
        public void fanoutSender() {
            String context = "使用fanout交换机发送消息!";
            this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
        }
    
        public void topicSender() {
            String context = "使用topic交换机发送消息!routing-key='queue.color.green'";
            this.rabbitTemplate.convertAndSend("topicExchange", "queue.color.green", context);
        }
    }
    

    添加消息接收者(消费者)

    @Component
    public class RabbitReceiver {
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.red")
        public void processRed(String msg) {
            System.out.println("Receiver queue.color.red : " + msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.yellow")
        public void processYellow(String msg) {
            System.out.println("Receiver queue.color.yellow : " + msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.blue")
        public void processBlue(String msg) {
            System.out.println("Receiver queue.color.blue : " + msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.green")
        public void processGreen(String msg) {
            System.out.println("Receiver queue.color.green : " + msg);
        }
    }
    

    添加测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class TestRabbitMQ {
    
        @Autowired
        private RabbitSender rabbitSender;
    
        @Test
        public void directMQ() throws Exception {
            this.rabbitSender.defaultSender();
        }
    
        @Test
        public void fanoutMQ() throws Exception {
            this.rabbitSender.fanoutSender();
        }
    
        @Test
        public void topicMQ() throws Exception {
            this.rabbitSender.topicSender();
        }
    
    }
    

    PS:可访问http://127.0.0.1:15672查看队列、交换机的状态。在修改rabbitMQ配置后,并不能清除之前的绑定关系,可能会引发问题。

  • 相关阅读:
    构造函数产生的点及原因
    关于未捕获异常的处理(WPF)
    消息协定
    为outlook增加“邮件召回”功能
    MHA故障切换和在线手工切换原理
    Delphi 类型转换函数(有几个函数没见过,FloatToStrF,FloatToText等等)
    Delphi 常用属性说明(超长)
    Delphi程序自删除的几种方法
    CreateFile,ReadFile等API详解(或者说MSDN的翻译)
    去除文件属性(使用SetFileAttributes API函数)
  • 原文地址:https://www.cnblogs.com/wscy/p/9261753.html
Copyright © 2011-2022 走看看