zoukankan      html  css  js  c++  java
  • SpringBoot整合RabbitMQ

    SpringBoot整合RabbitMQ

    一、引入相关依赖

    <dependencies>
        <!--amqp-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <!--test-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    

    二、配置RabbitMQ

    首先应当确保你已安装了RabbitMQ,如果你没有安装,请参考:Docker 安装 RabbitMq

    查看RabbitMQ自动配置类RabbitAutoConfiguration:

    @EnableConfigurationProperties(RabbitProperties.class)
    @Import(RabbitAnnotationDrivenConfiguration.class)
    public class RabbitAutoConfiguration {
    

    其中@EnableConfigurationProperties(RabbitProperties.class)是RabbitMQ的相关属性配置。

    点进去RabbitProperties.class

    @ConfigurationProperties(prefix = "spring.rabbitmq")
    public class RabbitProperties {
    
    	private static final int DEFAULT_PORT = 5672;
    
    	private static final int DEFAULT_PORT_SECURE = 5671;
    
    	/**
    	 * RabbitMQ host. Ignored if an address is set.
    	 */
    	private String host = "localhost";
    
    	/**
    	 * RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is
    	 * enabled.
    	 */
    	private Integer port;
    
    	/**
    	 * Login user to authenticate to the broker.
    	 */
    	private String username = "guest";
    
    	/**
    	 * Login to authenticate against the broker.
    	 */
    	private String password = "guest";
    

    我们可以通过spring.rabbitmq,在application.yml文件中配置相关的属性,比如host、port、username、password。

    在application.yml配置RabbitMQ:

    spring:
      #rabbitmq的相关配置
      rabbitmq:
        host: 192.168.204.131
        port: 5672
        username: guest
        password: guest
    

    继续查看RabbitAutoConfiguration:

    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnMissingBean(RabbitOperations.class)
    public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        configurer.configure(template, connectionFactory);
        return template;
    }
    
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
    @ConditionalOnMissingBean
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
    

    发现其向容器中注入了两个组件:RabbitTemplate和AmqpAdmin,这两个组件有什么作用呢?

    RabbitTemplate:可以发送消息、接收消息。

    AmqpAdmin操作Exchange、Queue、Binding等,比如创建、删除、解绑。

    1、测试RabbitTemplate

    首先在容器中通过自动注入的方式获取RabbitTemplate,然后在测试类中测试:

    @SpringBootTest
    class SpringBoot02AmqpApplicationTests {
        @Autowired
        RabbitTemplate rabbitTemplate;
    }
    

    (1)使用RabbitTemplate测试发送消息

    • send(String exchange, String routingKey, Message message):需要自己定义一个Message,比较麻烦。
    • convertAndSend(String exchange, String routingKey, Object object):只需要传入一个Object,自动序列化发送给rabbitmq,object默认被当成消息体。
    //单播(点对点)发送。
    @Test
    public void testRabbitTemplate() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("name", "zhangsan");
        map.put("age", 22);
        rabbitTemplate.convertAndSend("exchange.direct","aiguigu.news",map);
    }
    

    这种方式在接收端接收的数据是这样式的:

    rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAADdAAEbmFtZXQA CHpoYW5nc2FudAAEbGlzdHNyABpqYXZhLnV0aWwuQXJyYXlzJEFycmF5TGlzdNmkPL7NiAbSAgABWwABYXQAE1tMamF2YS9sYW5nL09iamVjdDt4cHVyABdb TGphdmEuaW8uU2VyaWFsaXphYmxlO67QCaxT1+1JAgAAeHAAAAADdAAEaGFoYXNyABFqYXZhLmxhbmcuSW50ZWdlchLioKT3gYc4AgABSQAFdmFsdWV4cgAQ amF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAKac3IAEWphdmEubGFuZy5Cb29sZWFuzSBygNWc+u4CAAFaAAV2YWx1ZXhwAXQAA2FnZXNxAH4ACwAA
    ABZ4

    这是由于默认使用的是jdk的序列化方式,那么如何将消息转化为json格式的数据发送出去?接下来自定义使用Jackson2JsonMessageConverter的消息转化器。

    自定义MessageConverter配置:

    @Configuration
    @EnableRabbit   //开启基于注解的rabbitmq
    public class MyAMQPConfig {
    
        /**
         * 设置自定义的 MessageConverter
         * 使用Jackson2JsonMessageConverter消息转换器
         * @return
         */
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    

    然后再次测试,在接收端接收的数据如下:

    {"name":"zhangsan","list":["haha",666,true],"age":22}

    再发送个对象试试:

    Book book = new Book("西游记", "吴承恩");
    rabbitTemplate.convertAndSend("exchange.direct", "aiguigu.news", book);
    

    使用自定义的消息转化器之后,接收端数据:

    {"bookName":"西游记","author":"吴承恩"}

    (2)使用RabbitTemplate测试接收消息

    • receiveAndConvert(String queueName):接收队列名称为queueName的消息。
    //接收数据
    @Test
    public void testReceive() {
        Object o = rabbitTemplate.receiveAndConvert("aiguigu.news");
        System.out.println(o.getClass());
        System.out.println(o);
    
        // 接收map
        // class java.util.HashMap
        // {name=zhangsan, list=[haha, 666, true], age=22}
    
        // 接收book对象
        // class com.example.bean.Book
        // Book{bookName='西游记', author='吴承恩'}
    }
    

    2、测试AmqpAdmin

    • removeBinding(Binding binding):解除某个bingding

      @Test
      public void testRemoveBinding() {
          //解除某个bingding
          amqpAdmin.removeBinding(new Binding("declaredQueue", Binding.DestinationType.QUEUE,"amqpAdmin_direct.exchange", "amqp.haha", null));
      }
      
    • deleteExchange(String s):删除指定的exchange.

      boolean deleteExchange = amqpAdmin.deleteExchange("amqpAdmin_direct.exchange");
      System.out.println(deleteExchange); //true
      
    • deleteQueue(String s):删除指定Queue

      boolean deleteQueue = amqpAdmin.deleteQueue("declaredQueue");
      System.out.println("deleteQueue:"+deleteQueue); //true
      
    • getQueueInfo(String s),获取指定队列的信息。

      @Test
      public void getQueueInformation() {
          QueueInformation queueInformation = amqpAdmin.getQueueInfo("declaredQueue");
          int consumerCount = queueInformation.getConsumerCount();
          int messageCount = queueInformation.getMessageCount();
          String name = queueInformation.getName();
          System.out.println("consumerCount:" + consumerCount);   //0
          System.out.println("messageCount:" + messageCount); //0
          System.out.println("name:" + name); //declaredQueue
      }
      
    • declareExchange(Exchange exchange):声明一个exchange.

      /**
           * 以declare开头的是创建组件。
           * declareExchange(Exchange exchange):声明一个exchange
           * Exchange是一个接口,其实现类有:
           * 1.DirectExchange
           * 2.FanoutExchange
           * 3.TopicExchange
           * 4.HeadersExchange
           * 5.CustomExchange
           */
      @Test
      public void testCreateExchange() {
          //创建一个Exchange
          amqpAdmin.declareExchange(new DirectExchange("amqpAdmin_direct.exchange"));
          System.out.println("创建完成!");
      
          //创建一个queue
          String declaredQueue = amqpAdmin.declareQueue(new Queue("declaredQueue"));
          System.out.println("declaredQueue:" + declaredQueue);   //declaredQueue
      
          //创建绑定规则
          amqpAdmin.declareBinding(new Binding("declaredQueue", Binding.DestinationType.QUEUE,
                                               "amqpAdmin_direct.exchange", "amqp.haha", null));
      } 
      

    三、监听消息队列中的内容

    使用@EnableRabbit+@RabbitListener监听消息队列中的内容。

    @EnableRabbit:表示开启基于注解的rabbitmq。

    @RabbitListener:表示监听某个队列的内容。

    @Service
    public class BookServiceImpl implements BookService {
        /**
         * 注解:@RabbitListener(queues = "aiguigu.news"),表示监听aigui.news这个队列的内容。
         *
         * @param book
         */
        @RabbitListener(queues = "aiguigu.news")
        @Override
        public void receive(Book book) {
            System.out.println("收到aiguigu.news消息:" + book);
        }
    
        /**
         * 接收消息的第二种方式:
         *
         * @param message
         */
        @RabbitListener(queues = "aiguigu")
        @Override
        public void receive(Message message) {
            //获取消息体
            byte[] body = message.getBody();
            System.out.println(body);   //[B@fe4bdc2
            //获得消息属性
            MessageProperties properties = message.getMessageProperties();
            System.out.println(properties);
            /*
            MessageProperties [headers={__TypeId__=com.example.bean.Book}, contentType=application/json,
            contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0,
            redelivered=false, receivedExchange=exchange.direct, receivedRoutingKey=aiguigu, deliveryTag=1,
            consumerTag=amq.ctag-ynkD05MwnffSCo9h7W5DGA, consumerQueue=aiguigu]
             */
        }
    }
    

    实现的效果,当给某个exchange发送消息的之后,exchange按照binding规则将消息分发给对应的队列,使用 @RabbitListener可以监听到这个队列的消息,就可以获取消息进行相应的操作。

  • 相关阅读:
    TCP协议详解-IPv4
    welcome to my cnblog
    怎样解决闭包造成的内存泄漏
    跳转路由后请求失败
    vant grid组件图片加载问题
    3次握手
    res.send()传参----Invalid status code: 1
    堆栈总结
    jQuery实现全选
    phpstudy_pro打开MySQL服务,一闪一闪的
  • 原文地址:https://www.cnblogs.com/nieaojie625/p/13779558.html
Copyright © 2011-2022 走看看