zoukankan      html  css  js  c++  java
  • Spring Boot整合RabbitMQ

    1.在pom.xml中引入依赖

    <!-- Spring Boot starter that brings in AMQP (RabbitMQ) support.
         No <version> is declared here; presumably it is managed by the Spring Boot
         parent POM / BOM of the project (confirm against the actual build). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2.application.yml配置

    # RabbitMQ connection and listener settings.
    # host, port, username and password are read by RabbitConfig through @Value placeholders.
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: admin
        password: admin
        # NOTE(review): RabbitConfig builds its own ConnectionFactory and hard-codes
        # publisher confirms there, so these two flags may not be what takes effect - confirm.
        publisher-confirms: true
        publisher-returns: true
        template:
          mandatory: true
        # NOTE(review): on Spring Boot 2.x the container settings below are expected under
        # listener.simple.* - confirm against the Boot version in use.
        listener:
          # minimum number of message listener threads
          concurrency: 2
          # maximum number of message listener threads
          max-concurrency: 2

    3.创建MQ配置类

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;
    
    /**
     * RabbitMQ wiring for the application: the connection factory, the template used for
     * publishing, and one direct exchange bound to one durable queue.
     */
    @Configuration
    @Slf4j
    public class RabbitConfig {

        /** Name of the direct exchange declared by {@link #defaultExchange()}. */
        public static final String EXCHANGE_A = "my-mq-exchange_A";

        /** Name of the queue declared by {@link #queueA()}. */
        public static final String QUEUE_A = "QUEUE_A";

        /** Routing key that ties {@link #QUEUE_A} to {@link #EXCHANGE_A}. */
        public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";

        @Value("${spring.rabbitmq.host}")
        private String host;

        @Value("${spring.rabbitmq.port}")
        private int port;

        @Value("${spring.rabbitmq.username}")
        private String username;

        @Value("${spring.rabbitmq.password}")
        private String password;

        /**
         * Builds the caching connection factory from the configured host, port and credentials.
         * The virtual host is fixed to "/" and publisher confirms are switched on.
         *
         * @return the connection factory bean
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
            factory.setVirtualHost("/");
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setPublisherConfirms(true);
            return factory;
        }

        /**
         * Creates a template on top of {@link #connectionFactory()}. The bean is declared with
         * prototype scope, so it is not a shared singleton.
         *
         * @return a new template instance
         */
        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public RabbitTemplate rabbitTemplate() {
            return new RabbitTemplate(connectionFactory());
        }

        /**
         * Declares the direct exchange named {@link #EXCHANGE_A}.
         *
         * @return the exchange bean
         */
        @Bean
        public DirectExchange defaultExchange() {
            return new DirectExchange(EXCHANGE_A);
        }

        /**
         * Declares queue A.
         *
         * @return the queue bean, created as durable
         */
        @Bean
        public Queue queueA() {
            final boolean durable = true;
            return new Queue(QUEUE_A, durable);
        }

        /**
         * Binds queue A to the exchange under {@link #ROUTINGKEY_A}. One exchange may be bound
         * to several queues, so a message passing through it can be dispatched to different queues.
         *
         * @return the binding bean
         */
        @Bean
        public Binding binding() {
            Queue queue = queueA();
            DirectExchange exchange = defaultExchange();
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_A);
        }
    }

    4.创建生产者

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import java.util.UUID;
    
    /**
     * Publishes text messages to {@link RabbitConfig#EXCHANGE_A} with routing key
     * {@link RabbitConfig#ROUTINGKEY_A} and logs the publisher confirm received for each one.
     */
    @Slf4j
    @Component
    public class MsgProducer implements RabbitTemplate.ConfirmCallback {

        /**
         * Template used for publishing. RabbitConfig declares the RabbitTemplate bean with
         * prototype scope and this class registers itself as the confirm callback on the
         * instance it is given, so the template is taken through the constructor.
         */
        private final RabbitTemplate rabbitTemplate;

        /**
         * Receives the template by constructor injection and registers this producer as its
         * confirm callback.
         *
         * @param rabbitTemplate template to publish with
         */
        @Autowired
        public MsgProducer(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
            rabbitTemplate.setConfirmCallback(this);
        }

        /**
         * Sends {@code content} to queue A via the exchange, tagging it with a random UUID as
         * correlation id so the confirm can be matched to the message.
         *
         * @param content message payload
         */
        public void sendMsg(String content) {
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            // Use the class logger rather than stdout so the output follows the logging configuration.
            log.info("队列的值: {}", content);
            log.info("{}", correlationId);
            // Route the message with ROUTINGKEY_A, which is bound to queue A.
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
        }

        /**
         * Publisher-confirm callback. An ack reports that the broker accepted the message; it
         * says nothing about whether a consumer has processed it, so the log text refers to
         * the broker confirmation only.
         *
         * @param correlationData correlation data passed when the message was sent
         * @param ack             {@code true} when the broker acknowledged the message
         * @param cause           failure reason supplied with a negative acknowledgement
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info(" 回调id:{}", correlationData);
            if (ack) {
                log.info("消息已被Broker确认");
            } else {
                // A nack means the publish failed; log above INFO so it is not lost.
                log.warn("消息未被Broker确认:{}", cause);
            }
        }
    }

    5.创建消费者,监听队列

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer registered as a listener on {@link RabbitConfig#QUEUE_A}.
     */
    @RabbitListener(queues = RabbitConfig.QUEUE_A)
    @Component
    public class MsgReceiver {

        /**
         * Handles a message delivered as a {@code String} by printing it to standard output.
         *
         * @param content message payload
         */
        @RabbitHandler
        public void process(String content) {
            final String line = "接收处理队列A当中的消息: " + content;
            System.out.println(line);
        }
    }

    6.配置spring.factories,设置SPI自动加载

    # Classes registered under the EnableAutoConfiguration key.
    # The value spans several lines, so every continued line must end with a backslash;
    # without it only an empty value is bound to the key.
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
        com.pig4cloud.pigx.common.data.cache.RedisTemplateConfig,\
        com.pig4cloud.pigx.common.data.cache.RedisCacheManagerConfig,\
        com.pig4cloud.pigx.common.data.cache.RedisCacheAutoConfiguration,\
        com.pig4cloud.pigx.common.data.rabbitmq.RabbitConfig,\
        com.pig4cloud.pigx.common.data.rabbitmq.MsgProducer,\
        com.pig4cloud.pigx.common.data.rabbitmq.MsgReceiver,\
        com.pig4cloud.pigx.common.data.mybatis.MybatisPlusConfig

    7.创建Junit测试类

    import cn.hutool.core.date.DateUtil;
    import com.pig4cloud.pigx.common.data.rabbitmq.MsgProducer;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Spring Boot test that publishes one message through {@link MsgProducer}.
     */
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class RabbitMqTest {

        @Autowired
        private MsgProducer producer;

        /** Prints a banner before each test. */
        @Before
        public void init() {
            System.out.println("开始测试-----------------");
        }

        /** Prints a banner after each test. */
        @After
        public void after() {
            System.out.println("测试结束-----------------");
        }

        /**
         * Sends the current date/time as the message body, then keeps the test alive for ten seconds.
         *
         * @throws Exception if the sleep is interrupted or sending fails
         */
        @Test
        public void sendTest() throws Exception {
            producer.sendMsg(DateUtil.date().toString());
            // Presumably leaves time for the asynchronous confirm callback and the listener
            // to run before the test ends - confirm.
            Thread.sleep(10_000L);
        }
    }

  • 相关阅读:
    Saltstack module acl 详解
    Saltstack python client
    Saltstack简单使用
    P5488 差分与前缀和 NTT Lucas定理 多项式
    CF613D Kingdom and its Cities 虚树 树形dp 贪心
    7.1 NOI模拟赛 凸包套凸包 floyd 计算几何
    luogu P5633 最小度限制生成树 wqs二分
    7.1 NOI模拟赛 dp floyd
    springboot和springcloud
    springboot集成mybatis
  • 原文地址:https://www.cnblogs.com/fangts/p/10723113.html
Copyright © 2011-2022 走看看