zoukankan      html  css  js  c++  java
  • RabbitMQ整合Spring

    之前我们使用 RabbitMQ 原生的 API 方法来实现MQ的使用,Spring 也提供了 RabbitMQ 的集成,让我们更方便的使用MQ,让我们来学习下吧。

    Spring AMQP 是基于 Spring 框架的 AMQP 消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO 的消息监听等,很大方便我们使用 RabbitMQ 程序的相关开发。

    一、RabbitAdmin 管理组件

    1.1 准备工作:

    1. 添加 Spring AMQP 依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.3.1.RELEASE</version>
    </dependency>
    
    1. 声明 Bean 对象
    @Configuration
    public class RabbitMQConfig {
        /**
         * 注入连接工厂对象
         * @return
         */
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost("111.231.83.100");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
        
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            // 必须显式设置为 True ,否则 Spring 容器不会加载
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    }
    

    1.2 Exchange 操作

    相关方法:

    方法定义 作用
    void declareExchange(Exchange exchange) 声明交换机
    boolean deleteExchange(String exchange) 删除交换机

    添加交换机

    @SpringBootTest
    public class ExchangeAddTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    
        @Test
        public void shouldAddExchangeSuccess() {
            rabbitAdmin.declareExchange(new DirectExchange("admin.direct", true, false));
            rabbitAdmin.declareExchange(new TopicExchange("admin.topic", false, true));
            rabbitAdmin.declareExchange(new FanoutExchange("admin.fanout", false, false));
        }
    }    
    

    删除交换机

    @SpringBootTest
    public class ExchangeAddTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
        
        @Test
        public void shouldDeleteExchangeSuccess() {
            boolean result = rabbitAdmin.deleteExchange("admin.direct");
            Assert.assertTrue(result);
        }
    }    
    

    1.3 Queue 操作

    方法定义 作用
    Queue declareQueue() 声明默认队列
    String declareQueue(Queue queue) 申明给定的队列
    boolean deleteQueue(String queueName) 删除队列
    void deleteQueue(String queueName, boolean unused, boolean empty) 删除队列
    void purgeQueue(String queueName, boolean noWait) 清除队列信息,noWait = true 时异步执行
    int purgeQueue(String queueName) 清除队列信息
    Properties getQueueProperties(String queueName) 获取指定队列的属性

    声明队列

    @SpringBootTest
    public class QueueTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    
        @Test
        public void shouldAddQueueSuccess() {
            // 创建默认队列
            Queue defaultQueue = rabbitAdmin.declareQueue();
            Assert.assertNotNull(defaultQueue);
            Assert.assertEquals(false,defaultQueue.isDurable());
    
            // 创建指定名称和是否持久化属性的队列
            String  queueName =  rabbitAdmin.declareQueue(new Queue("orderQueue",true));
            Assert.assertNotNull(queueName);
            Assert.assertEquals("orderQueue",queueName);
        }
    }    
    

    注: 默认的队列因为设置 exclusive = true ,导致在其连接断开的时候自动删除,所以图中看不到。

    删除队列

    @SpringBootTest
    public class QueueTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
       
        @Test
        public void shouldDeleteQueueSuccess() {
            boolean result = rabbitAdmin.deleteQueue("orderQueue");
            Assert.assertTrue(result);
        }   
    }    
    

    1.4 Binding 绑定

    方法定义 作用
    void declareBinding(Binding binding) 声明队列与交换机的绑定
    void removeBinding(Binding binding) 删除队列与交换机的绑定

    声明队列与交换机的绑定

    @SpringBootTest
    public class BindingTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    
        @Test
        public void shouldBindingSuccess() {
            // 交换机名称
            String exchange = "admin.topic";
            // 队列名称
            String queueName = "orderQueue";
            // 1.创建绑定关系对象
            Binding binding =
                    BindingBuilder
                            // 创建队列
                            .bind(new Queue(queueName, true))
                            // 创建交换机
                            .to(new TopicExchange(exchange, true, false))
                            // 指定路由 Key
                            .with("order#");
            // 2.进行绑定
            rabbitAdmin.declareBinding(binding);
        }
    
    }
    

    删除队列与交换机的绑定

    @SpringBootTest
    public class BindingTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    	
        @Test
        public void shouldUnBindingSuccess() {
            // 交换机名称
            String exchange = "admin.topic";
            // 队列名称
            String queueName = "orderQueue";
            Binding binding =
                    new Binding(queueName, Binding.DestinationType.QUEUE, exchange, "order#", null);
            rabbitAdmin.removeBinding(binding);
        }
    }    
    

    1.5 bean 注入

    除了上面的通过代码显式申明交换机、队列、路由 之外,还可以通过 Bena 注入的形式申明。

    @Configuration
    public class RabbitMQConfig {
    
        /**
         * 注入连接工厂对象
         *
         * @return
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost("111.231.83.100");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            // 必须显式设置为 True ,否则 Spring 容器不会加载
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    
        @Bean
        public TopicExchange beanExchange() {
            return new TopicExchange("beanExchange", true, false);
        }
    
        @Bean
        public Queue beanQueue() {
            return new Queue("beanQueue", true);
        }
    
        @Bean
        public Binding beanBinding(TopicExchange beanExchange, Queue beanQueue) {
            return BindingBuilder
                    // 创建队列
                    .bind(beanQueue)
                    // 创建交换机
                    .to(beanExchange)
                    // 指定路由 Key
                    .with("bean#");
        }
    }
    
    @SpringBootTest
    public class BeanInjectionBindingTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
        @Autowired
        private Binding beanBinding;
    
        @Test
        public void shouldBindingSuccess() {
            rabbitAdmin.declareBinding(beanBinding);
        }
    
    }
    

    二、RabbitTemplate 模板组件

    如果你看过 RabbitAdmin 的源码,可以看到里面使用到了一个叫做 RabbitTemplate 的对象,它就是 Spring 提供的消息模板,封装了 RabbitMQ 核心 API 的一系列方法,而 RabbitAdmin 是在它之上的另一层封装。

    2.1 常用方法

    方法定义 作用
    void send(Message message) 发送消息
    void convertAndSend(Object object) 将 Java 对象包装成 Message 对象并发送 ,Java 对象需要实现 Serializable 序列化接口
    Message receive(String queueName) 接收消息
    receiveAndConvert(String queueName) 接收消息并将 Message 转换成 Java 对

    2.2 发送消息

    @SpringBootTest
    public class MessageTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void shouldSendMessageSuccess(){
            // 创建消息属性对象
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("desc", "信息描述..");
            messageProperties.getHeaders().put("type", "自定义消息类型..");
            // 创建消息对象
            Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
            // 发送消息
            rabbitTemplate.send("beanExchange","bean#",message);
    
            // 发送消息时额外增加属性
            Message newMessage = new Message("newMessage".getBytes(), messageProperties);
            rabbitTemplate.convertAndSend("beanExchange", "bean#", newMessage, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                    message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                    return message;
                }
            });
        }
    }    
    

    2.3 手动接收消息

    @SpringBootTest
    public class MessageTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;  
    	
        @Test
        public void shouldConsumeMessageSuccess() {
            Message msg = rabbitTemplate.receive("beanQueue", 2000l);
            System.out.println("消息内容:" + new String(msg.getBody()));
            final Map<String, Object> headers = msg.getMessageProperties().getHeaders();
            System.out.println("=======消息头属性=======");
            for (String key : headers.keySet()) {
                System.out.println("key =" + key + " ; value =" + headers.get(key));
            }
        }
    }    
    

    执行方法,观察控制台输出:

    消息内容:Hello RabbitMQ
    =======消息头属性=======
    key =type ; value =自定义消息类型..
    key =desc ; value =信息描述..
    

    再次执行方法,观察控制台输出:

    消息内容:newMessage
    =======消息头属性=======
    key =type ; value =自定义消息类型..
    key =attr ; value =额外新加的属性
    key =desc ; value =额外修改的信息描述
    

    2.4 消息监听容器

    在实际项目中我们不可能采用手动接收消息的形式来消费消息,这个时候 Spring 就为我们提供了一个消息监听容器 SimpleMessageListenerContainer

    它的功能如下:

    * 监听多个队列
    * 设置消费者消费数量
    * 设置消息确认和自动确认模式
    * 是否重回队列
    * 异常捕获 handel 函数
    * 设置消费者属性
    * 设置具体的监听器和消息转换器
    

    SimpleMessageListenerContainer 可以在运行过程中动态修改属性,如修改消费者消费数量大小、接收消息的模式等

    @Configuration
    public class RabbitMQConfig {
        ......
    	@Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,Queue beanQueue) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            // 设置监听队列,可以有多个
            container.setQueues(beanQueue);
            // 设置并发消费者数量
            container.setConcurrentConsumers(1);
            // 设置最大并发消费者数量
            container.setMaxConcurrentConsumers(5);
            // 设置是否重回队列
            container.setDefaultRequeueRejected(false);
            // 设置签收模式,这里为了演示使用自动签收,实际项目中需要使用手动签收 AcknowledgeMode.MANUAL
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            // 设置消费端标签策略
            container.setConsumerTagStrategy(new ConsumerTagStrategy() {
                @Override
                public String createConsumerTag(String queue) {
                    return queue + "_" + UUID.randomUUID().toString();
                }
            });
            // 设置消息监听
            container.setMessageListener(new ChannelAwareMessageListener() {
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    String msg = new String(message.getBody());
                    System.err.println("消费端监听:" + msg);
                }
            });
            return container;
        }
    }
    

    启动应用后,执行发送消息测试方法 shouldSendMessageSuccess ,观察控制台输出:

    消费端监听:Hello RabbitMQ
    

    证明消费端监听并消费成功。

    2.5 消息监听适配器

    除了直接使用 ChannelAwareMessageListener 实现消息事件监听外,还可以通过消息监听适配器(MessageListenerAdapter),通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换。允许监听器方法对消息内容类型进行操作,完全独立于 Rabbit API

    实际上就是相当于自己实现 ChannelAwareMessageListener 功能。

    1. 新建 MessageDelegate
    public class MessageDelegate {
        public void consumeMessage(byte[] messageBody) {
            System.err.println("默认方法, 消息内容:" + new String(messageBody));
        }
    }
    
    1. 替换原 ChannelAwareMessageListener 事件
    //   container.setMessageListener(new ChannelAwareMessageListener() {
    //       @Override
    //       public void onMessage(Message message, Channel channel) throws Exception {
    //           String msg = new String(message.getBody());
    //           System.err.println("消费端监听:" + msg);
    //       }
    //   });
    // 设置消息监听
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    // adapter 默认执行方法是 handleMessage,这里我们设置自定义方法名
    adapter.setDefaultListenerMethod("consumeMessage");
    container.setMessageListener(adapter);
    
    1. 启动应用后,执行发送消息测试方法 shouldSendMessageSuccess ,观察控制台输出:
    默认方法, 消息内容:Hello RabbitMQ
    

    证明消费端监听并消费成功。

    我们还可以将队列名和方法做绑定,实现转发功能:

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    // adapter 默认执行方法是 handleMessage,这里我们设置自定义方法名
    // adapter.setDefaultListenerMethod("consumeMessage");
    Map<String, String> queueOrTagToMethodName = new HashMap<>();
    queueOrTagToMethodName.put(beanQueue.getName(), "consumeMessage");
    adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
    container.setMessageListener(adapter);    
    

    2.6 消息转换器

    我们现在发送和接受消息的类型都是二进制形式传输,我们可以通过 MessageConverter 进行转换。

    1. 新建 TextMessageConverter
    public class TextMessageConverter implements MessageConverter {
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            return new Message(object.toString().getBytes(), messageProperties);
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            String contentType = message.getMessageProperties().getContentType();
            if (null != contentType && contentType.contains("text")) {
                return new String(message.getBody());
            }
            return message.getBody();
        }
    }
    
    1. adapter 设置转换类
    adapter.setMessageConverter(new TextMessageConverter());
    
    1. MessageDelegate 新增 字符串参数的方法
    public class MessageDelegate {
    	......
        public void consumeMessage(String messageBody){
            System.err.println("字符串类型, 消息内容:" + new String(messageBody));
        }
    }
    
    1. 新增发送文本消息测试方法
    @Test
    public void shouldSendTextMessageSuccess() {
        // 创建消息属性对象
        MessageProperties messageProperties = new MessageProperties();
        // 通过设置属性,让消费端知道要将消息内容转换成文本类型
        messageProperties.setContentType("text");
        // 创建消息对象
        Message message = new Message("字符串消息".getBytes(), messageProperties);
        // 发送消息
        rabbitTemplate.send("beanExchange", "bean#", message);
    }
    

    启动应用后,执行发送消息测试方法,观察控制台输出:

    字符串类型, 消息内容:字符串消息
    
  • 相关阅读:
    python排序函数sort()与sorted()区别
    python中lambda的用法
    Python中如何获取类属性的列表
    百度编辑器UEditor源码模式下过滤div/style等html标签
    【Flask】关于Flask的request属性
    python json.dumps() json.dump()的区别
    SQLAlchemy技术文档(中文版)(全)
    Flask中'endpoint'(端点)的理解
    SqlAlchemy个人学习笔记完整汇总-转载
    MySQL数据类型和常用字段属性总结
  • 原文地址:https://www.cnblogs.com/markLogZhu/p/13273809.html
Copyright © 2011-2022 走看看