两个项目。分别是生产者和消费者项目
。首先引入依赖。两边pom都一样
第一次练习,启动生产者后,再启动消费者,一直报找不到 队列的声明。
后排查发现是 需要现在生产者这边浏览器访问一次生产消息的方法,以让交换机和队列在rabbit服务器生成。
因为交换机的生成属于懒加载。不发送消息是不生成交换机的。所以开始直接启动消费者会报错 找不到队列的声明
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.kf</groupId> <artifactId>consumer-springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies> </project>
生产者这边写入Fanout的配置
package com.kf.conf; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; /** * Fanout配置 * @author kf * */ @Component public class FanoutConf { /** * fanout没有路由键 * 1:定义队列名,交换机名 * 2:注册队列,交换机 * 3:绑定队列和交换机 */ private String QUEUE_SMS = "QUEUE_SMS"; private String QUEUE_EMAIL = "QUEUE_EMAIL"; private String FANOUTEXCHANGE = "FANOUTEXCHANGE"; //注册短信队列 @Bean public Queue queueSms(){ return new Queue(QUEUE_SMS); } //注册邮件队列 @Bean public Queue queueEmail(){ return new Queue(QUEUE_EMAIL); } //注册交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(FANOUTEXCHANGE); } //绑定队列和交换机 /** * * @param queueSms 此参数用的是上方定义队列的方法名,注册Bean时,默认方法名为该Bean的id * @param fanoutExchange 此参数用的是上方定义队列的方法名,注册Bean时,默认方法名为该Bean的id * @return */ @Bean public Binding bindgFanoutExchangeSms(Queue queueSms, FanoutExchange fanoutExchange){ return BindingBuilder.bind(queueSms).to(fanoutExchange); } @Bean public Binding bindgFanoutExchangeEmail(Queue queueEmail, FanoutExchange fanoutExchange){ return BindingBuilder.bind(queueEmail).to(fanoutExchange); } }
创建消息发送的templet:
package com.kf.producer; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; public void sendMes(String queueName){ System.out.println("队列名:"+queueName); String msg = "此处是fanout的消息体"; amqpTemplate.convertAndSend(queueName, msg); } }
创建controller:
package com.kf.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.kf.producer.FanoutProducer; @RestController public class ProducerController { @Autowired private FanoutProducer fanoutProducer; @RequestMapping("/sendFanout") public String sendFanoutMessage(String queueName){ fanoutProducer.sendMes(queueName); return "success"; } }
创建启动:
package com.kf; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ProducerMainClass { public static void main(String[] args) { SpringApplication.run(ProducerMainClass.class, args); } }
引入rabbit的链接配置
spring: rabbitmq: ####ip host: 127.0.0.1 ####端口 port: 5672 ####用户名 username: guest ####密码 password: guest ### 虚拟主机 virtual-host: /kf
消费者工程:
package com.kf.consumer.sms; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "QUEUE_SMS") public class SmsConsumer { //监听队列消息注解 @RabbitHandler public void accept(String msg){ System.out.println("短信消费者接收消息:"+msg); } }
package com.kf.consumer.email; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "QUEUE_EMAIL") public class EmailConsumer { //监听队列消息注解 @RabbitHandler public void accept(String msg){ System.out.println("邮件消费者接收到:"+msg); } }
创建启动:
package com.kf; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ConsumerMainClass { public static void main(String[] args) { SpringApplication.run(ConsumerMainClass.class, args); } }
创建链接。注意启动端口号:
spring: rabbitmq: ####ip host: 127.0.0.1 ####端口 port: 5672 ####用户名 username: guest ####密码 password: guest ### 虚拟主机 virtual-host: /kf server: port: 8081