zoukankan      html  css  js  c++  java
  • RabbitMQ整合Spring

    之前我们使用 RabbitMQ 原生的 API 方法来实现MQ的使用,Spring 也提供了 RabbitMQ 的集成,让我们更方便的使用MQ,让我们来学习下吧。

    Spring AMQP 是基于 Spring 框架的 AMQP 消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO 的消息监听等,很大方便我们使用 RabbitMQ 程序的相关开发。

    一、RabbitAdmin 管理组件

    1.1 准备工作:

    1. 添加 Spring AMQP 依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.3.1.RELEASE</version>
    </dependency>
    
    1. 声明 Bean 对象
    @Configuration
    public class RabbitMQConfig {
        /**
         * 注入连接工厂对象
         * @return
         */
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost("111.231.83.100");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
        
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            // 必须显式设置为 True ,否则 Spring 容器不会加载
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    }
    

    1.2 Exchange 操作

    相关方法:

    方法定义 作用
    void declareExchange(Exchange exchange) 声明交换机
    boolean deleteExchange(String exchange) 删除交换机

    添加交换机

    @SpringBootTest
    public class ExchangeAddTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    
        @Test
        public void shouldAddExchangeSuccess() {
            rabbitAdmin.declareExchange(new DirectExchange("admin.direct", true, false));
            rabbitAdmin.declareExchange(new TopicExchange("admin.topic", false, true));
            rabbitAdmin.declareExchange(new FanoutExchange("admin.fanout", false, false));
        }
    }    
    

    删除交换机

    @SpringBootTest
    public class ExchangeAddTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
        
        @Test
        public void shouldDeleteExchangeSuccess() {
            boolean result = rabbitAdmin.deleteExchange("admin.direct");
            Assert.assertTrue(result);
        }
    }    
    

    1.3 Queue 操作

    方法定义 作用
    Queue declareQueue() 声明默认队列
    String declareQueue(Queue queue) 申明给定的队列
    boolean deleteQueue(String queueName) 删除队列
    void deleteQueue(String queueName, boolean unused, boolean empty) 删除队列
    void purgeQueue(String queueName, boolean noWait) 清除队列信息,noWait = true 时异步执行
    int purgeQueue(String queueName) 清除队列信息
    Properties getQueueProperties(String queueName) 获取指定队列的属性

    声明队列

    @SpringBootTest
    public class QueueTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    
        @Test
        public void shouldAddQueueSuccess() {
            // 创建默认队列
            Queue defaultQueue = rabbitAdmin.declareQueue();
            Assert.assertNotNull(defaultQueue);
            Assert.assertEquals(false,defaultQueue.isDurable());
    
            // 创建指定名称和是否持久化属性的队列
            String  queueName =  rabbitAdmin.declareQueue(new Queue("orderQueue",true));
            Assert.assertNotNull(queueName);
            Assert.assertEquals("orderQueue",queueName);
        }
    }    
    

    注: 默认的队列因为设置 exclusive = true ,导致在其连接断开的时候自动删除,所以图中看不到。

    删除队列

    @SpringBootTest
    public class QueueTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
       
        @Test
        public void shouldDeleteQueueSuccess() {
            boolean result = rabbitAdmin.deleteQueue("orderQueue");
            Assert.assertTrue(result);
        }   
    }    
    

    1.4 Binding 绑定

    方法定义 作用
    void declareBinding(Binding binding) 声明队列与交换机的绑定
    void removeBinding(Binding binding) 删除队列与交换机的绑定

    声明队列与交换机的绑定

    @SpringBootTest
    public class BindingTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    
        @Test
        public void shouldBindingSuccess() {
            // 交换机名称
            String exchange = "admin.topic";
            // 队列名称
            String queueName = "orderQueue";
            // 1.创建绑定关系对象
            Binding binding =
                    BindingBuilder
                            // 创建队列
                            .bind(new Queue(queueName, true))
                            // 创建交换机
                            .to(new TopicExchange(exchange, true, false))
                            // 指定路由 Key
                            .with("order#");
            // 2.进行绑定
            rabbitAdmin.declareBinding(binding);
        }
    
    }
    

    删除队列与交换机的绑定

    @SpringBootTest
    public class BindingTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    	
        @Test
        public void shouldUnBindingSuccess() {
            // 交换机名称
            String exchange = "admin.topic";
            // 队列名称
            String queueName = "orderQueue";
            Binding binding =
                    new Binding(queueName, Binding.DestinationType.QUEUE, exchange, "order#", null);
            rabbitAdmin.removeBinding(binding);
        }
    }    
    

    1.5 bean 注入

    除了上面的通过代码显式申明交换机、队列、路由 之外,还可以通过 Bena 注入的形式申明。

    @Configuration
    public class RabbitMQConfig {
    
        /**
         * 注入连接工厂对象
         *
         * @return
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost("111.231.83.100");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            // 必须显式设置为 True ,否则 Spring 容器不会加载
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    
        @Bean
        public TopicExchange beanExchange() {
            return new TopicExchange("beanExchange", true, false);
        }
    
        @Bean
        public Queue beanQueue() {
            return new Queue("beanQueue", true);
        }
    
        @Bean
        public Binding beanBinding(TopicExchange beanExchange, Queue beanQueue) {
            return BindingBuilder
                    // 创建队列
                    .bind(beanQueue)
                    // 创建交换机
                    .to(beanExchange)
                    // 指定路由 Key
                    .with("bean#");
        }
    }
    
    @SpringBootTest
    public class BeanInjectionBindingTest {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
        @Autowired
        private Binding beanBinding;
    
        @Test
        public void shouldBindingSuccess() {
            rabbitAdmin.declareBinding(beanBinding);
        }
    
    }
    

    二、RabbitTemplate 模板组件

    如果你看过 RabbitAdmin 的源码,可以看到里面使用到了一个叫做 RabbitTemplate 的对象,它就是 Spring 提供的消息模板,封装了 RabbitMQ 核心 API 的一系列方法,而 RabbitAdmin 是在它之上的另一层封装。

    2.1 常用方法

    方法定义 作用
    void send(Message message) 发送消息
    void convertAndSend(Object object) 将 Java 对象包装成 Message 对象并发送 ,Java 对象需要实现 Serializable 序列化接口
    Message receive(String queueName) 接收消息
    receiveAndConvert(String queueName) 接收消息并将 Message 转换成 Java 对

    2.2 发送消息

    @SpringBootTest
    public class MessageTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void shouldSendMessageSuccess(){
            // 创建消息属性对象
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("desc", "信息描述..");
            messageProperties.getHeaders().put("type", "自定义消息类型..");
            // 创建消息对象
            Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
            // 发送消息
            rabbitTemplate.send("beanExchange","bean#",message);
    
            // 发送消息时额外增加属性
            Message newMessage = new Message("newMessage".getBytes(), messageProperties);
            rabbitTemplate.convertAndSend("beanExchange", "bean#", newMessage, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                    message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                    return message;
                }
            });
        }
    }    
    

    2.3 手动接收消息

    @SpringBootTest
    public class MessageTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;  
    	
        @Test
        public void shouldConsumeMessageSuccess() {
            Message msg = rabbitTemplate.receive("beanQueue", 2000l);
            System.out.println("消息内容:" + new String(msg.getBody()));
            final Map<String, Object> headers = msg.getMessageProperties().getHeaders();
            System.out.println("=======消息头属性=======");
            for (String key : headers.keySet()) {
                System.out.println("key =" + key + " ; value =" + headers.get(key));
            }
        }
    }    
    

    执行方法,观察控制台输出:

    消息内容:Hello RabbitMQ
    =======消息头属性=======
    key =type ; value =自定义消息类型..
    key =desc ; value =信息描述..
    

    再次执行方法,观察控制台输出:

    消息内容:newMessage
    =======消息头属性=======
    key =type ; value =自定义消息类型..
    key =attr ; value =额外新加的属性
    key =desc ; value =额外修改的信息描述
    

    2.4 消息监听容器

    在实际项目中我们不可能采用手动接收消息的形式来消费消息,这个时候 Spring 就为我们提供了一个消息监听容器 SimpleMessageListenerContainer

    它的功能如下:

    * 监听多个队列
    * 设置消费者消费数量
    * 设置消息确认和自动确认模式
    * 是否重回队列
    * 异常捕获 handel 函数
    * 设置消费者属性
    * 设置具体的监听器和消息转换器
    

    SimpleMessageListenerContainer 可以在运行过程中动态修改属性,如修改消费者消费数量大小、接收消息的模式等

    @Configuration
    public class RabbitMQConfig {
        ......
    	@Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,Queue beanQueue) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            // 设置监听队列,可以有多个
            container.setQueues(beanQueue);
            // 设置并发消费者数量
            container.setConcurrentConsumers(1);
            // 设置最大并发消费者数量
            container.setMaxConcurrentConsumers(5);
            // 设置是否重回队列
            container.setDefaultRequeueRejected(false);
            // 设置签收模式,这里为了演示使用自动签收,实际项目中需要使用手动签收 AcknowledgeMode.MANUAL
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            // 设置消费端标签策略
            container.setConsumerTagStrategy(new ConsumerTagStrategy() {
                @Override
                public String createConsumerTag(String queue) {
                    return queue + "_" + UUID.randomUUID().toString();
                }
            });
            // 设置消息监听
            container.setMessageListener(new ChannelAwareMessageListener() {
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    String msg = new String(message.getBody());
                    System.err.println("消费端监听:" + msg);
                }
            });
            return container;
        }
    }
    

    启动应用后,执行发送消息测试方法 shouldSendMessageSuccess ,观察控制台输出:

    消费端监听:Hello RabbitMQ
    

    证明消费端监听并消费成功。

    2.5 消息监听适配器

    除了直接使用 ChannelAwareMessageListener 实现消息事件监听外,还可以通过消息监听适配器(MessageListenerAdapter),通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换。允许监听器方法对消息内容类型进行操作,完全独立于 Rabbit API

    实际上就是相当于自己实现 ChannelAwareMessageListener 功能。

    1. 新建 MessageDelegate
    public class MessageDelegate {
        public void consumeMessage(byte[] messageBody) {
            System.err.println("默认方法, 消息内容:" + new String(messageBody));
        }
    }
    
    1. 替换原 ChannelAwareMessageListener 事件
    //   container.setMessageListener(new ChannelAwareMessageListener() {
    //       @Override
    //       public void onMessage(Message message, Channel channel) throws Exception {
    //           String msg = new String(message.getBody());
    //           System.err.println("消费端监听:" + msg);
    //       }
    //   });
    // 设置消息监听
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    // adapter 默认执行方法是 handleMessage,这里我们设置自定义方法名
    adapter.setDefaultListenerMethod("consumeMessage");
    container.setMessageListener(adapter);
    
    1. 启动应用后,执行发送消息测试方法 shouldSendMessageSuccess ,观察控制台输出:
    默认方法, 消息内容:Hello RabbitMQ
    

    证明消费端监听并消费成功。

    我们还可以将队列名和方法做绑定,实现转发功能:

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    // adapter 默认执行方法是 handleMessage,这里我们设置自定义方法名
    // adapter.setDefaultListenerMethod("consumeMessage");
    Map<String, String> queueOrTagToMethodName = new HashMap<>();
    queueOrTagToMethodName.put(beanQueue.getName(), "consumeMessage");
    adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
    container.setMessageListener(adapter);    
    

    2.6 消息转换器

    我们现在发送和接受消息的类型都是二进制形式传输,我们可以通过 MessageConverter 进行转换。

    1. 新建 TextMessageConverter
    public class TextMessageConverter implements MessageConverter {
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            return new Message(object.toString().getBytes(), messageProperties);
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            String contentType = message.getMessageProperties().getContentType();
            if (null != contentType && contentType.contains("text")) {
                return new String(message.getBody());
            }
            return message.getBody();
        }
    }
    
    1. adapter 设置转换类
    adapter.setMessageConverter(new TextMessageConverter());
    
    1. MessageDelegate 新增 字符串参数的方法
    public class MessageDelegate {
    	......
        public void consumeMessage(String messageBody){
            System.err.println("字符串类型, 消息内容:" + new String(messageBody));
        }
    }
    
    1. 新增发送文本消息测试方法
    @Test
    public void shouldSendTextMessageSuccess() {
        // 创建消息属性对象
        MessageProperties messageProperties = new MessageProperties();
        // 通过设置属性,让消费端知道要将消息内容转换成文本类型
        messageProperties.setContentType("text");
        // 创建消息对象
        Message message = new Message("字符串消息".getBytes(), messageProperties);
        // 发送消息
        rabbitTemplate.send("beanExchange", "bean#", message);
    }
    

    启动应用后,执行发送消息测试方法,观察控制台输出:

    字符串类型, 消息内容:字符串消息
    
  • 相关阅读:
    Window 窗口类
    使用 Bolt 实现 GridView 表格控件
    lua的table库
    Windows编程总结之 DLL
    lua 打印 table 拷贝table
    使用 xlue 实现简单 listbox 控件
    使用 xlue 实现 tips
    extern “C”
    COleVariant如何转换为int double string cstring
    原来WIN32 API也有GetOpenFileName函数
  • 原文地址:https://www.cnblogs.com/markLogZhu/p/13273809.html
Copyright © 2011-2022 走看看