zoukankan      html  css  js  c++  java
  • springboot 集成 RabbitMQ

    springboot 集成 RabbitMQ

    一、简单使用

    添加依赖

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    添加配置

    ## RabbitMQ 配置
    spring.application.name=spirng-boot-rabbitmq
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    

    PS:Windows搭建RabbitMQ环境

    添加队列配置

    @Configuration
    public class RabbitConfig {
    
    	@Bean
    	public Queue helloQueue() {
        	return new Queue("hello");
    	}
    }
    

    发送者(生产者)

    @Component
    public class HelloSender {
    
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    
    	public void send() {
        	String context = "hello " + new Date();
        	this.rabbitTemplate.convertAndSend("hello", context);
    	}
    }
    

    接收者(消费者)

    @Component
    @RabbitListener(queues = "hello")
    public class HelloReceiver {
    
    	@RabbitHandler
    	public void process(String hello) {
        	System.out.println("Receiver  : " + hello);
    	}
    }
    

    测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest {
    
    	@Autowired
    	private HelloSender helloSender;
    
    	@Test
    	public void hello() throws Exception {
        	helloSender.send();
    	}
    }
    

    二、RabbitMQ介绍

    用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

    • 消息(message)

      消息。消息头可包括routing-key属性。

    • 交换机(Exchange)

      用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

    • 绑定(Binding)

      绑定交换机和队列。

    • 队列(Queue)

      消息队列。用来存储消息,等待消费者连接,将消息取走。

    • 生产者(Publisher)

      消息发送者。

    • 消费者(Consumer)

      消息接收者。

    Exchange 类型

    • direct exchange(默认实现方式)

      根据key全文匹配队列。

    • topic exchange

      可根据routing-key自由绑定不同的队列。

    • fanout exchange

      向所有绑定该交换机的队列发送消息。

    三、测试使用不同Exchange

    添加配置

    @Configuration
    public class RabbitConfig {
    
        //创建队列
        @Bean
        public Queue queueRed() {
            return new Queue("queue.color.red");
        }
    
        @Bean
        public Queue queueYellow() {
            return new Queue("queue.color.yellow");
        }
    
        @Bean
        public Queue queueBlue() {
            return new Queue("queue.color.blue");
        }
    
        @Bean
        public Queue queueGreen() {
            return new Queue("queue.color.green");
        }
    
        // 创建交换机
        // fanout exchange
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
        // topic exchange
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange("topicExchange");
        }
    
        // 创建绑定。配置交换机与队列的绑定关系。
        // fanout exchange
    	// 在交换机接收消息后,会向与自己绑定的所有队列发送消息。
        @Bean
        Binding bindingFanoutExchange1(Queue queueRed, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueRed).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingFanoutExchange2(Queue queueYellow, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueYellow).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingFanoutExchange3(Queue queueBlue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueBlue).to(fanoutExchange);
        }
    
        // topic exchange
    	// 除将队列与交换机绑定外,并规定绑定规则(binding-key)。在交换机接收消息后,会根据消息的路由键(routing-key)依照绑定规则(binding-key)向与自己绑定的队列发送消息。
        @Bean
        Binding bindingTopicExchange1(Queue queueRed, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueRed).to(topicExchange).with("queue.color.#");
        }
    
        @Bean
        Binding bindingTopicExchange2(Queue queueYellow, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueYellow).to(topicExchange).with("queue.color.#");
        }
    
        @Bean
        Binding bindingTopicExchange3(Queue queueBlue, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueBlue).to(topicExchange).with("queue.color.#");
        }
    
    }
    

    通配符:

    • # 表示0个或多个词
    • * 表示1个词

    添加消息发送者(生产者)

    @Component
    public class RabbitSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void defaultSender() {
            String context = "使用默认交换机(direct)发送消息!routing-key='queue.color.red'。";
            this.rabbitTemplate.convertAndSend("queue.color.red", context);
        }
    
        public void fanoutSender() {
            String context = "使用fanout交换机发送消息!";
            this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
        }
    
        public void topicSender() {
            String context = "使用topic交换机发送消息!routing-key='queue.color.green'";
            this.rabbitTemplate.convertAndSend("topicExchange", "queue.color.green", context);
        }
    }
    

    添加消息接收者(消费者)

    @Component
    public class RabbitReceiver {
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.red")
        public void processRed(String msg) {
            System.out.println("Receiver queue.color.red : " + msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.yellow")
        public void processYellow(String msg) {
            System.out.println("Receiver queue.color.yellow : " + msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.blue")
        public void processBlue(String msg) {
            System.out.println("Receiver queue.color.blue : " + msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "queue.color.green")
        public void processGreen(String msg) {
            System.out.println("Receiver queue.color.green : " + msg);
        }
    }
    

    添加测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class TestRabbitMQ {
    
        @Autowired
        private RabbitSender rabbitSender;
    
        @Test
        public void directMQ() throws Exception {
            this.rabbitSender.defaultSender();
        }
    
        @Test
        public void fanoutMQ() throws Exception {
            this.rabbitSender.fanoutSender();
        }
    
        @Test
        public void topicMQ() throws Exception {
            this.rabbitSender.topicSender();
        }
    
    }
    

    PS:可访问http://127.0.0.1:15672查看队列、交换机的状态。在修改rabbitMQ配置后,并不能清除之前的绑定关系,可能会引发问题。

  • 相关阅读:
    Apache Spark 2.2.0 中文文档
    Apache Spark 2.2.0 中文文档
    Apache Spark 2.2.0 中文文档
    Apache Spark 2.2.0 中文文档
    Apache Spark 2.2.0 中文文档
    Apache Spark RDD(Resilient Distributed Datasets)论文
    Apache Spark 2.2.0 中文文档
    Apache Spark 2.2.0 中文文档
    【机器学习实战】第10章 K-Means(K-均值)聚类算法
    [译]flexbox全揭秘
  • 原文地址:https://www.cnblogs.com/wscy/p/9261753.html
Copyright © 2011-2022 走看看