zoukankan      html  css  js  c++  java
  • Spring Boot整合RabbitMQ

    Spring Boot整合RabbitMQ

    写在开头

    最近在搭一套基于SpringBoot的项目,用到了ssm+mysql+rabbitmq+redis。除了rabbitmq之外,其他几个都很快整合好了,唯独rabbitmq找了不少资料,才最终整合好,达到了预期。特此将过程记录下来,供参考。

    整合流程

    整合流程中的代码都为整合的关键配置及其使用。至于SpringBoot的基本配置,请参考Spring Boot Quick Start

    配置文件

    • pom.xml
    <!-- rabbit-mq -->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • application.yml
    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /
        listener:
          simple:
            acknowledge-mode: manual # 手动应答
            concurrency: 5 # 消费端最小并发数
            max-concurrency: 10 # 消费端最大并发数
            prefetch: 5 # 一次请求中预处理的消息数量
        cache:
          channel:
            size: 50 # 缓存的channel数量
    ### 自定义配置
    mq:
      defaultExchange: amqpExchange # 默认交换器
      queue: queue # 队列名
      routeKey: queue_key # 路由key
    
    • MQProperties
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    @Component
    @ConfigurationProperties(prefix = "mq")
    public class MQProperties {
        private String defaultExchange;
        private String routeKey;
        private String queue;
    
        public String getDefaultExchange() {
            return defaultExchange;
        }
    
        public void setDefaultExchange(String defaultExchange) {
            this.defaultExchange = defaultExchange;
        }
    
        public String getRouteKey() {
            return routeKey;
        }
    
        public void setRouteKey(String routeKey) {
            this.routeKey = routeKey;
        }
    
        public String getQueue() {
            return queue;
        }
    
        public void setQueue(String queue) {
            this.queue = queue;
        }
    }
    

    RabbitMQ配置

    import com.switchvov.rabbitmq.constant.MQProperties;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @EnableRabbit
    public class RabbitMQConfig {
        @Autowired
        private MQProperties mqProperties;
    
        @Bean
        public Queue queue() {
            boolean durable = true;
            boolean exclusive = false;
            boolean autoDelete = false;
            return new Queue(mqProperties.getQueue(), durable, exclusive, autoDelete);
        }
    
        @Bean
        public DirectExchange defaultExchange() {
            boolean durable = true;
            boolean autoDelete = false;
            return new DirectExchange(mqProperties.getDefaultExchange(), durable, autoDelete);
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue())
                    .to(defaultExchange())
                    .with(mqProperties.getRouteKey());
        }
    }
    

    RabbitMQ生产者

    import com.switchvov.rabbitmq.constant.MQProperties;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMQTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        private MQProperties mqProperties;
    
        @Test
        public void testSendMessage() {
            rabbitTemplate.convertAndSend(mqProperties.getDefaultExchange(), 
    	        mqProperties.getRouteKey(), "发送了一条信息");
        }
    }
    

    RabbitMQ消费者

    import com.switchvov.rabbitmq.common.RabbitMQUtils;
    import com.rabbitmq.client.Channel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Service;
    
    @Service
    public class RabbitMQService {
        private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQService.class);
    
        @RabbitListener(queues = "${mq.queue}")
        public void receive(String payload, Channel channel,
                            @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            LOGGER.info("消费内容为:{}", payload);
            RabbitMQUtils.askMessage(channel, tag, LOGGER);
        }
    }
    

    手动应答简单工具类

    import com.rabbitmq.client.Channel;
    import org.slf4j.Logger;
    
    import java.io.IOException;
    
    public final class RabbitMQUtils {
    
        public static void askMessage(Channel channel, long tag, final Logger logger) {
            askMessage(channel, tag, logger, false);
        }
    
        public static void askMessage(Channel channel, long tag, final Logger logger, boolean multiple) {
            try {
                channel.basicAck(tag, multiple);
            } catch (IOException e) {
                logger.error("RabbitMQ,IO异常,异常原因为:{}", e.getMessage());
            }
        }
    
        public static void rejectMessage(Channel channel, long tag, final Logger logger) {
            rejectMessage(channel, tag, logger, false, false);
        }
    
        public static void rejectAndBackMQ(Channel channel, long tag, final Logger logger) {
            rejectMessage(channel, tag, logger, false, true);
        }
    
        public static void rejectMessage(Channel channel, long tag, final Logger logger, boolean multiple, boolean request) {
            try {
                channel.basicNack(tag, multiple, request);
            } catch (IOException e) {
                logger.error("RabbitMQ,IO异常,异常原因为:{}", e.getMessage());
            }
        }
    }
    

    参考文档

    1. RabbitMQ消息队列(一): Detailed Introduction 详细介绍
      • 简单的了解下RabbitMQ的系统架构和各种术语
    2. Spring Boot中使用RabbitMQ
      • 简单的Spring Boot和RabbitMQ整合资料
    3. Spring boot集成RabbitMQ
      • 弄清楚了自定义配置queueexchangebinding
    4. Spring RabbitMQ - using manual channel acknowledgement on a service with @RabbitListener configuration
      • 接收者部分基本上就是按照这上面的回答实现的
    5. spring boot / cloud (十九) 并发消费消息,如何保证入库的数据是最新的?
      • 并发消费配置

    分享并记录所学所见

  • 相关阅读:
    染色法判定二分图
    Kruskal算法求最小生成树
    Prim算法求最小生成树
    Floyd算法求多源最短路
    spfa判断负环
    java 线程的使用
    java IO基础
    数据库 EXISTS与NOT EXISTS
    数据库 何为相关查询和不相关查询?
    数据库的基础知识
  • 原文地址:https://www.cnblogs.com/switchvov/p/15070110.html
Copyright © 2011-2022 走看看