zoukankan      html  css  js  c++  java
  • # SpringBoot + Spring AMQP 整合 RabbitMQ

    SpringAMQP介绍

    • 官网:https://spring.io/projects/spring-amqp
    • Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等
    • 提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发
    • 总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter

    新建SpringBoot 2.X 项目

    创建 SpringBoot 项目这里就不赘述了,我们直接看下面步骤

    添加 spring-boot-starter-amqp 依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    spring-boot-starter-amqp 依赖底层默认支持 rabbitmq,在 springboot 项目中如果要使用 rabbitmq,只需要应为 amqp 依赖即可,使用 rabbitmq 更加抽象,更加方便。

    如下图所示:amqp 底层支持 rabbitmq 依赖

    amqp支持rabbitmq依赖

    配置 application.yml 文件

    spring:
      rabbitmq:
        host: "localhost"
        port: 5672
        username: "admin"
        password: "secret"
    

    rabbitmq 相关配置都会被 RabbitProperties 配置类读取使用,并最终由 RabbitAutoConfiguration 自动配置类 使用,以上配置用于建立 CachingConnectionFactory 连接工厂的基础配置,更多配置在后面会一 一讲解使用。

    CachingConnectionFactory配置

    创建 RabbitMQConfig配置类

    RabbitMQ 相关配置以及 Exchange、Queue、Binding关系 等创建都建议放在配置类中统一管理,并且被 Spring 管理。这样项目在启动是这些配置就会生效。

    @Configuration
    public class RabbitMQConfig {
    
        public static final String EXCHANGE_NAME = "order_exchange";
        public static final String QUEUE_NAME = "order_queue";
        
       /**
         * 使用json序列化机制,进行消息转换
         *
         * @return
         */
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
        
        /**
         * 交换机
         * @return
         */
        @Bean
        public Exchange orderExchange() {
            return new TopicExchange(EXCHANGE_NAME, true, false);
        }
    
        /**
         * 队列
         * @return
         */
        @Bean
        public Queue orderQueue() {
            return new Queue(QUEUE_NAME, true, false, false, null);
        }
    
        /**
         * 交换机和队列绑定关系
         */
        @Bean
        public Binding orderBinding(Queue queue, Exchange exchange) {
            return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "order.#", null);
        }
    }
    

    消息生产者 - 测试类

    @SpringBootTest
    class DemoApplicationTests {
    
      @Autowired
      private RabbitTemplate template;
    
      @Test
      void send() {
        template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","订单来啦");
      }
    }
    

    消息消费者

    在 SpringBoot 中消息消息非常简单,只需要将 Rabbit 相关注解添加在类和方法上就可以消费消息

    @RabbitListener 和 @RabbitHandler 注解使用

    • @RabbitListener 可以标注在方法上单独使用,也可以标注在类上,配合 @RabbitHandler 注解一起使用
    • @RabbitListener 标注在方法上时需要指明消费的队列名称,当监听到队列中有消息时,就进行处理
    • @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

    代码实现

    @Component
    @RabbitListener(queues = "order_queue")
    public class OrderMQListener {
    
        /**
         * RabbitHandler 会自动匹配 消息类型(消息自动确认)
         * @param msg
         * @param message
         * @throws IOException
         */
        @RabbitHandler
        public void releaseCouponRecord(String msg, Message message) throws IOException {
            long msgTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("msgTag="+msgTag);
            System.out.println("message="+message.toString());
            System.out.println("监听到消息:消息内容:"+message.getBody());
        }
    }
    

    参考资料

    小D课堂 - 玩转新版高性能RabbitMQ容器化分布式集群实战

  • 相关阅读:
    计算公司下班时间(娱乐)
    设计模式-观察者模式
    vue 打包后白屏的问题
    HashSet实现原理
    LinkedList实现原理
    ArrayList的实现原理
    HashMap实现原理分析
    深入理解java动态代理机制
    Spring工作原理
    数据库SQL优化大总结之 百万级数据库优化方案
  • 原文地址:https://www.cnblogs.com/dtdx/p/14371873.html
Copyright © 2011-2022 走看看