zoukankan      html  css  js  c++  java
  • Spring Boot整合RabbitMQ

    pom

    <dependencies>
            <!-- Spring MVC plus embedded web server -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <version>2.0.5.RELEASE</version>
            </dependency>
            <!-- JUnit / Spring test support used by the sender test -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <version>2.0.5.RELEASE</version>
            </dependency>
    
            <!-- Spring Boot integration with RabbitMQ (Spring AMQP) -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.0.5.RELEASE</version>
            </dependency>
    </dependencies>
    View Code

    yml

    # HTTP port of the embedded web server
    server:
      port: 2222
    spring:
      application:
        name: rabbitqm-test
      # Connection settings for the RabbitMQ broker
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtualHost: /
        listener:
          simple:
            acknowledge-mode: manual # manual acknowledgement: listeners must ack each message themselves
            prefetch: 1  # at most one unacknowledged message per consumer, so faster consumers take on more work

    主类

    /**
     * Spring Boot entry point for the RabbitMQ demo application.
     */
    @SpringBootApplication
    public class Application {
        /**
         * Starts the Spring application context.
         *
         * @param args command-line arguments, forwarded to Spring Boot so that
         *             overrides such as {@code --server.port=...} take effect
         */
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    }

    这里我们需要把交换机、队列以及它们之间的绑定关系交给 Spring 管理(连接和通道由 spring-boot-starter-amqp 根据 yml 配置自动创建),因此写一个配置类:

    package cn.jiedada.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    /**
     * RabbitMQ topology for the demo: one durable topic exchange, two durable
     * queues, and one binding per queue.
     */
    @Configuration
    public class MQConfig {
    
        /** Name of the e-mail queue; also used as its Spring bean name. */
        public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        /** Name of the SMS queue; also used as its Spring bean name. */
        public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        /** Name of the topic exchange; also used as its Spring bean name. */
        public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
    
        /**
         * Declares the topic exchange. The bean name and the exchange name on the
         * broker are both {@link #EXCHANGE_TOPICS_INFORM}.
         *
         * @return a durable topic exchange
         */
        @Bean(EXCHANGE_TOPICS_INFORM)
        public Exchange exchangeTopicsInform() {
            // durable(true): the exchange is declared as durable on the broker.
            ExchangeBuilder builder = ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM);
            return builder.durable(true).build();
        }
    
        /**
         * Declares the SMS queue.
         *
         * @return a durable queue named {@link #QUEUE_INFORM_SMS}
         */
        @Bean(QUEUE_INFORM_SMS)
        public Queue queueInformSms() {
            return new Queue(QUEUE_INFORM_SMS, true);
        }
    
        /**
         * Declares the e-mail queue.
         *
         * @return a durable queue named {@link #QUEUE_INFORM_EMAIL}
         */
        @Bean(QUEUE_INFORM_EMAIL)
        public Queue queueInformEmail() {
            return new Queue(QUEUE_INFORM_EMAIL, true);
        }
    
        /**
         * Binds the SMS queue to the topic exchange with the pattern {@code ruidada.#}.
         *
         * @param queue    the SMS queue bean
         * @param exchange the topic exchange bean
         * @return the binding
         */
        @Bean
        public Binding bindingQueueInformSms(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                             @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
            String routingPattern = "ruidada.#";
            return BindingBuilder.bind(queue)
                    .to(exchange)
                    .with(routingPattern)
                    .noargs();
        }
    
        /**
         * Binds the e-mail queue to the topic exchange with the pattern {@code jiedada.#}.
         *
         * @param queue    the e-mail queue bean
         * @param exchange the topic exchange bean
         * @return the binding
         */
        @Bean
        public Binding bindingQueueInformEmail(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                               @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
            String routingPattern = "jiedada.#";
            return BindingBuilder.bind(queue)
                    .to(exchange)
                    .with(routingPattern)
                    .noargs();
        }
    
    }
    View Code

    提供者(这里我们用 Spring Boot 的测试类来发送消息)

    package cn.jiedada.sender;
    
    import cn.jiedada.Application;
    import cn.jiedada.config.MQConfig;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Message producer, written as a Spring Boot test that publishes one message
     * to the topic exchange.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Application.class)
    public class SenderTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * Publishes a text message with routing key {@code jiedada.add} to
         * {@link MQConfig#EXCHANGE_TOPICS_INFORM}.
         *
         * @throws InterruptedException if the trailing sleep is interrupted
         */
        @Test
        public void testSend() throws InterruptedException {
            String routingKey = "jiedada.add";
            String payload = "收到消息了。。。。。。。。。。";
    
            rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_TOPICS_INFORM, routingKey, payload);
    
            // NOTE(review): presumably keeps the context alive long enough for the
            // message to be published/consumed before the test ends - confirm.
            Thread.sleep(1000);
        }
    
    }

    消费者

    /**
     * Message consumers for the SMS and e-mail queues. Each listener prints the
     * delivery details and then acknowledges the message on the channel.
     */
    @Component
    public class Consumer1 {
    
        /**
         * Consumes messages from the SMS queue.
         *
         * @param msgStr  the converted message payload
         * @param message the raw AMQP message
         * @param channel the channel the message was delivered on
         * @throws IOException if the acknowledgement cannot be sent
         */
        @RabbitListener(queues = MQConfig.QUEUE_INFORM_SMS)
        public void revSMS(String msgStr, Message message, Channel channel) throws IOException {
            printAndAck("收短短信.....................", msgStr, message, channel);
        }
    
        /**
         * Consumes messages from the e-mail queue.
         *
         * @param msgStr  the converted message payload
         * @param message the raw AMQP message
         * @param channel the channel the message was delivered on
         * @throws IOException if the acknowledgement cannot be sent
         */
        @RabbitListener(queues = MQConfig.QUEUE_INFORM_EMAIL)
        public void revEmail(String msgStr, Message message, Channel channel) throws IOException {
            printAndAck("收短邮件.....................", msgStr, message, channel);
        }
    
        /** Prints the delivery details of one message, then acknowledges exactly that delivery. */
        private void printAndAck(String banner, String msgStr, Message message, Channel channel)
                throws IOException {
            System.out.println(banner);
            System.out.println("消息内容:" + msgStr);
            // Decode explicitly as UTF-8 instead of relying on the platform default
            // charset, which would garble non-ASCII payloads on non-UTF-8 JVMs.
            System.out.println("getBody:"
                    + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("getReceivedExchange:" + message.getMessageProperties().getReceivedExchange());
            System.out.println("getReceivedRoutingKey:" + message.getMessageProperties().getReceivedRoutingKey());
    
            // The delivery tag identifies this delivery on the channel.
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("getDeliveryTag:" + deliveryTag);
    
            // Acknowledge this single delivery (multiple = false).
            channel.basicAck(deliveryTag, false);
        }
    }
  • 相关阅读:
    Python常用函数
    MySQL常用操作
    Python与JAVA的异同
    token
    用户cookie和会话session、sessionID的关系
    Jenkins应用
    Python3 logging模块
    python 多线程threading模块
    引用的声明周期结束时,并不会调用析构函数,只有本体的声明周期结束时,才会调用析构函数
    行为像指针的类的对象每次作为参数传入函数或者传出函数时都要小心
  • 原文地址:https://www.cnblogs.com/xiaoruirui/p/13683224.html
Copyright © 2011-2022 走看看