源码地址:https://github.com/hutuchong518/RabbitmqStudy
需求: spring boot 整合 rabbitmq rpc功能, 需要将 请求和响应 这两个队列 分别放在不同的MQ服务器上,以提高单个MQ服务器的吞吐量和性能。
MQ服务器1:
IP:192.168.179.128
对列:hello1
MQ服务器2:
IP:172.16.16.218
对列:hello2
这里实现的关键 是 创建队列 到 指定 MQ服务器中, 网上一些文章 都是 一把轮 没有区分,在实施上有问题的其实,这里通过实践并解决,以供参考。
下面是代码:
1 package com.mq.util; 2 3 4 import com.rabbitmq.client.AMQP; 5 import com.rabbitmq.client.Channel; 6 import org.springframework.amqp.core.Queue; 7 import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; 8 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 9 import org.springframework.amqp.rabbit.connection.Connection; 10 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 11 import org.springframework.amqp.rabbit.core.RabbitTemplate; 12 import org.springframework.beans.factory.annotation.Qualifier; 13 import org.springframework.beans.factory.annotation.Value; 14 import org.springframework.beans.factory.config.ConfigurableBeanFactory; 15 import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; 16 import org.springframework.context.annotation.Bean; 17 import org.springframework.context.annotation.Configuration; 18 import org.springframework.context.annotation.Primary; 19 import org.springframework.context.annotation.Scope; 20 21 import java.io.IOException; 22 23 24 @Configuration 25 public class RabbitConfig { 26 27 @Bean(name = "firstConnectionFactory") 28 @Primary 29 public ConnectionFactory firstConnectionFactory( 30 @Value("${spring.rabbitmq.first.host}") String host, 31 @Value("${spring.rabbitmq.first.port}") int port, 32 @Value("${spring.rabbitmq.first.username}") String username, 33 @Value("${spring.rabbitmq.first.password}") String password 34 ) { 35 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 36 connectionFactory.setHost(host); 37 connectionFactory.setPort(port); 38 connectionFactory.setUsername(username); 39 connectionFactory.setPassword(password); 40 return connectionFactory; 41 } 42 43 @Bean(name = "secondConnectionFactory") 44 public ConnectionFactory secondConnectionFactory( 45 @Value("${spring.rabbitmq.second.host}") String host, 46 @Value("${spring.rabbitmq.second.port}") int port, 47 @Value("${spring.rabbitmq.second.username}") String username, 48 @Value("${spring.rabbitmq.second.password}") String password 49 ) { 50 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 51 connectionFactory.setHost(host); 52 connectionFactory.setPort(port); 53 connectionFactory.setUsername(username); 54 connectionFactory.setPassword(password); 55 return connectionFactory; 56 } 57 58 @Bean(name = "firstRabbitTemplate") 59 //@Primary //貌似没用,移除 60 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必须是prototype类型 61 public RabbitTemplate firstRabbitTemplate( 62 @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory 63 ) { 64 RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory); 65 return firstRabbitTemplate; 66 } 67 68 @Bean(name = "secondRabbitTemplate") 69 public RabbitTemplate secondRabbitTemplate( 70 @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory 71 ) { 72 RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory); 73 return secondRabbitTemplate; 74 } 75 76 @Bean(name = "firstFactory") 77 public SimpleRabbitListenerContainerFactory firstFactory( 78 SimpleRabbitListenerContainerFactoryConfigurer configurer, 79 @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory 80 ) { 81 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 82 configurer.configure(factory, connectionFactory); 83 return factory; 84 } 85 86 @Bean(name = "secondFactory") 87 public SimpleRabbitListenerContainerFactory secondFactory( 88 SimpleRabbitListenerContainerFactoryConfigurer configurer, 89 @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory 90 ) { 91 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 92 configurer.configure(factory, connectionFactory); 93 return factory; 94 } 95 96 @Bean 97 public void runFirstQueue(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { 98 System.out.println("configuration firstQueue ........................"); 99 //return new Queue("hello1"); 100 Connection connection = connectionFactory.createConnection(); 101 Channel channel = connection.createChannel(false); 102 try { 103 channel.queueDeclare("hello1", false, false, false, null); 104 channel.queueBind("hello1", "exchange1", "hello.#"); 105 } catch (Exception e) { 106 e.printStackTrace(); 107 } 108 } 109 110 @Bean 111 public void runSecondQueue(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { 112 System.out.println("configuration secondQueue ........................"); 113 //return new Queue("hello2"); 114 Connection connection = connectionFactory.createConnection(); 115 Channel channel = connection.createChannel(false); 116 try { 117 channel.queueDeclare("hello2", false, false, false, null); 118 channel.queueBind("hello2", "exchange2", "hello.#"); 119 } catch (Exception e) { 120 e.printStackTrace(); 121 } 122 } 123 124 125 //下面2个对列创建方式 测试后发现不是 针对指定mq 服务器创建,只会在第一个服务器创建 126 /* 127 @Bean 128 public Queue firstQueue() { 129 System.out.println("configuration firstQueue ........................"); 130 return new Queue("hello1"); 131 } 132 133 @Bean 134 public Object secondQueue() { 135 System.out.println("configuration secondQueue ........................"); 136 return new Queue("hello2"); 137 } 138 */ 139 }
参考:https://www.cnblogs.com/hutuchong/p/7443252.html