zoukankan      html  css  js  c++  java
  • springboot 集成 RabbitMQ

    springboot 集成 RabbitMQ

    一、简单使用

    添加依赖

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    添加配置

    ## RabbitMQ 配置
    spring.application.name=spirng-boot-rabbitmq
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    

    PS:Windows搭建RabbitMQ环境

    添加队列配置

    @Configuration
    public class RabbitConfig {
    
    	@Bean
    	public Queue helloQueue() {
        	return new Queue("hello");
    	}
    }
    

    发送者(生产者)

    @Component
    public class HelloSender {
    
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    
    	public void send() {
        	String context = "hello " + new Date();
        	this.rabbitTemplate.convertAndSend("hello", context);
    	}
    }
    

    接收者(消费者)

    @Component
    @RabbitListener(queues = "hello")
    public class HelloReceiver {
    
    	@RabbitHandler
    	public void process(String hello) {
        	System.out.println("Receiver  : " + hello);
    	}
    }
    

    测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest {
    
    	@Autowired
    	private HelloSender helloSender;
    
    	@Test
    	public void hello() throws Exception {
        	helloSender.send();
    	}
    }
    

    二、RabbitMQ介绍

    用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

    • 消息(message)

      消息。消息头可包括routing-key属性。

    • 交换机(Exchange)

      用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

    • 绑定(Binding)

      绑定交换机和队列。

    • 队列(Queue)

      消息队列。用来存储消息,等待消费者连接,将消息取走。

    • 生产者(Publisher)

      消息发送者。

    • 消费者(Consumer)

      消息接收者。

    Exchange 类型

    • direct exchange(默认实现方式)

      根据key全文匹配队列。

    • topic exchange

      可根据routing-key自由绑定不同的队列。

    • fanout exchange

      向所有绑定该交换机的队列发送消息。

    三、测试使用不同Exchange

    添加配置

    @Configuration
    public class RabbitConfig {
    
        //创建队列
        @Bean
        public Queue queueRed() {
            return new Queue("queue.color.red");
        }
    
        @Bean
        public Queue queueYellow() {
            return new Queue("queue.color.yellow");
        }
    
        @Bean
        public Queue queueBlue() {
            return new Queue("queue.color.blue");
        }
    
        @Bean
        public Queue queueGreen() {
            return new Queue("queue.color.green");
        }
    
        // 创建交换机
        // fanout exchange
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
        // topic exchange
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange("topicExchange");
        }
    
        // 创建绑定。配置交换机与队列的绑定关系。
        // fanout exchange
    	// 在交换机接收消息后,会向与自己绑定的所有队列发送消息。
        @Bean
        Binding bindingFanoutExchange1(Queue queueRed, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueRed).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingFanoutExchange2(Queue queueYellow, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueYellow).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingFanoutExchange3(Queue queueBlue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueBlue).to(fanoutExchange);
        }
    
        // topic exchange
    	// 除将队列与交换机绑定外,并规定绑定规则(binding-key)。在交换机接收消息后,会根据消息的路由键(routing-key)依照绑定规则(binding-key)向与自己绑定的队列发送消息。
        @Bean
        Binding bindingTopicExchange1(Queue queueRed, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueRed).to(topicExchange).with("queue.color.#");
        }
    
        @Bean
        Binding bindingTopicExchange2(Queue queueYellow, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueYellow).to(topicExchange).with("queue.color.#");
        }
    
        @Bean
        Binding bindingTopicExchange3(Queue queueBlue, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueBlue).to(topicExchange).with("queue.color.#");
        }
    
    }
    

    通配符:

    • # 表示0个或多个词
    • * 表示1个词

    添加消息发送者(生产者)

    @Component
    public class RabbitSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void defaultSender() {
            String context = "使用默认交换机(direct)发送消息!routing-key='queue.color.red'。";
            this.rabbitTemplate.convertAndSend("queue.color.red", context);
        }
    
        public void fanoutSender() {
            String context = "使用fanout交换机发送消息!";
            this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
        }
    
        public void topicSender() {
            String context = "使用topic交换机发送消息!routing-key='queue.color.green'";
            this.rabbitTemplate.convertAndSend("topicExchange", "queue.color.green", context);
        }
    }
    

    添加消息接收者(消费者)

    @Component
    public class RabbitReceiver {
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.red")
        public void processRed(String msg) {
            System.out.println("Receiver queue.color.red : " + msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.yellow")
        public void processYellow(String msg) {
            System.out.println("Receiver queue.color.yellow : " + msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.blue")
        public void processBlue(String msg) {
            System.out.println("Receiver queue.color.blue : " + msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.green")
        public void processGreen(String msg) {
            System.out.println("Receiver queue.color.green : " + msg);
        }
    }
    

    添加测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class TestRabbitMQ {
    
        @Autowired
        private RabbitSender rabbitSender;
    
        @Test
        public void directMQ() throws Exception {
            this.rabbitSender.defaultSender();
        }
    
        @Test
        public void fanoutMQ() throws Exception {
            this.rabbitSender.fanoutSender();
        }
    
        @Test
        public void topicMQ() throws Exception {
            this.rabbitSender.topicSender();
        }
    
    }
    

    PS:可访问http://127.0.0.1:15672查看队列、交换机的状态。在修改rabbitMQ配置后,并不能清除之前的绑定关系,可能会引发问题。

  • 相关阅读:
    《玩转.NET Micro Framework 移植基于STM32F10x处理器》内容介绍
    《玩转.NET Micro Framework 移植基于STM32F10x处理器》前言
    《玩转.NET Micro Framework 移植基于STM32F10x处理器》内容介绍
    《玩转.NET Micro Framework 移植基于STM32F10x处理器》微软中国.NET Micro Framework项目组工程师所作之序
    《玩转.NET Micro Framework 移植基于STM32F10x处理器》资源汇总
    《玩转.NET Micro Framework 移植基于STM32F10x处理器》微软中国.NET Micro Framework项目组工程师所作之序
    《玩转.NET Micro Framework 移植基于STM32F10x处理器》前言
    Windows、Linux、ARM、Android、iOS全平台支持的RTMP推流组件libEasyRTMP库接口调用说明
    简单高效易用Windows/Linux/ARM/Android/iOS平台实现RTMP推送组件EasyRTMPAndroid MediaCodec硬编码流程介绍
    RTSP网络监控摄像头如何实现Windows、Linux、ARM、Android、iOS全平台支持的拉RTSP流推出RTMP直播流?
  • 原文地址:https://www.cnblogs.com/wscy/p/9261753.html
Copyright © 2011-2022 走看看