1, 生产者服务:
maven:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies>
定义队列,交换机,并且绑定队列和交换机,同时注入到spring 中
package com.aiyuesheng.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; //采用fanout 交换机 @Component public class FanoutConfig { // 邮件队列 private String FANOUT_EMAIL_QUEUE = "rabbitmq_simple_queue_email_one"; // 短信队列 private String FANOUT_TEXT_QUEUE = "rabbitmq_simple_queue_text_one"; // 交换机 private String EXCHANGE_NAME = "rabbitmq_simple_queue_exchange_one"; //定义邮件队列 @Bean public Queue getEmailQueue(){ return new Queue(FANOUT_EMAIL_QUEUE); } //定义短信队列 @Bean public Queue getTextQueue(){ return new Queue(FANOUT_TEXT_QUEUE); } //定义fanout交换机 @Bean public FanoutExchange getExchange(){ return new FanoutExchange(EXCHANGE_NAME); } // 邮件队列绑定fanout 交换机 // 交换机和队列进行绑定的时候,这个队列参数名称一定要和获取的队列的方法名一致,交换机队名称一定要和获取交换机的方法名一致 @Bean public Binding emailBindToExchange(Queue getEmailQueue, FanoutExchange getExchange){ return BindingBuilder.bind(getEmailQueue).to(getExchange); } // 短信队列绑定fanout 交换机 @Bean public Binding textBindToExchange(Queue getTextQueue, FanoutExchange getExchange){ return BindingBuilder.bind(getTextQueue).to(getExchange); } }
conifg 的上一层就是producer:
@Component public class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(String routingKey){ String message = "Hello World"; System.out.println(message); amqpTemplate.convertAndSend(routingKey, message); } }
service 发送消息服务:
@RestController public class ProducerService { @Autowired private FanoutProducer fanoutProducer; @RequestMapping("/sendMessage") public String sendMessage(String routingKey) { fanoutProducer.sendMessage(routingKey); return "success"; } }
application.yml:
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: chris ####密码 password: chris ### 地址 virtual-host: /chris_vh
2,消费者服务:
邮件消费者:
package com.aiyuesheng.consumer; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; // 连接fanout 交换机 的邮件消费者 // @RabbitListener 监听队列 @Component @RabbitListener(queues = "rabbitmq_simple_queue_email_one") public class FanoutEamilConsumer { @RabbitHandler public void receive(String msg) throws Exception { System.out.println("邮件消费者获取消息:" + msg); } }
信息消费者:
// 连接fanout 交换机的信息消费者 @Component @RabbitListener(queues="rabbitmq_simple_queue_text_one") public class FanoutTextConsumer { @RabbitHandler public void receive(String msg) throws Exception { System.out.println("信息消费者获取消息:" + msg); } }
application.yml
server:
port: 8081
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: chris
####密码
password: chris
### 地址
virtual-host: /chris_vh