zoukankan      html  css  js  c++  java
  • spring boot项目11:RabbitMQ-基础使用

    JAVA 8

    Spring Boot 2.5.3

    RabbitMQ 3.6.10, Erlang 20.2.2(单机)

    ---

    授人以渔:

    1、Spring Boot Reference Documentation

    This document is also available as Multi-page HTML, Single page HTML and PDF.

    有PDF版本哦,下载下来!

    2、Spring AMQP

    有PDF版本哦,下载下来!

    当前最新版 RabbitMQ(官网):

    RabbitMQ 3.9.4 release at 17 Aug 2021

    目录

    试验1:发送消息到 queue1队列

    试验2:使用 自定义交换机 发送消息 到 queue1队列

    试验3:多个Listener接收DirectExchange转发到队列queue1的消息

    试验4:使用FanoutExchange

    试验5:使用TopicExchange

    试验6:接收消息手动确认

    参考文档

    消息队列(MQ)是一种重要的中间件,可以实现 1)进程、应用间通信;2)异步任务处理,可以很好地实现 程序的低耦合。

    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计(百度百科)。

    AMQP是一种 线路级协议(wire-level protocol)——遵循这个协议的实现 之间是可以无缝互操作的。

    RabbitMQ是使用 Erlang语言实现了 AMQP协议。

    Spring Boot中可以方便地集成RabbitMQ,依赖 spring-boot-starter-amqp 包即可:

    本文介绍 S.B. 集成 RabbitMQ 及其基础使用场景。

    补充:

    RabbitMQ启动后,默认使用下面端口:

    5672 服务器端口,S.B.中配置 常用
    15672 Web服务器端口,可以对RabbitMQ服务器进行管理 常用
    25672 用于节点间和CLI工具通信(Erlang分发服务器端口),并从动态范围分配 不常用
    4369 EPMD默认端口 不常用

    注,请看 参考文档(原来有这么多)

    依赖 spring-boot-starter-amqp 包后,Spring容器中会存在一下相关Bean:

    rabbitConnectionFactory
    rabbitTemplateConfigurer
    
    rabbitTemplate # 用过,发送消息
    amqpAdmin # 尚未用过
    
    rabbitMessagingTemplate
    
    simpleRabbitListenerContainerFactoryConfigurer
    rabbitListenerContainerFactory
    directRabbitListenerContainerFactoryConfigurer
    
    RabbitAutoConfiguration
    RabbitProperties
    
    RabbitAnnotationDrivenConfiguration
    RabbitAnnotationDrivenConfiguration$EnableRabbitConfiguration
    
    RabbitAutoConfiguration
    RabbitAutoConfiguration$RabbitConnectionFactoryCreator
    RabbitAutoConfiguration$RabbitTemplateConfiguration
    RabbitAutoConfiguration$MessagingTemplateConfiguration

    rabbitTemplate 是其中 (自己)最常用(重要)的,用来发送消息。

    public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
    		implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
    		ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
    }

    其下的 convertAndSend(...) 是用来发送消息的:

    注,convertSendAndReceive(...) 是 和 Basic RPC pattern 相关,还没用过(不懂)。

    还有一些 send(...) 函数,也是用来发送消息(和 上面的 convertAndSend(...) 有什么区别呢?):

    RabbitMQ关键概念:

    消息队列(Queue)

    交换机(Exchange)

    绑定(Binding)

    路由键值(routingKey)

    频道(channel)

    ---

    消息发送到 交换机,交换机 根据 routingKey 转发到不同队列,再由 队列监听者(Spring 中 @RabbitListener 定义) 接收&处理;

    其中,队列需要绑定到交换机,其中,每个队列都会绑定到 默认交换机——Direct类型

    默认交换机:AMQP default

    Queue、Exchange、Binding都继承了 AbstractDeclarable 类:

    public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    }
    
    public class Queue extends AbstractDeclarable implements Cloneable {
    }
    
    public class Binding extends AbstractDeclarable {
    }
    
    public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    }
    

    AbstractExchange 下 有几个子类——Exchange,使用RabbitMQ时会经常用到:

    RabbitMQ支持多种消息传输模式,点对点、广播、发布-订阅 等都支持,这需要使用不同的交换机来实现

    下面是一章经典的图:

    试验1:发送消息到 queue1队列

    首先,定义队列queue1,定义队列监听器(@RabbitListener),然后,调用rabbitTemplate.convertAndSend 发送消息。

    配置文件:

    #
    # RabbitMQ
    # mylinux 为 虚拟机,hosts中配置(单机模式)
    spring.rabbitmq.host=mylinux
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin

    注,上面的配置 可以考虑试验 spring.rabbitmq.addresses 替代——一个搞定。

    spring.rabbitmq.addresses=amqp://admin:admin@mylinux:5672
    试验1源码
    # 文件RabbitConfig.java
    @Configuration
    public class RabbitConfig {
    	
    	public final static String QUEUE_1 = "queue1";
    	
    	@Bean
    	public Queue queue1() {
    		Queue queue1 = new Queue(QUEUE_1);
    		return queue1;
    	}
    	
    }
    
    # 文件RabbitmqHelloApplication.java (主类,部分)
    @Component
    @Order(1)
    @Slf4j
    class TestRunner1 implements CommandLineRunner {
    
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    	
    	@Override
    	public void run(String... args) throws Exception {
    		/**
    		 * 每隔2秒发送1条消息
    		 * 看看 是否到 queue1、是否能监听到
    		 */
    		for (int i=0; i<10; i++) {
    			log.info("发送msg-{}", i);
    			
    			// 默认交换机 AMQP default,默认的 routingKey="",,怎么接收并处理?TODO
    //			rabbitTemplate.convertAndSend("Simple msg-" + i + ", @" + new Date());
    			
    			// 默认交换机,routingKey=queue1,转发到 queue队列
    			rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_1, "Simple msg-" + i + ", @" + new Date());
    			TimeUnit.SECONDS.sleep(1L);
    		}
    	}
    	
    }
    
    @Component
    @Slf4j
    class RabbitListener1 {
    	
    	/**
    	 * 接收队列queue1 的消息,默认绑定到 默认交换机(AMQP default)
    	 * @author ben
    	 * @date 2021-08-23 22:31:58 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig.QUEUE_1})
    	public void processMessage(String content) {
    		log.info("接收消息:content={}", content);
    	}
    	
    }

    试验1测试结果:

    日志1
    2021-08-23 23:40:14.380  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-0
    2021-08-23 23:40:14.401  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-0, @Mon Aug 23 23:40:14 CST 2021
    2021-08-23 23:40:15.389  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-1
    2021-08-23 23:40:15.393  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-1, @Mon Aug 23 23:40:15 CST 2021
    2021-08-23 23:40:16.390  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-2
    2021-08-23 23:40:16.392  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-2, @Mon Aug 23 23:40:16 CST 2021
    2021-08-23 23:40:17.390  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-3
    2021-08-23 23:40:17.394  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-3, @Mon Aug 23 23:40:17 CST 2021
    2021-08-23 23:40:18.391  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-4
    2021-08-23 23:40:18.401  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-4, @Mon Aug 23 23:40:18 CST 2021
    2021-08-23 23:40:19.392  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-5
    2021-08-23 23:40:19.397  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-5, @Mon Aug 23 23:40:19 CST 2021
    2021-08-23 23:40:20.394  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-6
    2021-08-23 23:40:20.401  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-6, @Mon Aug 23 23:40:20 CST 2021
    2021-08-23 23:40:21.395  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-7
    2021-08-23 23:40:21.399  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-7, @Mon Aug 23 23:40:21 CST 2021
    2021-08-23 23:40:22.397  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-8
    2021-08-23 23:40:22.401  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-8, @Mon Aug 23 23:40:22 CST 2021
    2021-08-23 23:40:23.398  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-9
    2021-08-23 23:40:23.407  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-9, @Mon Aug 23 23:40:23 CST 2021

    本试验试验的是 默认交换机——AMQP default。

    在开始发送时,使用了1个参数的 convertAndSend,结果,发送成功,但监听器没反应。原来,此时使用了 默认的routingKey="",队列没有收到。

    改为 上面的 2个参数的convertAndSend 后,监听器才收到了发送的消息——routingKey为 队列名。

    RabbitConfig 里面定义的队列是必须的,否则,启动会发生异常:

    o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: queue1
    o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3
    org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[queue1]

    小结:默认交换机

    1、发送使用默认routingKey,无法进入队列,对于的Listener监听不到;

    2、发送使用 队列名 作为 routingKey,Listener接收成功;

    试验2:使用 自定义交换机 发送消息 到 queue1队列

    在 试验1 的基础上,定义 DirectExchange类型directExchange1——名称ex1、Binding类型binding1——绑定queue1和ex1,发送时,发送到 交换机ex1。

    源码改动:

    试验2源码
    # RabbitConfig.java
    @Configuration
    public class RabbitConfig {
    	
    	public final static String QUEUE_1 = "queue1";
    	public final static String EXCHANGE_1 = "ex1";
    	
    	@Bean
    	public Queue queue1() {
    		Queue queue1 = new Queue(QUEUE_1);
    		return queue1;
    	}
    	
    	@Bean
    	public DirectExchange directExchange1() {
    		return new DirectExchange(EXCHANGE_1);
    	}
    	
    	@Bean
    	public Binding binding1(Queue queue1, DirectExchange directExchange1) {
    		// 绑定队列queue1到交换机ex1,使用队列名作为 routingKey
            return BindingBuilder
    				.bind(queue1)
    				.to(directExchange1)
    				.withQueueName();
    	}
    	
    }
    
    # RabbitmqHelloApplication.java
    # 消息多了 default 前缀
    rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_1, "default-Simple msg-" + i + ", @" + new Date());
    # TestRunner1#run 中增加下面的发送语句
    // ex1交换机=>queue1
    rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_1, RabbitConfig.QUEUE_1, "ex1-Simple msg-" + i + ", @" + new Date());

    注,保留了 发送消息到 默认交换机。

    测试结果:

    日志2
    2021-08-23 23:56:05.294  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-0
    2021-08-23 23:56:05.316  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-0, @Mon Aug 23 23:56:05 CST 2021
    2021-08-23 23:56:05.317  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-0, @Mon Aug 23 23:56:05 CST 2021
    2021-08-23 23:56:06.303  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-1
    2021-08-23 23:56:06.313  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-1, @Mon Aug 23 23:56:06 CST 2021
    2021-08-23 23:56:06.315  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-1, @Mon Aug 23 23:56:06 CST 2021
    2021-08-23 23:56:07.306  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-2
    2021-08-23 23:56:07.311  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-2, @Mon Aug 23 23:56:07 CST 2021
    2021-08-23 23:56:07.311  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-2, @Mon Aug 23 23:56:07 CST 2021
    2021-08-23 23:56:08.307  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-3
    2021-08-23 23:56:08.316  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-3, @Mon Aug 23 23:56:08 CST 2021
    2021-08-23 23:56:08.317  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-3, @Mon Aug 23 23:56:08 CST 2021
    2021-08-23 23:56:09.310  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-4
    2021-08-23 23:56:09.315  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-4, @Mon Aug 23 23:56:09 CST 2021
    2021-08-23 23:56:09.316  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-4, @Mon Aug 23 23:56:09 CST 2021
    2021-08-23 23:56:10.311  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-5
    2021-08-23 23:56:10.318  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-5, @Mon Aug 23 23:56:10 CST 2021
    2021-08-23 23:56:10.320  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-5, @Mon Aug 23 23:56:10 CST 2021
    2021-08-23 23:56:11.312  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-6
    2021-08-23 23:56:11.315  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-6, @Mon Aug 23 23:56:11 CST 2021
    2021-08-23 23:56:11.315  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-6, @Mon Aug 23 23:56:11 CST 2021
    2021-08-23 23:56:12.312  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-7
    2021-08-23 23:56:12.314  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-7, @Mon Aug 23 23:56:12 CST 2021
    2021-08-23 23:56:12.315  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-7, @Mon Aug 23 23:56:12 CST 2021
    2021-08-23 23:56:13.313  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-8
    2021-08-23 23:56:13.318  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-8, @Mon Aug 23 23:56:13 CST 2021
    2021-08-23 23:56:13.318  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-8, @Mon Aug 23 23:56:13 CST 2021
    2021-08-23 23:56:14.315  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-9
    2021-08-23 23:56:14.319  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-9, @Mon Aug 23 23:56:14 CST 2021
    2021-08-23 23:56:14.324  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-9, @Mon Aug 23 23:56:14 CST 2021
    

    使用队列名作为 routingKey,来自 默认交换机、自定义交换机ext1 的消息都收到了。

    Web管理页面变化:

    监听器收到了 2条消息!

    根据 Web管理页面得到的信息:队列 无法和 默认交换机,因此,如果按照上面的方式 发送消息,那么,默认交换机会转发到 queue1 队列。

    Default exchange
    The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. 
    It is not possible to explicitly bind to, or unbind from the default exchange. 
    It also cannot be deleted.

    试验3:多个Listener接收DirectExchange转发到队列queue1的消息

    注,取消上面的 TestRunner1 的消息发送,改为调用接口。

    测试源码:

    # 调用接口 /try1/send 发送消息
    @RestController
    @RequestMapping(value="/try1")
    @Slf4j
    public class Try1Controller {
    
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    	
    	/**
    	 * 使用 交换机ext1 发送消息到 队列queue1
    	 * @author ben
    	 * @date 2021-08-27 21:24:19 CST
    	 * @return
    	 */
    	@GetMapping(value="/send")
    	public boolean send() {
    		IntStream.range(0, 10).forEach(i->{
    			// ex1交换机=>queue1
    			rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_1, RabbitConfig.QUEUE_1, 
    					"Try1Controller-ex1-Simple msg-" + i + ", @" + new Date());
    			
    			try {
    				TimeUnit.SECONDS.sleep(1L);
    			} catch (InterruptedException e) {
    				log.info("异常:{}", e.getMessage());
    			}
    		});
    		
    		return true;
    	}
    	
    }
    
    # 两个监听器
    @Component
    @Slf4j
    class RabbitListener1 {
    	
    	/**
    	 * Listener1
    	 * @author ben
    	 * @date 2021-08-23 22:31:58 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig.QUEUE_1})
    	public void processMessage1(String content) {
    		log.info("接收消息:processMessage1-content={}", content);
    	}
    	
    	/**
    	 * Listener2
    	 * @author ben
    	 * @date 2021-08-27 21:26:30 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig.QUEUE_1})
    	public void processMessage2(String content) {
    		log.info("接收消息:processMessage2-content={}", content);
    	}
    	
    }

    一个队列,两个监听器。来自博客园

    执行结果:

    调用接口 /try1/send,共发送10条消息,两个监听器都收到了,分别5条——实现了均衡。

    试验4:使用FanoutExchange

    一个Fanout交换机,两个队列,绑定。

    配置:

    @Configuration
    public class RabbitConfig2 {
    	
    	public final static String QUEUE_21 = "queue21";
    	public final static String QUEUE_22 = "queue22";
    	public final static String FAN_EXCHANGE_2 = "fanout2";
    	
    	@Bean
    	public Queue queue21() {
    		Queue queue1 = new Queue(QUEUE_21);
    		return queue1;
    	}
    	
    	@Bean
    	public Queue queue22() {
    		Queue queue1 = new Queue(QUEUE_22);
    		return queue1;
    	}
    	
    	@Bean
    	public FanoutExchange FanoutExchange() {
    		// 最简单的构造函数
    		return new FanoutExchange(FAN_EXCHANGE_2);
    	}
    	
    	@Bean
    	public Binding binding21(Queue queue21, FanoutExchange FanoutExchange) {
    		return BindingBuilder
    				.bind(queue21)
    				// to后面 没有 withXXX 函数,to的结果即返回 Binding对象
    				.to(FanoutExchange);
    	}
    	
    	@Bean
    	public Binding binding22(Queue queue22, FanoutExchange FanoutExchange) {
    		return BindingBuilder
    				.bind(queue22)
    				// to后面 没有 withXXX 函数,to的结果即返回 Binding对象
    				.to(FanoutExchange);
    	}
    	
    }

    发送接口:

    	/**
    	 * 发送消息
    	 * @author ben
    	 * @date 2021-08-27 21:24:19 CST
    	 * @return
    	 */
    	@GetMapping(value="/send")
    	public boolean send() {
    		IntStream.range(0, 10).forEach(i->{
    			rabbitTemplate.convertAndSend(RabbitConfig2.FAN_EXCHANGE_2, RabbitConfig2.QUEUE_21, 
    					"Try2Controller-Simple msg-" + i + ", @" + new Date());
    			
    			try {
    				TimeUnit.SECONDS.sleep(1L);
    			} catch (InterruptedException e) {
    				log.info("异常:{}", e.getMessage());
    			}
    		});
    		
    		return true;
    	}

    监听器:来自博客园

    @Component
    @Slf4j
    public class Try2Listener {
    
    	/**
    	 * Listener1
    	 * @author ben
    	 * @date 2021-08-23 22:31:58 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig2.QUEUE_21})
    	public void processMessage1(String content) {
    		log.info("接收消息:队列{}-processMessage1-content={}", 
    				RabbitConfig2.QUEUE_21, content);
    	}
    	
    	/**
    	 * Listener2
    	 * @author ben
    	 * @date 2021-08-27 21:26:30 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig2.QUEUE_21})
    	public void processMessage2(String content) {
    		log.info("接收消息:队列{}-processMessage2-content={}",
    				RabbitConfig2.QUEUE_21, content);
    	}
    	
    	/**
    	 * Listener3
    	 * @author ben
    	 * @date 2021-08-27 21:26:30 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig2.QUEUE_22})
    	public void processMessage3(String content) {
    		log.info("接收消息:队列{}-processMessage3-content={}", RabbitConfig2.QUEUE_22, content);
    	}
    	
    }

    QUEUE_21 两个监听器,QUEUE_22 一个监听器。

    启动后Web管理器展示:

    测试结果:

    调用接口 /try2/send,

    QUEUE_21 两个监听器,均衡地接收并处理了所有消息;来自博客园

    QUEUE_22 一个监听器,独自处理了所有消息。

    虽然发送时使用了 routingKey 参数,但是,绑定时没有使用 routingKey。来自博客园

    FanoutExchange会把收到的消息 发送到所有与其绑定的队列,这就是 实现广播机制的原理吧。

    试验5:使用TopicExchange

    根据主题(routingKey)将消息发送到不同的队列,可以使用通配符。

    配置:

    @Configuration
    public class RabbitConfig3 {
    	
    	public final static String QUEUE_31 = "queue31";
    	public final static String QUEUE_32 = "queue32";
    	public final static String TOPIC_EXCHANGE_3 = "topic3";
    	
    	@Bean
    	public Queue queue31() {
    		return new Queue(QUEUE_31);
    	}
    	
    	@Bean
    	public Queue queue32() {
    		return new Queue(QUEUE_32);
    	}
    	
    	@Bean
    	public TopicExchange topicExchange() {
    		// 最简单的构造函数
    		return new TopicExchange(TOPIC_EXCHANGE_3);
    	}
    	
    	@Bean
    	public Binding binding31(Queue queue31, TopicExchange topicExchange) {
    		return BindingBuilder
    				.bind(queue31)
    				.to(topicExchange)
    				// routingKey 为 QUEUE_31——队列名
    				.with(QUEUE_31);
    	}
    	
    	@Bean
    	public Binding binding32(Queue queue32, TopicExchange topicExchange) {
    		return BindingBuilder
    				.bind(queue32)
    				.to(topicExchange)
    				// routingKey 为 *,通配符,所有消息
    				.with("*");
    	}
    	
    }

    测试接口:/try3/send

    总计发送 30条,分别使用不同的 routingKey。来自博客园

    	@GetMapping(value="/send")
    	public boolean send() {
    		IntStream.range(0, 10).forEach(i->{
    			// routingKey 为队列31的名称
    			rabbitTemplate.convertAndSend(RabbitConfig3.TOPIC_EXCHANGE_3, RabbitConfig3.QUEUE_31, 
    					"Try3Controller-Simple msg 1-" + i + ", @" + new Date());
    
    			// routingKey 为队列32的名称
    			rabbitTemplate.convertAndSend(RabbitConfig3.TOPIC_EXCHANGE_3, RabbitConfig3.QUEUE_32, 
    					"Try3Controller-Simple msg 2-" + i + ", @" + new Date());
    			
    			// routingKey 为topicXyz
    			rabbitTemplate.convertAndSend(RabbitConfig3.TOPIC_EXCHANGE_3, "topicXyz", 
    					"Try3Controller-Simple msg 3-" + i + ", @" + new Date());
    			
    			try {
    				TimeUnit.SECONDS.sleep(1L);
    			} catch (InterruptedException e) {
    				log.info("异常:{}", e.getMessage());
    			}
    		});
    		
    		return true;
    	}

    监听器:两个,分别监听 QUEUE_31、QUEUE_32。

    	@RabbitListener(queues = {RabbitConfig3.QUEUE_31})
    	public void processMessage31(String content) {
    		log.info("接收消息:队列{}-processMessage31-content={}", 
    				RabbitConfig3.QUEUE_31, content);
    	}
    	
    	@RabbitListener(queues = {RabbitConfig3.QUEUE_32})
    	public void processMessage32(String content) {
    		log.info("接收消息:队列{}-processMessage32-content={}", RabbitConfig3.QUEUE_32, content);
    	}

    执行结果(部分):

    两个监听器,总计收到 40条,

    其中,processMessage31 收到了10条,只是 routingKey 为 队列名 queue31 的,来自博客园

    processMessage32 收到30条——所有——因为使用了 通配符 “*”。

    疑问:

    都是通过 routingKey 转发消息到队列,DirectExchange 和 TopicExchange 的区别是什么呢?

    原来,TopicExchange 也叫 通配符交换机,可以使用 通配符 来转发各种消息,而 DirectExchange 则只是根据 routingKey 的字面值 来转发消息到 队列。

    待验证,TODO

    试验6:接收消息手动确认

    在前面的试验中,@RabbitListener 接收消息都没有做更多工作,直接就获取了 消息内容。

    其实,这时的消息是 自动确认的。

    怎么做到 手动确认消息呢?

    @RabbitListener有一个属性——ackMode,将其配置为 MANUAL 即开启 此监听器的手动确认功能(参考类 AcknowledgeMode)。

    注,当然,应该还有 全局配置的方式,暂不清楚

    本节在 试验3-试验DirectExchange 的基础上继续测试:一个交换机、一个队列、两个RabbitListener 。来自博客园

    6-1:配置其中一个为 手动确认

    	// 手动确认
        @RabbitListener(queues = {RabbitConfig.QUEUE_1}, ackMode="MANUAL")
    	public void processMessage1(String content) {
    		log.info("接收消息:processMessage1-content={}", content);
    	}
        
        // 自动确认
    	@RabbitListener(queues = {RabbitConfig.QUEUE_1})
    	public void processMessage2(String content) {
    		log.info("接收消息:processMessage2-content={}", content);
    	}

    执行消息发送接口,初看日志,消息被两个监听器 均衡地消费了。

    实际上呢?queue1队列里面还剩5个待消费。

    关闭应用,此时,应用没有立即停止,而是输出下面的内容:来自博客园

    注意,上图的日志不是 唯一的情景,有时候,队列中的消息 还会被 自动确认的 监听器给消费了——非确定性的

    那么,怎么执行手动确认呢?需要改造被 @RabbitListener 注解的函数

    如下:增加了参数 channel(com.rabbitmq.client.Channel),tag——必须使用 @Header!

    	@RabbitListener(queues = {RabbitConfig.QUEUE_1}, ackMode="MANUAL")
    //	public void processMessage1(String content) {
    	public void processMessage1(String content, Channel channel,
    			// @Header 必须!否则,异常!
    			@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    		log.info("接收消息:processMessage1-content={}", content);
    		log.info("手动确认消息:tag={}, channelNumber={}, channel", tag, channel.getChannelNumber(), channel);
    		
    		// 确认消息
    		channel.basicAck(tag, false);
    	}

    执行结果:

    手动确认成功,RabbitMQ的Web管理器中,也没有剩余消息了。来自博客园

    6-2:配置所有@RabbitListener 手动确认

    可以看到,上面的 channelNumber 等于 1,那么,将两个 @RabbitListener 都配置为 手动确认的时候如何?

    如下:

    	@RabbitListener(queues = {RabbitConfig.QUEUE_1}, ackMode="MANUAL")
    //	public void processMessage1(String content) {
    	public void processMessage1(String content, Channel channel,
    			// @Header 必须!否则,异常!
    			@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    		log.info("接收消息1:content={}", content);
    		log.info("手动确认消息1:tag={}, channelNumber={}, channel", tag, channel.getChannelNumber(), channel);
    		
    		// 确认消息
    		channel.basicAck(tag, false);
    	}
    	
    	@RabbitListener(queues = {RabbitConfig.QUEUE_1})
    //	public void processMessage2(String content) {
    	public void processMessage2(String content, Channel channel,
    			// @Header 必须!否则,异常!
    			@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    		log.info("接收消息2:content={}", content);
    		log.info("手动确认消息2:tag={}, channelNumber={}, channel", tag, channel.getChannelNumber(), channel);
    		
    		// 确认消息
    		channel.basicAck(tag, false);
    	}

    执行结果:

    消息接收成功了。

    注意,

    channelNumber 有 1、2 两个了,对应两个 @RabbitListener;来自博客园

    而且,各个@RabbitListener中的 tag值 是按照顺序增长的。

    6-3:启动2个应用,共4个 @RabbitListener

    Web监控平台可以看到:

    执行消息发送。

    执行结果:

    一个处理了4条、一个处理了6条,总计10条。来自博客园

    关闭其中一个应用,再测试:

    剩下的一个应用处理了所有消息。

    》》》全文完《《《 

    RabbitMQ还有更多内容可以探究的:

    1、消息持久化,默认的交换机、队列都是持久化的——durable=true;什么时候不需要持久化呢?

    2、可以用来做延迟消息队列——消息发送了,不立即处理,指定时间到了后再处理,怎么实现呢?

    3、更多高级的配置,spring.rabbitmq.*;

    4、RabbitMQ服务器优化等;

    5、集群架构,看了一篇博文,居然介绍了4种!

    6、会用之后,是不是要探究下原理呢?

    ……

    参考文档

    1、RabbitMQ学习笔记:4369、5672、15672、25672默认端口号修改

    2、RabbitMQ三种Exchange模式(fanout,direct,topic)简介

    3、RabbitMQ 的4种集群架构

    4、

  • 相关阅读:
    页面模板
    HTML,CSS,JaveScript
    TCP三次握手
    BLDC无刷直流电机的原理及驱动基础
    调试日志——基于stm32的智能声光报警器(三)
    调试日志——基于stm32的智能声光报警器(二)
    调试日志——基于stm32的智能声光报警器(一)
    Jlink-10 pin 的定义(stm32使用)官方定义
    关于学习新知识的一点想法
    前端笔记-javaScript-3
  • 原文地址:https://www.cnblogs.com/luo630/p/15178354.html
Copyright © 2011-2022 走看看