zoukankan      html  css  js  c++  java
  • SpringBoot整合RabbitMQ

    文末有源码地址

    添加依赖

    <!--amqp依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    创建所需常量类

    public class RabbitMQConstant {
    
        //简单模式
        public static final String QUEUE_EASY = "easy.queue";
        //work模式
        public static final String QUEUE_WORK = "work.queue";
        //topic模式
        public static final String QUEUE_TOPIC_FIRST = "topic.queue.first";
        public static final String QUEUE_TOPIC_SECOND = "topic.queue.second";
        //发布订阅模式
        public static final String QUEUE_FANOUT = "fanout.queue";
        public static final String QUEUE_FANOUT_SECOND = "fanout.queue.second";
    
        //路由key
        public static final String ROUTING_KEY_EASY = "routing.key.easy";
        public static final String ROUTING_KEY_WORK = "routing.key.work";
        public static final String ROUTING_KEY_TOPIC_FIRST = "routing.key.topic.first";
        public static final String ROUTING_KEY_TOPIC_SECOND = "routing.key.topic.second";
    
    
        // direct交换机
        public static final String EXCHANGE_DIRECT = "direct_exchange";
        // work交换机
        public static final String EXCHANGE_WORK = "work_exchange";
        // topic交换机
        public static final String EXCHANGE_TOPIC = "topic_exchange";
        // fanout交换机
        public static final String EXCHANGE_FANOUT = "fanout_exchange";
    
    }

    创建交换机

    @Configuration
    public class ExchangeConfig {
    
        /**
         * 交换机说明:
         * durable="true" rabbitmq重启的时候不需要创建新的交换机
         * auto-delete 表示交换机没有在使用时将被自动删除 默认是false
         * direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
         * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中
         * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。
         */
    
        @Bean(name = RabbitMQConstant.EXCHANGE_DIRECT)
        public DirectExchange directExchange() {
            return new DirectExchange(RabbitMQConstant.EXCHANGE_DIRECT, true, false);
        }
    
        @Bean(name = RabbitMQConstant.EXCHANGE_WORK)
        public DirectExchange workExchange() {
            return new DirectExchange(RabbitMQConstant.EXCHANGE_WORK, true, false);
        }
    
        @Bean(name = RabbitMQConstant.EXCHANGE_TOPIC)
        public TopicExchange topicExchange() {
            return new TopicExchange(RabbitMQConstant.EXCHANGE_TOPIC, true, false);
        }
    
        @Bean(name = RabbitMQConstant.EXCHANGE_FANOUT)
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(RabbitMQConstant.EXCHANGE_FANOUT, true, false);
        }
    
    }

    创建队列

    @Configuration
    public class QueueConfig {
    
        /**
         * durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
         * exclusive 表示该消息队列是否只在当前connection生效,默认是false
         * auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
         */
    
        @Bean(name = RabbitMQConstant.QUEUE_EASY)
        public Queue easyQueue() {
            return new Queue(RabbitMQConstant.QUEUE_EASY, true, false, false);
        }
    
        @Bean(name = RabbitMQConstant.QUEUE_WORK)
        public Queue workQueue() {
            return new Queue(RabbitMQConstant.QUEUE_WORK, true, false, false);
        }
    
        @Bean(name = RabbitMQConstant.QUEUE_TOPIC_FIRST)
        public Queue topicQueue() {
            return new Queue(RabbitMQConstant.QUEUE_TOPIC_FIRST, true, false, false);
        }
    
        @Bean(name = RabbitMQConstant.QUEUE_TOPIC_SECOND)
        public Queue topicQueueSecond() {
            return new Queue(RabbitMQConstant.QUEUE_TOPIC_SECOND, true, false, false);
        }
    
        @Bean(name = RabbitMQConstant.QUEUE_FANOUT)
        public Queue fanoutQueue() {
            return new Queue(RabbitMQConstant.QUEUE_FANOUT, true, false, false);
        }
    
        @Bean(name = RabbitMQConstant.QUEUE_FANOUT_SECOND)
        public Queue fanoutQueueSecond() {
            return new Queue(RabbitMQConstant.QUEUE_FANOUT_SECOND, true, false, false);
        }
    
    
    }

    绑定交换机和队列

    @Configuration
    @Slf4j
    public class RabbitMqConfig {
    
        @Resource
        private QueueConfig queueConfig;
        @Resource
        private ExchangeConfig exchangeConfig;
        /**
         * 连接工厂
         */
        @Resource
        private ConnectionFactory connectionFactory;
    
    
        /**
         * 将消息队列和交换机进行绑定,指定路由
         */
        @Bean
        public Binding bindingDirect() {
            return BindingBuilder.bind(queueConfig.easyQueue()).to(exchangeConfig.directExchange()).with(RabbitMQConstant.ROUTING_KEY_EASY);
        }
    
        @Bean
        public Binding bindingWork() {
            return BindingBuilder.bind(queueConfig.workQueue()).to(exchangeConfig.workExchange()).with(RabbitMQConstant.ROUTING_KEY_WORK);
        }
    
        @Bean
        public Binding bindingTopic() {
            return BindingBuilder.bind(queueConfig.topicQueue()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_FIRST);
        }
    
        @Bean
        public Binding bindingTopicSecond() {
            return BindingBuilder.bind(queueConfig.topicQueueSecond()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_SECOND);
        }
    
        @Bean
        public Binding bindingFanout() {
            return BindingBuilder.bind(queueConfig.fanoutQueue()).to(exchangeConfig.fanoutExchange());
        }
    
        @Bean
        public Binding bindingFanoutSecond() {
            return BindingBuilder.bind(queueConfig.fanoutQueueSecond()).to(exchangeConfig.fanoutExchange());
        }
    
        /** ======================== 定制一些处理策略 =============================*/
    
        /**
         * 定制化amqp模版
         * <p>
         * Rabbit MQ的消息确认有两种。
         * <p>
         * 一种是消息发送确认:这种是用来确认生产者将消息发送给交换机,交换机传递给队列过程中,消息是否成功投递。
         * 发送确认分两步:一是确认是否到达交换机,二是确认是否到达队列
         * <p>
         * 第二种是消费接收确认:这种是确认消费者是否成功消费了队列中的消息。
         */
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
            rabbitTemplate.setMandatory(true);
    
            /**
             * 使用该功能需要开启消息确认,yml需要配置 publisher-confirms: true
             * 通过实现ConfirmCallBack接口,用于实现消息发送到交换机Exchange后接收ack回调
             * correlationData  消息唯一标志
             * ack              确认结果
             * cause            失败原因
             */
            rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallback());
            /**
             * 使用该功能需要开启消息返回确认,yml需要配置 publisher-returns: true
             * 通过实现ReturnCallback接口,如果消息从交换机发送到对应队列失败时触发
             * message    消息主体 message
             * replyCode  消息主体 message
             * replyText  描述
             * exchange   消息使用的交换机
             * routingKey 消息使用的路由键
             */
            rabbitTemplate.setReturnCallback(new MsgSendReturnCallback());
    
    
            return rabbitTemplate;
        }
    
    }

    源码地址:

    https://gitee.com/xiaorenwu_dashije/rabbitmq_demo.git

    包含Direct模式、Work模式、Fanout模式、Topic模式

  • 相关阅读:
    Windows XP下 Android开发环境 搭建
    Android程序的入口点
    在eclipse里 新建android项目时 提示找不到proguard.cfg
    64位WIN7系统 下 搭建Android开发环境
    在eclipse里 新建android项目时 提示找不到proguard.cfg
    This Android SDK requires Android Developer Toolkit version 20.0.0 or above
    This Android SDK requires Android Developer Toolkit version 20.0.0 or above
    Android requires compiler compliance level 5.0 or 6.0. Found '1.4' instead
    Windows XP下 Android开发环境 搭建
    Android程序的入口点
  • 原文地址:https://www.cnblogs.com/java-spring/p/13645778.html
Copyright © 2011-2022 走看看