zoukankan      html  css  js  c++  java
  • springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制,延时队列的实现

    1.引入maven依赖
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    2.在application.yml的配置:
    spring:
    rabbitmq:
    host: 106.52.82.241
    port: 5672
    username: yang
    password: Yangxiaohui227
    virtual-host: /
    publisher-confirms: true #消息发送后,如果发送成功到队列,则会回调成功信息
    publisher-returns: true #消息发送后,如果发送失败,则会返回失败信息信息
    listener: #加了2下面2个属性,消费消息的时候,就必须发送ack确认,不然消息永远还在队列中
    direct:
    acknowledge-mode: manual
    simple:
    acknowledge-mode: manual
    //为了统一管理所有的Mq消息,建一个类存储常量,消息的设计都基本会涉及(队列(queue),交换机(exchange),路由键(route)三个值)
    public class RabbitMqConstant {
    
        //下单发送消息 队列名,交换机名,路由键的配置
        public final static String SHOP_ORDER_CREATE_EXCHANGE="shop.order.create.exchange";
        public final static String SHOP_ORDER_CREATE_ROUTE="shop.order.create.route";
        public final static String SHOP_ORDER_CREATE_QUEUE="shop.order.create.queue";
    }
    
    
    package com.example.demo.mq;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    //该类是mq最重要的一个类,所有队列的创建,交换机的创建,队列和交换机的绑定都在这里实现
    @Configuration
    public class RabbitMqConfig {
        private final static Logger log = LoggerFactory.getLogger(RabbitMqConfig.class);
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        @Autowired
        private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    
        /**
         * 单一消费者
         *
         * @return
         */
    
        @Bean(name = "singleListenerContainer")
        public SimpleRabbitListenerContainerFactory listenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            factory.setConcurrentConsumers(1);
            factory.setMaxConcurrentConsumers(1);
            factory.setPrefetchCount(1);
            factory.setTxSize(1);
            return factory;
        }
    
        /**
         * 多个消费者
         *
         * @return
         */
        @Bean(name = "multiListenerContainer")
        public SimpleRabbitListenerContainerFactory multiListenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factoryConfigurer.configure(factory, connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            factory.setConcurrentConsumers(20);
            factory.setMaxConcurrentConsumers(20);
            factory.setPrefetchCount(20);
            return factory;
        }
    
        /**
         * 模板的初始化配置
         *
         * @return
         */
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean sucess, String cause) {
                    if (sucess) {
                        log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, sucess, cause);
                    }
    
                }
            });
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
                }
            });
            return rabbitTemplate;
        }
    
    
    
        //消息的创建设计三个步骤:队列的创建,交换机创建(direct类型,topic类型,fanout类型),队列和交换机的通过路由键的绑定
    
    
        //--------- 下单消息配置
        //队列
        @Bean
        public Queue shopOrderCreateQueue() {
            return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE, true);
        }
    
        //Direct交换机(一对一关系,一个direct交换机只能绑定一个队列,当有2个相同消费者时,如项目部署2台机,只有一个消费者能消费,)
        @Bean
        DirectExchange shopOrderCreateExchange() {
            return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE);
        }
    
        //绑定
        @Bean
        Binding bindShopOrderCreateQueue() {
            return BindingBuilder.bind(shopOrderCreateQueue()).to(shopOrderCreateExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE);
        }
    }
    
    
    
     
    import com.alibaba.fastjson.JSON;
    import com.example.demo.domain.ShopOrderMast;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    //专门用一个类作为消息的生产者
    @Service
    public class ShopMessagePublisher {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendCreateOrderMessage(ShopOrderMast orderMast){
            CorrelationData correlationData=new CorrelationData(); //该参数可以传,可以不传,不传时,correlationData的id值默认是null,消息发送成功后,在RabbitMqConfig类的rabbitTemplate类的confirm方法会接收到该值
            correlationData.setId(orderMast.getCodOrderId()); 
            String msg = JSON.toJSONString(orderMast);
            //convertAndSend该方法有非常多的重构方法,找到适合自己的业务方法就行了,这里我用的是其中一个,发送时指定exchange和route值,这样就会发到对应的队列去了
            rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE,RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE,msg,correlationData);
    
        }
    }
    
    
    
    
    
    //所有的消费都写在一个消费类中
    @Service
    public class ShopMessageComsumer {
        //监听下单消息
        @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE)
        public void createOrderMesaageComsumer(String msg, Channel channel, Message message) {
            try {
                    //消息可以通过msg获取也可以通过message的body属性获取
                    System.out.println("开始消费了");
                    ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);
    
    
                /**
                 * 因为我在application.yml那里配置了消息手工确认也就是传说中的ack,所以消息消费后必须发送确认给mq
                 * 很多人不理解ack(消息消费确认),以为这个确认是告诉消息发送者的,这个是错的,这个ack是告诉mq服务器,
                 * 消息已经被我消费了,你可以删除它了
                 * 如果没有发送basicAck的后果是:每次重启服务,你都会接收到该消息
                 * 如果你不想用确认机制,就去掉application.yml的acknowledge-mode: manual配置,该配置默认
                 * 是自动确认auto,去掉后,下面的channel.basicAck就不用写了
                 *
                 */
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
                } catch (Exception e) {
                try {
                    //出现异常,告诉mq抛弃该消息
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                    e.printStackTrace();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
    
            }
    
        }
    }
    //这里我发送了一条消息,orderId我设置为555556666666,在消息发送时,存到了CorrelationData对象中,因此,发送成功后,在confirm方法可以接收到该值了
    //消息发送成功后,在控制台会看到有成功的回调信息,也就是回调了rabbitTemplate的:
    confirm(CorrelationData correlationData, boolean sucess, String cause)

     //上面测试的下单消息是direct类型消息的,现在创建一个topic消息

    //RabbitMqConstant新增topic的配置信息
    //下单topic消息:路由键的名字 星号* 代表多个字符,#号代表一个字符
        //topic交换机,发送消息时,发送到指定shop.order.create.topic.exchange和shop.order.create.topic.route中
        public final static String SHOP_ORDER_CREATE_TOPIC_EXCHANGE="shop.order.create.topic.exchange";
        public final static String SHOP_ORDER_CREATE_TOPIC_TOUTE="shop.order.create.topic.route";
    
    
        //队列1,通过shop.order.create.topic.*与交换机绑定
        public final static String SHOP_ORDER_CREATE_TOPIC_ROUTE_ONE="shop.order.create.topic.*";
        public final static String SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE="shop.order.create.topic.queue.one";
    
    
        //队列2 通过shop.order.create.topic.*与交换机绑定shop.order.create.topic.#
        public final static String SHOP_ORDER_CREATE_TOPIC_ROUTE_TWO="shop.order.create.topic.#";
        public final static String SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO="shop.order.create.topic.queue.two";


    //在RabbitMqConfig新增topic队列的基本信息
    //-------------------------下单TOPIC消息的创建
    
        //创建TOPIC交换机
        @Bean
        TopicExchange shopOrderCreateTopicExchange() {
            return new TopicExchange(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_EXCHANGE);
        }
        //---------------------------//队列1使用自己的route和交换机绑定
        //创建队列1
        @Bean
        public Queue shopOrderCreateQueueOne() {
            return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE, true);
        }
        //绑定
        @Bean
        Binding bindShopOrderCreateQueueOne() {
            return BindingBuilder.bind(shopOrderCreateQueueOne()).to(shopOrderCreateTopicExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_ROUTE_ONE);
        }
    
        //---------------------------//队列2用自己的route和交换机绑定
    
        //创建队列2
        @Bean
        public Queue shopOrderCreateQueueTWO() {
            return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO, true);
        }
    
        //绑定
        @Bean
        Binding bindShopOrderCreateQueueTWO() {
            return BindingBuilder.bind(shopOrderCreateQueueTWO()).to(shopOrderCreateTopicExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_ROUTE_TWO);
        }
    //消息的发送方新增
      //发送TOPIC消息
        public void sendCreateOrderTOPICMessage(ShopOrderMast orderMast){
            CorrelationData correlationData=new CorrelationData(); //该参数可以传,可以不传,不传时,correlationData的id值默认是null,消息发送成功后,在RabbitMqConfig类的rabbitTemplate类的confirm方法会接收到该值
            correlationData.setId(orderMast.getCodOrderId());
            String msg = JSON.toJSONString(orderMast);
            //消息发送使用公共route而不是某个队列自己的route
            rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_EXCHANGE,RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_TOUTE,msg,correlationData);
    
        }
    //消息的消费方新增
    //消费者1
        @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE)
        public void createOrderMesaageComsumerOne(String msg, Channel channel, Message message) {
            try {
                    //消息可以通过msg获取也可以通过message对象的body值获取
                    System.out.println("我是消费者1");
                    ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);
    
    
                /**
                 * 因为我在application.yml那里配置了消息手工确认也就是传说中的ack,所以消息消费后必须发送确认给mq
                 * 很多人不理解ack(消息消费确认),以为这个确认是告诉消息发送者的,这个是错的,这个ack是告诉mq服务器,
                 * 消息已经被我消费了,你可以删除它了
                 * 如果没有发送basicAck的后果是:每次重启服务,你都会接收到该消息
                 * 如果你不想用确认机制,就去掉application.yml的acknowledge-mode: manual配置,该配置默认
                 * 是自动确认auto,去掉后,下面的channel.basicAck就不用写了
                 *
                 */
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
                } catch (Exception e) {
                try {
                    //出现异常,告诉mq抛弃该消息
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                    e.printStackTrace();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
    
            }
    
        }
        //消费者2
        @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO)
        public void createOrderMesaageComsumerTWO(String msg, Channel channel, Message message) {
            try {
                    //消息可以通过msg获取也可以通过message对象的body值获取
                    System.out.println("我是消费者2");
                    ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);
    
    
                /**
                 * 因为我在application.yml那里配置了消息手工确认也就是传说中的ack,所以消息消费后必须发送确认给mq
                 * 很多人不理解ack(消息消费确认),以为这个确认是告诉消息发送者的,这个是错的,这个ack是告诉mq服务器,
                 * 消息已经被我消费了,你可以删除它了
                 * 如果没有发送basicAck的后果是:每次重启服务,你都会接收到该消息
                 * 如果你不想用确认机制,就去掉application.yml的acknowledge-mode: manual配置,该配置默认
                 * 是自动确认auto,去掉后,下面的channel.basicAck就不用写了
                 *
                 */
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
                } catch (Exception e) {
                try {
                    //出现异常,告诉mq抛弃该消息
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                    e.printStackTrace();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
    
            }
    
        }

     //测试结果:

     //延时队列:将消息发送到一个队列,等过了一段时间后,该队列会将消息转发到真正的队列消费,业务场景可以用于订单定时取消

    //在RabbitMqConstant类添加如下内容
     //延时队列,消息先发到延时队列中,到时间后,再发送到真正的队列
    
        public final static String SHOP_ORDER_CREATE_DELAY_EXCHANGE="shop.order.create.delay.exchange";
        public final static String SHOP_ORDER_CREATE_DELAY_ROUTE="shop.order.create.delay.route";
        public final static String SHOP_ORDER_CREATE_DELAY_QUEUE="shop.order.create.delay.queue";
    
        //真正的队列
    
        public final static String SHOP_ORDER_CREATE_REAL_EXCHANGE="shop.order.create.real.exchange";
        public final static String SHOP_ORDER_CREATE_REAL_ROUTE="shop.order.create.real.route";
        public final static String SHOP_ORDER_CREATE_REAL_QUEUE="shop.order.create.real.queue";
    //在RabbitMqConfig加上
     //----------------------- 延时队列的配置
    
        //延时队列
        @Bean
        public Queue shopOrderCreateDelayQueue() {
            Map<String, Object> argsMap= Maps.newHashMap();
            argsMap.put("x-dead-letter-exchange",RabbitMqConstant.SHOP_ORDER_CREATE_REAL_EXCHANGE); //真正的交换机
            argsMap.put("x-dead-letter-routing-key",RabbitMqConstant.SHOP_ORDER_CREATE_REAL_ROUTE); //真正的路由键
            return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_QUEUE,true,false,false,argsMap);
    
        }
        //延时交换机
        @Bean
        DirectExchange shopOrderCreateDelayExchange() {
            return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_EXCHANGE);
        }
    
        //延时队列绑定延时交换机
        @Bean
        Binding bindShopOrderCreateDelayQueue() {
            return BindingBuilder.bind(shopOrderCreateDelayQueue()).to(shopOrderCreateDelayExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_ROUTE);
        }
    
    
        //真正的队列配置-------------------------------------
    
    
        //真正的队列
        @Bean
        public Queue shopOrderCreateRealQueue() {
    
            return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_REAL_QUEUE,true);
    
        }
        //真正的交换机
        @Bean
        DirectExchange shopOrderCreateRealExchange() {
            return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_REAL_EXCHANGE);
        }
    
        //绑定真正的交换机
        @Bean
        Binding bindShopOrderCreateRealQueue() {
            return BindingBuilder.bind(shopOrderCreateRealQueue()).to(shopOrderCreateRealExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_REAL_ROUTE);
        }
    //在消息发送类(ShopMessagePublisher)新增
     //发送延时消息
        public void sendCreateOrderDelayMessage(ShopOrderMast orderMast){
            CorrelationData correlationData=new CorrelationData(); //该参数可以传,可以不传,不传时,correlationData的id值默认是null,消息发送成功后,在RabbitMqConfig类的rabbitTemplate类的confirm方法会接收到该值
            correlationData.setId(orderMast.getCodOrderId());
            String msg = JSON.toJSONString(orderMast);
            // convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData)
            rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_EXCHANGE, RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_ROUTE, msg, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    MessageProperties messageProperties = message.getMessageProperties();
                    messageProperties.setExpiration("60000");//单位是毫秒
                    return message;
                }
            }, correlationData);
    
        }
    //在消费类(ShopMessageComsumer) 新增
     //延迟队列中真正队列监听
        @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_REAL_QUEUE)
        public void createOrderRealMesaageComsumer(String msg, Channel channel, Message message) {
            try {
    
                    System.out.println("这是真正的队列,在监听延时队列发送的消息");
                    ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);
    
    
    
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
                } catch (Exception e) {
                try {
                    //出现异常,告诉mq抛弃该消息
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                    e.printStackTrace();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
    
            }
    
        }

    //注意,如果同时使用了延时队列的queue去接收,那么消息会被延迟队列的消费者消费,而不是被真正的queue消费

    //如果在延迟队列消费时,加了下面这个队列,上面那个真正的消费者就接收不到消息了
        @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_QUEUE)
        public void createOrderDelayMesaageComsumer(String msg, Channel channel, Message message) {
            try {
                    System.out.println("测试延迟队列自己能否接收");
                    ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
                } catch (Exception e) {
                try {
                    //出现异常,告诉mq抛弃该消息
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                    e.printStackTrace();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
    
            }
    
        }

    //补充:对于direct和topic交换机,如果部署多台相同queue的消费者,消息也只会消费一次,通过轮询的方式进行负债均衡

     //如何在rabbitMq管理页面查看没有还没被消费的消息信息:

     通过界面发送Mq消息,场景,如日志发现某条消息没有发送,可以在这里发送回去:

  • 相关阅读:
    【caffe】create_mnist.sh在windows下的解决方案
    【caffe】loss function、cost function和error
    【caffe】未定义函数或变量caffe_
    【caffe】无法找到gpu/mxGPUArray.h: No such file or directory
    maven常见问题处理(3-1)修改maven 默认使用的 jdk 版本
    SpringCloud是什么?
    SpringCloud的服务网关zuul
    SpringCloud的EurekaClient : 客户端应用访问注册的微服务(有断路器场景)
    SpringBoot 概念和起步
    YML(1)什么是 YML
  • 原文地址:https://www.cnblogs.com/yangxiaohui227/p/11331086.html
Copyright © 2011-2022 走看看