zoukankan      html  css  js  c++  java
  • Rabbitmq与spring整合之重要组件介绍——AMQP声明式配置&RabbitTemplate组件

    上一节是使用rabbitAdmin的管理组件进行声明队列,交换器,绑定等操作,本节则是采用AMQP声明式配置来声明这些东西。AMQP声明主要是通过@Bean注解进行的。

    配置:

     1 package com.zxy.demo.config;
     2 
     3 import org.springframework.amqp.core.Binding;
     4 import org.springframework.amqp.core.BindingBuilder;
     5 import org.springframework.amqp.core.DirectExchange;
     6 import org.springframework.amqp.core.FanoutExchange;
     7 import org.springframework.amqp.core.Queue;
     8 import org.springframework.amqp.core.TopicExchange;
     9 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    10 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    11 import org.springframework.amqp.rabbit.core.RabbitAdmin;
    12 import org.springframework.amqp.rabbit.core.RabbitTemplate;
    13 import org.springframework.context.annotation.Bean;
    14 import org.springframework.context.annotation.ComponentScan;
    15 import org.springframework.context.annotation.Configuration;
    16 
    17 
    18 @Configuration
    19 @ComponentScan(basePackages= {"com.zxy.demo.*"})
    20 public class RabbitmqCofing {
    21 //    注入连接工厂,spring的配置,springboot可以配置在属性文件中
    22     @Bean
    23     public ConnectionFactory connectionFactory() {
    24         CachingConnectionFactory connection = new CachingConnectionFactory();
    25         connection.setAddresses("192.168.10.110:5672");
    26         connection.setUsername("guest");
    27         connection.setPassword("guest");
    28         connection.setVirtualHost("/");
    29         return connection;
    30     }
    31 //    配置RabbitAdmin来管理rabbit
    32     @Bean
    33     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    34         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    35         //用RabbitAdmin一定要配置这个,spring加载的是后就会加载这个类================特别重要
    36         rabbitAdmin.setAutoStartup(true);
    37         return rabbitAdmin;
    38     }
    39 //===========================以上结合测试rabbitAdmin部分===========================================================    
    40     
    41     
    42     
    43 //===========================以下为AMQP配置队列绑定等,spring容器加载时候就能够注入===========================================================    
    44 //    采用AMQP定义队列、交换器、绑定等
    45     @Bean(name="direct.queue01")
    46     public Queue queue001() {
    47         return new Queue("direct.queue01", true, false, false);
    48     }
    49     @Bean(name="test.direct01")
    50     public DirectExchange directExchange() {
    51         return new DirectExchange("test.direct01", true, false, null);
    52     }
    53     @Bean
    54     public Binding bind001() {
    55         return BindingBuilder.bind(queue001()).to(directExchange()).with("mq.#");
    56     }
    57     @Bean(name="topic.queue01")
    58     public Queue queue002() {
    59         return new Queue("topic.queue01", true, false, false);
    60     }
    61     @Bean(name="test.topic01")
    62     public TopicExchange topicExchange() {
    63         return new TopicExchange("test.topic01", true, false, null);
    64     }
    65     @Bean
    66     public Binding bind002() {
    67         return BindingBuilder.bind(queue002()).to(topicExchange()).with("mq.topic");
    68     }
    69     @Bean(name="fanout.queue01")
    70     public Queue queue003() {
    71         return new Queue("fanout.queue", true, false, false);
    72     }
    73     @Bean(name="test.fanout01")
    74     public FanoutExchange fanoutExchange() {
    75         return new FanoutExchange("test.fanout01", true, false, null);
    76     }
    77     @Bean
    78     public Binding bind003() {
    79         return BindingBuilder.bind(queue003()).to(fanoutExchange());
    80     }
    81     
    82     
    83     
    84 //===========================注入rabbitTemplate组件===========================================================    
    85 //    跟spring整合注入改模板,跟springboot整合的话只需要在配置文件中配置即可
    86     @Bean 
    87     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    88         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    89         return rabbitTemplate;
    90     }
    91 }

    单元测试:

     1 package com.zxy.demo;
     2 
     3 import org.junit.Test;
     4 import org.junit.runner.RunWith;
     5 import org.springframework.amqp.AmqpException;
     6 import org.springframework.amqp.core.Binding;
     7 import org.springframework.amqp.core.BindingBuilder;
     8 import org.springframework.amqp.core.DirectExchange;
     9 import org.springframework.amqp.core.FanoutExchange;
    10 import org.springframework.amqp.core.Message;
    11 import org.springframework.amqp.core.MessageDeliveryMode;
    12 import org.springframework.amqp.core.MessagePostProcessor;
    13 import org.springframework.amqp.core.MessageProperties;
    14 import org.springframework.amqp.core.Queue;
    15 import org.springframework.amqp.core.TopicExchange;
    16 import org.springframework.amqp.rabbit.core.RabbitAdmin;
    17 import org.springframework.amqp.rabbit.core.RabbitTemplate;
    18 import org.springframework.beans.factory.annotation.Autowired;
    19 import org.springframework.boot.test.context.SpringBootTest;
    20 import org.springframework.test.context.junit4.SpringRunner;
    21 
    22 
    23 @RunWith(SpringRunner.class)
    24 @SpringBootTest
    25 public class ForwardApplicationTests {
    26 
    27     @Test
    28     public void contextLoads() {
    29     }
    30     @Autowired
    31     private RabbitAdmin rabbitAdmin;
    32     @Test
    33     public void testAdmin() {
    34 //        切记命名不能重复复
    35         rabbitAdmin.declareQueue(new Queue("test.direct.queue"));
    36         rabbitAdmin.declareExchange(new DirectExchange("test.direct"));
    37         rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "mq.direct", null));
    38 
    39         rabbitAdmin.declareQueue(new Queue("test.topic.queue", true,false, false));
    40         rabbitAdmin.declareExchange(new TopicExchange("test.topic", true,false));
    41 //        如果注释掉上面两句实现声明,直接进行下面的绑定竟然不行,该版本amqp-client采用的是5.1.2,将上面两行代码放开,则运行成功
    42         rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue", true,false, false))
    43                 .to(new TopicExchange("test.topic", true,false)).with("mq.topic"));
    44 //        经过实验确实是需要先声明,才可以运行通过
    45         rabbitAdmin.declareQueue(new Queue("test.fanout.queue",true,false,false,null));
    46         rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", true, false, null));
    47         rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", true, false,false))
    48                 .to(new FanoutExchange("test.fanout", true, false)));
    49         rabbitAdmin.purgeQueue("test.direct.queue", false);//清空队列消息
    50     }
    51     @Autowired
    52     private RabbitTemplate rabbitTemplate;
    53     @Test
    54     public void testTemplate() {
    55         String body = "hello,test rabbitTemplage!";
    56         MessageProperties properties = new MessageProperties();
    57         properties.setContentEncoding("utf-8");
    58         properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    59         properties.setPriority(1);
    60         properties.setHeader("nihao:", "yes!");
    61         Message message = new Message(body.getBytes(), properties);
    62 //        MessagePostProcessor参数是在消息发送过程中动态修改消息属性的类
    63         rabbitTemplate.convertAndSend("test.direct01", "mq.direct", message,new MessagePostProcessor() {
    64             
    65             @Override
    66             public Message postProcessMessage(Message message) throws AmqpException {
    67 //                修改属性
    68                 message.getMessageProperties().setHeader("nihao:", "no");
    69 //                添加属性
    70                 message.getMessageProperties().setHeader("新添加属性:", "添加属性1");
    71                 return message;
    72             }
    73         });
    74         
    75         
    76 //        发送objcet类型
    77         rabbitTemplate.convertAndSend("test.topic01", "mq.topic", "send object type message!!!");
    78         System.out.println("发送完毕!!!");
    79     }
    80 
    81 }
  • 相关阅读:
    Chrome开发者工具中Elements(元素)断点的用途
    最简单的SAP云平台开发教程
    Java实现 LeetCode 495 提莫攻击
    Java实现 LeetCode 494 目标和
    Java实现 LeetCode 494 目标和
    Java实现 LeetCode 494 目标和
    Java实现 LeetCode 493 翻转对
    Java实现 LeetCode 493 翻转对
    Java实现 LeetCode 493 翻转对
    Java实现 LeetCode 492 构造矩形
  • 原文地址:https://www.cnblogs.com/xiaoyao-001/p/9609900.html
Copyright © 2011-2022 走看看