zoukankan      html  css  js  c++  java
  • Springboot整合RabbitMQ

    1.pom修改

    引入如下依赖:

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    2.修改application.properties 文件

    spring.rabbitmq.host=192.168.99.100
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest

    查看rabbitmq的配置如下:(还可以指定vhost等参数)

    @ConfigurationProperties(prefix = "spring.rabbitmq")
    public class RabbitProperties {
    
        /**
         * RabbitMQ host.
         */
        private String host = "localhost";
    
        /**
         * RabbitMQ port.
         */
        private int port = 5672;
    
        /**
         * Login user to authenticate to the broker.
         */
        private String username;
    
        /**
         * Login to authenticate against the broker.
         */
        private String password;
    
        /**
         * SSL configuration.
         */
        private final Ssl ssl = new Ssl();
    
        /**
         * Virtual host to use when connecting to the broker.
         */
        private String virtualHost;
    
        /**
         * Comma-separated list of addresses to which the client should connect.
         */
        private String addresses;
    
        /**
         * Requested heartbeat timeout, in seconds; zero for none.
         */
        private Integer requestedHeartbeat;
    
        /**
         * Enable publisher confirms.
         */
        private boolean publisherConfirms;
    
        /**
         * Enable publisher returns.
         */
        private boolean publisherReturns;
    
        /**
         * Connection timeout, in milliseconds; zero for infinite.
         */
        private Integer connectionTimeout;
    
        /**
         * Cache configuration.
         */
        private final Cache cache = new Cache();
    
        /**
         * Listener container configuration.
         */
        private final Listener listener = new Listener();
    
        private final Template template = new Template();
    
        private List<Address> parsedAddresses;
    
            ...
    }

    3.direct类型的消息的发送和接收

    1. 消息发送

    1. 增加配置类,声明交换机和队列、以及将队列和交换机绑定

    package cn.qlq.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
    
        /**
         * 配置一个directExchange
         * 
         * @return
         */
        @Bean
        public DirectExchange directExchange() {return new DirectExchange("bootDirectExchange");
        }
    
        /**
         * 配置一个队列
         * 
         * @return
         */
        @Bean
        public Queue directQueue() {
            return new Queue("bootDirectQueue");
        }
    
        /**
         * 建立一个绑定:队列和交换机绑定
         * 
         * @param directExchange
         *            交换机,上面的bean,用于自动注入
         * @param directQueue
         *            队列,上面的队列,自动注入
         * @return
         */
        @Bean
        public Binding directBinding(DirectExchange directExchange, Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
        }
    
    }

    2.生产者:

    接口

    package cn.qlq.rabbitmq;
    
    public interface MessageService {
    
        void sendDirectMsg(String msg);
    
    }

    实现类:

    package cn.qlq.rabbitmq;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MessageServiceImpl implements MessageService {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Override
        public void sendDirectMsg(String msg) {
            // 第一种发送方式
            // MessageProperties property = null;
            // Message message = new Message(msg.getBytes(), property );
            // amqpTemplate.send(message );
    
            // 第二种:
            amqpTemplate.convertSendAndReceive("bootDirectExchange", "bootDirectRoutingKey", msg);
        }
    
    }

    3. 测试类

    package rabbitmq;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import cn.qlq.MySpringBootApplication;
    import cn.qlq.rabbitmq.MessageService;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = MySpringBootApplication.class)
    public class MQTest {
    
        @Autowired
        private MessageService messageService;
    
        @Test
        public void sendDirectMsgTest() {
            messageService.sendDirectMsg("bootDirect msg===123456");
        }
    
    }

    4.测试:

    测试后可以到rabbitmq查看队列中有一条消息

     2.消息接收

    消息接收有两种方式:

    方式一:

        @Override
        public void receiveDirectMsg() {
            String msg = (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
            System.out.println("接收到d消息: " + msg);
        }

    这种方式只能接收到一次消息,无法持续性的接收消息。

    方式二:使用监听器的方式持续性接收消息

        /**
         * 监听器接收消息。不需要手动调用,springboot会自动监听
         */
        @RabbitListener(queues = { "bootDirectQueue" })
        @Override
        public void receiveDirectMsg(String msg) {
            System.out.println("监听器接收到的消息: " + msg);
        }

      这种方式会持续性监听,并且监听完会删除消息,自动应答。查看@RabbitListener注解的源码如下,

    /** <a href="http://www.cpupk.com/decompiler">Eclipse Class Decompiler</a> plugin, Copyright (c) 2017 Chen Chao. **/
    package org.springframework.amqp.rabbit.annotation;
    
    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Repeatable;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListeners;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    
    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @MessageMapping
    @Documented
    @Repeatable(RabbitListeners.class)
    public @interface RabbitListener {
        String id() default "";
    
        String containerFactory() default "";
    
        String[] queues() default {};
    
        Queue[] queuesToDeclare() default {};
    
        boolean exclusive() default false;
    
        String priority() default "";
    
        String admin() default "";
    
        QueueBinding[] bindings() default {};
    
        String group() default "";
    
        String returnExceptions() default "";
    
        String errorHandler() default "";
    
        String concurrency() default "";
    
        String autoStartup() default "";
    
        String executor() default "";
    
        String ackMode() default "";
    
        String replyPostProcessor() default "";
    }

    测试:使用@RabbitListeners注解可以监听多个队列,指定队列的应答方式为手动应答

    接口:

    package cn.qlq.rabbitmq;
    
    import org.springframework.amqp.core.Message;
    
    import com.rabbitmq.client.Channel;
    
    public interface MessageService {
    
        void receiveDirectMsg(Message message, Channel channel);
    }

    实现类:

    package cn.qlq.rabbitmq;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.annotation.RabbitListeners;
    import org.springframework.stereotype.Service;
    
    import com.rabbitmq.client.Channel;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Service
    @Slf4j
    public class MessageServiceImpl implements MessageService {
    
        /**
         * 监听器接收消息。不需要手动调用,springboot会自动监听。设置应答模式为手动
         */
        @RabbitListeners({ @RabbitListener(queues = { "bootDirectQueue" }, ackMode = "MANUAL") })
        public void receiveDirectMsg(Message message, Channel channel) {
            try {
                log.info("basicReject, 监听器接收到的消息: " + new String(message.getBody()));
                // 当消费者把消息消费成功,再手动应答RabbitMQ
                // channel.basicAck(message.getMessageProperties().getDeliveryTag(),
                // false);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    查看日志: 一直接收消息,而且队列不会删除消息。

    2020-11-07 21:05:20.625  INFO 22044 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : basicReject, 监听器接收到的消息: bootDirect msg===123456
    2020-11-07 21:05:20.626  INFO 22044 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : basicReject, 监听器接收到的消息: bootDirect msg===123456
    2020-11-07 21:05:20.629  INFO 22044 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : basicReject, 监听器接收到的消息: bootDirect msg===123456
    2020-11-07 21:05:20.632  INFO 22044 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : basicReject, 监听器接收到的消息: bootDirect msg===123456
    2020-11-07 21:05:20.634  INFO 22044 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : basicReject, 监听器接收到的消息: bootDirect msg===123456
    。。。

    4.fanout消息类型的发送和接收

      因为是fanout消息类型的广播形式,可以用上面的bean形式来进行声明队列、绑定交换机,这种形式可以保证分布式应用一个应用多实例部署的情况下,只有一个队列,消息不会被多个应用重复消费。也可以用spring随机生成队列的形式来进行绑定生成队列。

    1.消费者:

      两个方法,模拟两个消费者,直接绑定队列和交换机。队列名称用随机名称,而且自动删除(没有消费者的时候自动删除)。

        @RabbitListeners({ @RabbitListener(bindings = {
                @QueueBinding(value = @Queue(), exchange = @Exchange(name = "fanoutExchange", type = "fanout")) }) })
        @Override
        public void receiveFanoutMsg(Message message, Channel channel) {
            log.info("receiveFanoutMsg, 监听器接收到的消息: " + new String(message.getBody()));
        }
        
        @RabbitListeners({ @RabbitListener(bindings = {
                @QueueBinding(value = @Queue(), exchange = @Exchange(name = "fanoutExchange", type = "fanout")) }) })
        @Override
        public void receiveFanoutMsg2(Message message, Channel channel) {
            log.info("receiveFanoutMsg2, 监听器接收到的消息: " + new String(message.getBody()));
        }

    启动应用后查看队列:(spring创建的默认队列。)

    2.生产者:

        @Override
        public void sendFanoutMsg(String msg) {
            // 发送fanout消息,routingKey可以不指定
            amqpTemplate.convertSendAndReceive("fanoutExchange", "", msg);
        }

    3.测试:

        @Test
        public void sendFanoutMsg() {
            for (int i= 0; i< 5; i ++) {
                messageService.sendFanoutMsg("bootFanout msg===" + i);
            }
        }

    4.查看消费者控制台结果:

    2020-11-07 21:45:49.255  INFO 20244 --- [ntContainer#3-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg, 监听器接收到的消息: bootFanout msg===0
    2020-11-07 21:45:49.259  INFO 20244 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg2, 监听器接收到的消息: bootFanout msg===0
    2020-11-07 21:45:54.042  INFO 20244 --- [ntContainer#3-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg, 监听器接收到的消息: bootFanout msg===1
    2020-11-07 21:45:54.051  INFO 20244 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg2, 监听器接收到的消息: bootFanout msg===1
    2020-11-07 21:45:59.040  INFO 20244 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg2, 监听器接收到的消息: bootFanout msg===2
    2020-11-07 21:45:59.040  INFO 20244 --- [ntContainer#3-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg, 监听器接收到的消息: bootFanout msg===2
    2020-11-07 21:46:04.061  INFO 20244 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg2, 监听器接收到的消息: bootFanout msg===3
    2020-11-07 21:46:04.064  INFO 20244 --- [ntContainer#3-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg, 监听器接收到的消息: bootFanout msg===3
    2020-11-07 21:46:09.044  INFO 20244 --- [ntContainer#3-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg, 监听器接收到的消息: bootFanout msg===4
    2020-11-07 21:46:09.054  INFO 20244 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveFanoutMsg2, 监听器接收到的消息: bootFanout msg===4

    5.Topic类型交换机的实使用

      其实各种类型交换机的消费者都一样,只需要监听队列就可以了。只是交换机类型不同,所以需要绑定的时候指定不同的routingKey。

    1.消费者

        /**
         * 这个注解声明队列与交换机并且进行绑定
         */
        @RabbitListeners({ @RabbitListener(bindings = { @QueueBinding(value = @Queue("topic01"), key = {
                "aa" }, exchange = @Exchange(name = "topicExchange", type = "topic")) }) })
        public void receiveTopicMsg(Message message, Channel channel) {
            log.info("receiveTopicMsg, 监听器接收到的消息: " + new String(message.getBody()));
        }
    
        @RabbitListeners({ @RabbitListener(bindings = { @QueueBinding(value = @Queue("topic02"), key = {
                "aa.*" }, exchange = @Exchange(name = "topicExchange", type = "topic")) }) })
        public void receiveTopicMsg2(Message message, Channel channel) {
            log.info("receiveTopicMsg2, 监听器接收到的消息: " + new String(message.getBody()));
        }
    
        @RabbitListeners({ @RabbitListener(bindings = { @QueueBinding(value = @Queue("topic03"), key = {
                "aa.#" }, exchange = @Exchange(name = "topicExchange", type = "topic")) }) })
        public void receiveTopicMsg3(Message message, Channel channel) {
            log.info("receiveTopicMsg3, 监听器接收到的消息: " + new String(message.getBody()));
        }

      声明了3个队列。topic01通过routingkey为"aa"的与topicExchange绑定;topic02通过routingkey为"aa.*"的与topicExchange绑定;topic03通过routingkey为"aa.#"的与topicExchange绑定。

    2.生产者:

        @Override
        public void sendTopicMsg(String msg, String routingKey) {
            amqpTemplate.convertSendAndReceive("topicExchange", routingKey, msg);
        }

    3.测试类:

        @Test
        public void sendTopicMsg() {
            messageService.sendTopicMsg("bootTopic msg=== aa", "aa");
            messageService.sendTopicMsg("bootTopic msg=== aa.bb", "aa.bb");
            messageService.sendTopicMsg("bootTopic msg=== aa.bb.cc", "aa.bb.cc");
        }

    4.查看消费者控制台:

    2020-11-07 22:35:27.569  INFO 22764 --- [ntContainer#1-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveTopicMsg, 监听器接收到的消息: bootTopic msg=== aa
    2020-11-07 22:35:27.574  INFO 22764 --- [ntContainer#2-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveTopicMsg3, 监听器接收到的消息: bootTopic msg=== aa
    2020-11-07 22:35:32.547  INFO 22764 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveTopicMsg2, 监听器接收到的消息: bootTopic msg=== aa.bb
    2020-11-07 22:35:37.531  INFO 22764 --- [ntContainer#2-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveTopicMsg3, 监听器接收到的消息: bootTopic msg=== aa.bb.cc

    6.direct类型的交换机实现延迟队列

      简单的实现设置队列中消息的生存时间是1分钟,超时后自动转换路由器进入死信队列。

    1.配置类:

        /**** S 实现延迟队列 **/
        public static final String ORDER_DIRECT_EXCHANGE = "order.exchange";
        public static final String ORDER_DIRECT_ROUTING_KEY = "order.routingKey";
        public static final String ORDER_DIRECT_QUEUE = "order.queue";
    
        public static final String ORDER_DIRECT_EXCHANGE_DEAD = "order.dead.exchange";
        public static final String ORDER_DIRECT_QUEUE_DEAD = "order.dead.queue";
    
        @Bean
        public DirectExchange orderExchange() {
            System.out.println("==============orderExchange");
            return new DirectExchange(ORDER_DIRECT_EXCHANGE);
        }
    
        @Bean
        public DirectExchange orderDeadExchange() {
            return new DirectExchange(ORDER_DIRECT_EXCHANGE_DEAD);
        }
    
        @Bean
        public Queue orderQueue() {
            Queue queue = new Queue(ORDER_DIRECT_QUEUE);
            // 设置消息生存时间内60s
            Map<String, Object> arguments = queue.getArguments();
            arguments.put("x-message-ttl", 60000);
            // 声明DLX名称(死信队列名称)
            arguments.put("x-dead-letter-exchange", ORDER_DIRECT_EXCHANGE_DEAD);
    
            return queue;
        }
    
        @Bean
        public Queue orderDeadQueue() {
            Queue queue = new Queue(ORDER_DIRECT_QUEUE_DEAD);
            return queue;
        }
    
        @Bean
        public Binding orderQueueExchangeBinding(DirectExchange orderExchange, Queue orderQueue) {
            return BindingBuilder.bind(orderQueue).to(orderExchange).with(ORDER_DIRECT_ROUTING_KEY);
        }
    
        @Bean
        public Binding orderDeadQueueExchangeBinding(DirectExchange orderDeadExchange, Queue orderDeadQueue) {
            return BindingBuilder.bind(orderDeadQueue).to(orderDeadExchange).with(ORDER_DIRECT_ROUTING_KEY);
        }
        /**** E 实现延迟队列 **/

    2.消费者:

        /**
         * 监听器接收消息。不需要手动调用,springboot会自动监听
         */
        @RabbitListener(queues = { "order.dead.queue" })
        public void receiveExpiredOrderMsg(Message message, Channel channel) {
            log.info("receiveExpiredOrderMsg, 监听器接收到的消息: " + new String(message.getBody()));
        }

    3.生产者:

        @Override
        public void sendOrderMsg(String msg) {
            amqpTemplate.convertSendAndReceive("order.exchange", "order.routingKey", msg);
        }

    4.测试类:

        @Test
        public void sendOrderMsgTest() {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            for (int i = 0; i < 5; i++) {
                messageService.sendOrderMsg("order " + i + ", time: " + simpleDateFormat.format(new Date()));
            }
        }

    5.结果(可以看到接到消息的时候和订单生成时间相差1min) 

    2020-11-07 23:06:18.042  INFO 23200 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveExpiredOrderMsg, 监听器接收到的消息: order 0, time: 2020-11-07 23:05:17
    2020-11-07 23:06:22.981  INFO 23200 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveExpiredOrderMsg, 监听器接收到的消息: order 1, time: 2020-11-07 23:05:22
    2020-11-07 23:06:27.963  INFO 23200 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveExpiredOrderMsg, 监听器接收到的消息: order 2, time: 2020-11-07 23:05:27
    2020-11-07 23:06:32.962  INFO 23200 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveExpiredOrderMsg, 监听器接收到的消息: order 3, time: 2020-11-07 23:05:32
    2020-11-07 23:06:37.966  INFO 23200 --- [ntContainer#0-1] cn.qlq.rabbitmq.MessageServiceImpl       : receiveExpiredOrderMsg, 监听器接收到的消息: order 4, time: 2020-11-07 23:05:37

    补充:一般生产者和发送者不在一个服务中,所以在MQ的声明中一般是建议消费者服务和生产者服务都声明交换机、队列、以及binding,防止报错。但是如果消息接收者只接收消息的话,也可以在消息接收者端只声明队列;交换机、队列、绑定等在生产者声明,如下:

    (1)生产者端配置: 声明交换机、队列、绑定

    package cn.qz.cloud.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class RabbitConfig {
    
        /**
         * 配置一个directExchange
         *
         * @return
         */
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("bootDirectExchange");
        }
    
        /**
         * 配置一个队列
         *
         * @return
         */
        @Bean
        public Queue directQueue() {
            return new Queue("bootDirectQueue");
        }
    
        /**
         * 建立一个绑定:队列和交换机绑定
         *
         * @param directExchange 交换机,上面的bean,用于自动注入
         * @param directQueue    队列,上面的队列,自动注入
         * @return
         */
        @Bean
        public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
            return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
        }
    }

    (2)消费者端配置:声明队列

    package cn.qz.cloud.config;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class RabbitConfig {
    
        /**
         * 配置一个队列
         *
         * @return
         */
        @Bean
        public Queue directQueue() {
            return new Queue("bootDirectQueue");
        }
    }

      这样无论是先启动生产者服务还是消费者服务都不会报错。

    补充:SimpleMessageListenerContainer 和 MQMessageListener消息监听器的用法

    SimpleMessageListenerContainer:简单消息监听容器。这个类有很多设置:监听队列(多个队列)、自动启动、自动声明功能、设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等、设置消费者数量、最大最小数量、批量消费、设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数等。

    如下:

    (1) 声明监听器:  实现ChannelAwareMessageListener接口

    package cn.qz.cloud.mq;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    
    public class MyChannelAwareMessageListener implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("---------->>消息为:" + msg);
    
            // 最后要应答或拒绝消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    //        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

    (2) 配置类中声明

    package cn.qz.cloud.config;
    
    import cn.qz.cloud.mq.MyChannelAwareMessageListener;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 配置一个队列
         *
         * @return
         */
        @Bean
        public Queue directQueue() {
            return new Queue("bootDirectQueue");
        }
    
        @Bean
        public SimpleMessageListenerContainer myChannelAwareMessageListenerContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            // 添加队列监听
            messageListenerContainer.setQueues(directQueue());
            // 设置监听数据
            messageListenerContainer.setConcurrentConsumers(1);
            // 设置最大监听数据数量
            messageListenerContainer.setMaxConcurrentConsumers(5);
            //设置是否重回队列
            messageListenerContainer.setDefaultRequeueRejected(false);
            //手动确认消息
            messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // 设置消息监听器
            messageListenerContainer.setMessageListener(new MyChannelAwareMessageListener());
            return messageListenerContainer;
        }
    }

      启动服务即可实现消息的监听。

    补充:关于rabbittemplate的消息转换机器

      默认使用的消息转换器是org.springframework.amqp.support.converter.SimpleMessageConverter。 这个抓换器发送消息时,发送的信息必须实现Serializable接口,就是一个普通的序列化。可以使用Jackson2JsonMessageConverter。

    1. 默认的消息转换器

    (1)存放一条消息,内容如下:

            Map<String, Object> map = new HashMap<>();
            map.put("name", "张三");

    (2)控制台查看消息如下:

     (3) 消费者接收:

            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("---------->>消息为:" + msg);

    结果:

    ---------->>消息为:�� sr java.util.HashMap���`� F 
    loadFactorI     thresholdxp?@           t namet 张三x

    (4) 正确处理,反序列一下:

            Map<String, Object> msg = (Map<String, Object>) SerializationUtils.deserialize(message.getBody());
            System.out.println("---------->>消息为:" + msg);

    结果:

    ---------->>消息为:{name=张三}

    2. 使用Jackson2JsonMessageConverter--实际就是转为JSON,在作为普通的JSON字符串序列化到mq中

    (1) 设置RebbitTemplate的消息转换器

    package cn.qz.cloud.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.PostConstruct;
    
    @Configuration
    public class RabbitConfig {
    
        /**
         * 配置一个directExchange
         *
         * @return
         */
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("bootDirectExchange");
        }
    
        /**
         * 配置一个队列
         *
         * @return
         */
        @Bean
        public Queue directQueue() {
            return new Queue("bootDirectQueue");
        }
    
        /**
         * 建立一个绑定:队列和交换机绑定
         *
         * @param directExchange 交换机,上面的bean,用于自动注入
         * @param directQueue    队列,上面的队列,自动注入
         * @return
         */
        @Bean
        public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
            return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
        }
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void changeRabbitConverter() {
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        }
    }

    (2)  发送消息同上

    (3) 控制台查看

     (4) 消费者端: 

            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("---------->>消息为:" + msg);

    结果:

    ---------->>消息为:{"name":"张三"}

    (5) 也可以将读取到的消息转为JSON处理

            String msg = new String(message.getBody(), "UTF-8");
            HashMap hashMap = JSONObject.parseObject(msg, HashMap.class);
            System.out.println(hashMap);

      按理说消息接收者端应该是不用给 messageListenerContainer 设置消息转换器的,也看到代码有给消息的监听器容器设置消息转换器。。。。

    补充 :也可以在发送消息的时候手动转换消息,如下:

    package cn.qz.cloud.service;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.SerializerMessageConverter;
    import org.springframework.amqp.support.converter.SimpleMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.Map;
    @Service
    public class MessageServiceImpl implements MessageService {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Override
        public void sendDirectMsg(Map<String, Object> msg) {
            // 第一种发送方式
            // MessageProperties property = null;
            // Message message = new Message(msg.getBytes(), property );
            // amqpTemplate.send(message );
    
            // 第二种:
    //        msg.put("origin", "system");
    //        amqpTemplate.convertSendAndReceive("bootDirectExchange", "bootDirectRoutingKey", msg);
    
            SimpleMessageConverter simpleMessageConverter = new SimpleMessageConverter();
            msg.put("origin", "SimpleMessageConverter");
            Message message = simpleMessageConverter.toMessage(msg, null);
            System.out.println("SimpleMessageConverter");
            System.out.println(message);
            amqpTemplate.convertSendAndReceive("bootDirectExchange", "bootDirectRoutingKey", message);
    
            SerializerMessageConverter converter2 = new SerializerMessageConverter();
            msg.put("origin", "SerializerMessageConverter");
            Message message1 = converter2.toMessage(msg, null);
            System.out.println("SerializerMessageConverter");
            System.out.println(message1);
            amqpTemplate.convertSendAndReceive("bootDirectExchange", "bootDirectRoutingKey", message1);
    
            Jackson2JsonMessageConverter converter3 = new Jackson2JsonMessageConverter();
            msg.put("origin", "Jackson2JsonMessageConverter");
            Message message2 = converter3.toMessage(msg, null);
            System.out.println("Jackson2JsonMessageConverter");
            System.out.println(message2);
            amqpTemplate.convertSendAndReceive("bootDirectExchange", "bootDirectRoutingKey", message2);
        }
    
    }

    查看源码当是Message及其子类不进行转换。 RabbitTemplate:

        public void convertAndSend(String exchange, String routingKey, Object object) throws AmqpException {
            this.convertAndSend(exchange, routingKey, object, (CorrelationData)null);
        }
        public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData) throws AmqpException {
            this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
        }
        protected Message convertMessageIfNecessary(Object object) {
            return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
        }
        private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
            MessageConverter converter = this.getMessageConverter();
            if (converter == null) {
                throw new AmqpIllegalStateException("No 'messageConverter' specified. Check configuration of RabbitTemplate.");
            } else {
                return converter;
            }
        }    

    控制台输出:

    SimpleMessageConverter
    (Body:'[B@5b4d25e7(byte[238])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=238, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
    SerializerMessageConverter
    (Body:'[B@64355120(byte[242])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=242, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
    Jackson2JsonMessageConverter
    (Body:'{"origin":"Jackson2JsonMessageConverter","name":"张三","user":{"username":"zs","address":"北京"}}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, contentType=application/json, contentEncoding=UTF-8, contentLength=101, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])

    查看MQ如下

     

     

     

    补充:消息接收者也可以指定转换器进行转换 

    package cn.qz.cloud.mq;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    
    import java.util.HashMap;
    import java.util.Map;
    public class MyChannelAwareMessageListener implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
            Map map = (HashMap) messageConverter.fromMessage(message);
            System.out.println(map);
    
    //        Map<String, Object> msg = (Map<String, Object>) SerializationUtils.deserialize(message.getBody());
    //        String msg = new String(message.getBody(), "UTF-8");
    //        HashMap hashMap = JSONObject.parseObject(msg, HashMap.class);
    
            // 最后要应答或拒绝消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    //        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

    补充:RabbitTemplate 提供了消息发送前的前置处理器,MessagePostProcessor, 用于在进行发送消息操作之前进行操作 

    接口如下:

    package org.springframework.amqp.core;
    
    import org.springframework.amqp.AmqpException;
    
    public interface MessagePostProcessor {
        Message postProcessMessage(Message var1) throws AmqpException;
    }

    常见场景在发送消息的时候我们希望将sleuth 生成的traceId信息和spanId信息存到message的head中

    package xxx;
    
    import lombok.extern.slf4j.Slf4j;
    import org.slf4j.MDC;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.cloud.sleuth.Span;
    import org.springframework.cloud.sleuth.Tracer;
    
    @Slf4j
    public class MqMessageTraceidProcessor implements MessagePostProcessor {
    
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            Tracer tracer = SpringContextHolder.getBean(Tracer.class);
            if (tracer == null) {
                log.debug("IoC container not exists tracer, not handle!");
                return message;
            }
    
            // 获取到当前的Span
            Span currentSpan = tracer.getCurrentSpan();
            if (currentSpan != null) {
                MessageProperties messageProperties = message.getMessageProperties();
                messageProperties.setHeader(Span.SPAN_ID_NAME, MDC.get(Span.SPAN_ID_NAME));
                messageProperties.setHeader(Span.SPAN_EXPORT_NAME, MDC.get(Span.SPAN_EXPORT_NAME));
                messageProperties.setHeader(Span.TRACE_ID_NAME, MDC.get(Span.TRACE_ID_NAME));
            }
            return message;
        }
    }

    注入到Spring以及设置给rabbitTemplate

    package xxx;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.PostConstruct;
    
    @Configuration
    @Slf4j
    public class MqMessageTraceConfiguration {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        @Bean
        public MqMessageTraceidProcessor mqMessageTraceidProcessor() {
            return new MqMessageTraceidProcessor();
        }
    
        @PostConstruct
        public void registerProcessor() {
            log.info("registerMessagePostProcessor start*********************");
            rabbitTemplate.setBeforePublishPostProcessors(mqMessageTraceidProcessor());
        }
    }

    这里需要注意如果设置多个Processor,必须在一个地方设置,设置是覆盖效果,如下:

    org.springframework.amqp.rabbit.core.RabbitTemplate#setBeforePublishPostProcessors

        public void setBeforePublishPostProcessors(MessagePostProcessor... beforePublishPostProcessors) {
            Assert.notNull(beforePublishPostProcessors, "'beforePublishPostProcessors' cannot be null");
            Assert.noNullElements(beforePublishPostProcessors, "'beforePublishPostProcessors' cannot have null elements");
            this.beforePublishPostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(beforePublishPostProcessors));
        }

    查看其发送前的处理逻辑如下: org.springframework.amqp.rabbit.core.RabbitTemplate#doSend

        protected void doSend(Channel channel, String exchange, String routingKey, Message message, boolean mandatory, CorrelationData correlationData) throws Exception {
            if (exchange == null) {
                exchange = this.exchange;
            }
    
            if (routingKey == null) {
                routingKey = this.routingKey;
            }
    
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Publishing message on exchange [" + exchange + "], routingKey = [" + routingKey + "]");
            }
    
            Message messageToUse = message;
            MessageProperties messageProperties = message.getMessageProperties();
            if (mandatory) {
                messageProperties.getHeaders().put("spring_listener_return_correlation", this.uuid);
            }
    
            MessagePostProcessor processor;
            if (this.beforePublishPostProcessors != null) {
                for(Iterator var9 = this.beforePublishPostProcessors.iterator(); var9.hasNext(); messageToUse = processor instanceof CorrelationAwareMessagePostProcessor ? ((CorrelationAwareMessagePostProcessor)processor).postProcessMessage(messageToUse, correlationData) : processor.postProcessMessage(messageToUse)) {
                    processor = (MessagePostProcessor)var9.next();
                }
            }
    
            this.setupConfirm(channel, messageToUse, correlationData);
            if (this.userIdExpression != null && messageProperties.getUserId() == null) {
                String userId = (String)this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
                if (userId != null) {
                    messageProperties.setUserId(userId);
                }
            }
    
            BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(messageProperties, this.encoding);
            channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, messageToUse.getBody());
            if (this.isChannelLocallyTransacted(channel)) {
                RabbitUtils.commitIfNecessary(channel);
            }
    
        }
    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    重新整理 .net core 实践篇————配置系统之盟约[五]
    重新整理 .net core 实践篇————依赖注入应用之援军[四]
    重新整理 .net core 实践篇————依赖注入应用之生命法则[三]
    重新整理 .net core 实践篇————依赖注入应用[二]
    重新整理 .net core 实践篇————配置应用[一]
    spring cloud 学习笔记 客户端(本地)均衡负载(三)
    Leetcode之插入区间
    Leetcode之两棵二叉搜索树中的所有元素
    Leetcode之二叉树的层序遍历
    LeetCode之验证二叉搜索树
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14566946.html
Copyright © 2011-2022 走看看