zoukankan      html  css  js  c++  java
  • spring boot项目11:RabbitMQ-基础使用

    JAVA 8

    Spring Boot 2.5.3

    RabbitMQ 3.6.10, Erlang 20.2.2(单机)

    ---

    授人以渔:

    1、Spring Boot Reference Documentation

    This document is also available as Multi-page HTML, Single page HTML and PDF.

    有PDF版本哦,下载下来!

    2、Spring AMQP

    有PDF版本哦,下载下来!

    当前最新版 RabbitMQ(官网):

    RabbitMQ 3.9.4 release at 17 Aug 2021

    目录

    试验1:发送消息到 queue1队列

    试验2:使用 自定义交换机 发送消息 到 queue1队列

    试验3:多个Listener接收DirectExchange转发到队列queue1的消息

    试验4:使用FanoutExchange

    试验5:使用TopicExchange

    试验6:接收消息手动确认

    参考文档

    消息队列(MQ)是一种重要的中间件,可以实现 1)进程、应用间通信;2)异步任务处理,可以很好地实现 程序的低耦合。

    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计(百度百科)。

    AMQP是一种 线路级协议(wire-level protocol)——遵循这个协议的实现 之间是可以无缝互操作的。

    RabbitMQ是使用 Erlang语言实现了 AMQP协议。

    Spring Boot中可以方便地集成RabbitMQ,依赖 spring-boot-starter-amqp 包即可:

    本文介绍 S.B. 集成 RabbitMQ 及其基础使用场景。

    补充:

    RabbitMQ启动后,默认使用下面端口:

    5672 服务器端口,S.B.中配置 常用
    15672 Web服务器端口,可以对RabbitMQ服务器进行管理 常用
    25672 用于节点间和CLI工具通信(Erlang分发服务器端口),并从动态范围分配 不常用
    4369 EPMD默认端口 不常用

    注,请看 参考文档(原来有这么多)

    依赖 spring-boot-starter-amqp 包后,Spring容器中会存在一下相关Bean:

    rabbitConnectionFactory
    rabbitTemplateConfigurer
    
    rabbitTemplate # 用过,发送消息
    amqpAdmin # 尚未用过
    
    rabbitMessagingTemplate
    
    simpleRabbitListenerContainerFactoryConfigurer
    rabbitListenerContainerFactory
    directRabbitListenerContainerFactoryConfigurer
    
    RabbitAutoConfiguration
    RabbitProperties
    
    RabbitAnnotationDrivenConfiguration
    RabbitAnnotationDrivenConfiguration$EnableRabbitConfiguration
    
    RabbitAutoConfiguration
    RabbitAutoConfiguration$RabbitConnectionFactoryCreator
    RabbitAutoConfiguration$RabbitTemplateConfiguration
    RabbitAutoConfiguration$MessagingTemplateConfiguration

    rabbitTemplate 是其中 (自己)最常用(重要)的,用来发送消息。

    public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
    		implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
    		ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
    }

    其下的 convertAndSend(...) 是用来发送消息的:

    注,convertSendAndReceive(...) 是 和 Basic RPC pattern 相关,还没用过(不懂)。

    还有一些 send(...) 函数,也是用来发送消息(和 上面的 convertAndSend(...) 有什么区别呢?):

    RabbitMQ关键概念:

    消息队列(Queue)

    交换机(Exchange)

    绑定(Binding)

    路由键值(routingKey)

    频道(channel)

    ---

    消息发送到 交换机,交换机 根据 routingKey 转发到不同队列,再由 队列监听者(Spring 中 @RabbitListener 定义) 接收&处理;

    其中,队列需要绑定到交换机,其中,每个队列都会绑定到 默认交换机——Direct类型

    默认交换机:AMQP default

    Queue、Exchange、Binding都继承了 AbstractDeclarable 类:

    public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    }
    
    public class Queue extends AbstractDeclarable implements Cloneable {
    }
    
    public class Binding extends AbstractDeclarable {
    }
    
    public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    }
    

    AbstractExchange 下 有几个子类——Exchange,使用RabbitMQ时会经常用到:

    RabbitMQ支持多种消息传输模式,点对点、广播、发布-订阅 等都支持,这需要使用不同的交换机来实现

    下面是一章经典的图:

    试验1:发送消息到 queue1队列

    首先,定义队列queue1,定义队列监听器(@RabbitListener),然后,调用rabbitTemplate.convertAndSend 发送消息。

    配置文件:

    #
    # RabbitMQ
    # mylinux 为 虚拟机,hosts中配置(单机模式)
    spring.rabbitmq.host=mylinux
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin

    注,上面的配置 可以考虑试验 spring.rabbitmq.addresses 替代——一个搞定。

    spring.rabbitmq.addresses=amqp://admin:admin@mylinux:5672
    试验1源码
    # 文件RabbitConfig.java
    @Configuration
    public class RabbitConfig {
    	
    	public final static String QUEUE_1 = "queue1";
    	
    	@Bean
    	public Queue queue1() {
    		Queue queue1 = new Queue(QUEUE_1);
    		return queue1;
    	}
    	
    }
    
    # 文件RabbitmqHelloApplication.java (主类,部分)
    @Component
    @Order(1)
    @Slf4j
    class TestRunner1 implements CommandLineRunner {
    
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    	
    	@Override
    	public void run(String... args) throws Exception {
    		/**
    		 * 每隔2秒发送1条消息
    		 * 看看 是否到 queue1、是否能监听到
    		 */
    		for (int i=0; i<10; i++) {
    			log.info("发送msg-{}", i);
    			
    			// 默认交换机 AMQP default,默认的 routingKey="",,怎么接收并处理?TODO
    //			rabbitTemplate.convertAndSend("Simple msg-" + i + ", @" + new Date());
    			
    			// 默认交换机,routingKey=queue1,转发到 queue队列
    			rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_1, "Simple msg-" + i + ", @" + new Date());
    			TimeUnit.SECONDS.sleep(1L);
    		}
    	}
    	
    }
    
    @Component
    @Slf4j
    class RabbitListener1 {
    	
    	/**
    	 * 接收队列queue1 的消息,默认绑定到 默认交换机(AMQP default)
    	 * @author ben
    	 * @date 2021-08-23 22:31:58 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig.QUEUE_1})
    	public void processMessage(String content) {
    		log.info("接收消息:content={}", content);
    	}
    	
    }

    试验1测试结果:

    日志1
    2021-08-23 23:40:14.380  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-0
    2021-08-23 23:40:14.401  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-0, @Mon Aug 23 23:40:14 CST 2021
    2021-08-23 23:40:15.389  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-1
    2021-08-23 23:40:15.393  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-1, @Mon Aug 23 23:40:15 CST 2021
    2021-08-23 23:40:16.390  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-2
    2021-08-23 23:40:16.392  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-2, @Mon Aug 23 23:40:16 CST 2021
    2021-08-23 23:40:17.390  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-3
    2021-08-23 23:40:17.394  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-3, @Mon Aug 23 23:40:17 CST 2021
    2021-08-23 23:40:18.391  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-4
    2021-08-23 23:40:18.401  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-4, @Mon Aug 23 23:40:18 CST 2021
    2021-08-23 23:40:19.392  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-5
    2021-08-23 23:40:19.397  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-5, @Mon Aug 23 23:40:19 CST 2021
    2021-08-23 23:40:20.394  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-6
    2021-08-23 23:40:20.401  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-6, @Mon Aug 23 23:40:20 CST 2021
    2021-08-23 23:40:21.395  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-7
    2021-08-23 23:40:21.399  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-7, @Mon Aug 23 23:40:21 CST 2021
    2021-08-23 23:40:22.397  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-8
    2021-08-23 23:40:22.401  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-8, @Mon Aug 23 23:40:22 CST 2021
    2021-08-23 23:40:23.398  INFO 13700 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-9
    2021-08-23 23:40:23.407  INFO 13700 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=Simple msg-9, @Mon Aug 23 23:40:23 CST 2021

    本试验试验的是 默认交换机——AMQP default。

    在开始发送时,使用了1个参数的 convertAndSend,结果,发送成功,但监听器没反应。原来,此时使用了 默认的routingKey="",队列没有收到。

    改为 上面的 2个参数的convertAndSend 后,监听器才收到了发送的消息——routingKey为 队列名。

    RabbitConfig 里面定义的队列是必须的,否则,启动会发生异常:

    o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: queue1
    o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3
    org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[queue1]

    小结:默认交换机

    1、发送使用默认routingKey,无法进入队列,对于的Listener监听不到;

    2、发送使用 队列名 作为 routingKey,Listener接收成功;

    试验2:使用 自定义交换机 发送消息 到 queue1队列

    在 试验1 的基础上,定义 DirectExchange类型directExchange1——名称ex1、Binding类型binding1——绑定queue1和ex1,发送时,发送到 交换机ex1。

    源码改动:

    试验2源码
    # RabbitConfig.java
    @Configuration
    public class RabbitConfig {
    	
    	public final static String QUEUE_1 = "queue1";
    	public final static String EXCHANGE_1 = "ex1";
    	
    	@Bean
    	public Queue queue1() {
    		Queue queue1 = new Queue(QUEUE_1);
    		return queue1;
    	}
    	
    	@Bean
    	public DirectExchange directExchange1() {
    		return new DirectExchange(EXCHANGE_1);
    	}
    	
    	@Bean
    	public Binding binding1(Queue queue1, DirectExchange directExchange1) {
    		// 绑定队列queue1到交换机ex1,使用队列名作为 routingKey
            return BindingBuilder
    				.bind(queue1)
    				.to(directExchange1)
    				.withQueueName();
    	}
    	
    }
    
    # RabbitmqHelloApplication.java
    # 消息多了 default 前缀
    rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_1, "default-Simple msg-" + i + ", @" + new Date());
    # TestRunner1#run 中增加下面的发送语句
    // ex1交换机=>queue1
    rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_1, RabbitConfig.QUEUE_1, "ex1-Simple msg-" + i + ", @" + new Date());

    注,保留了 发送消息到 默认交换机。

    测试结果:

    日志2
    2021-08-23 23:56:05.294  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-0
    2021-08-23 23:56:05.316  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-0, @Mon Aug 23 23:56:05 CST 2021
    2021-08-23 23:56:05.317  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-0, @Mon Aug 23 23:56:05 CST 2021
    2021-08-23 23:56:06.303  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-1
    2021-08-23 23:56:06.313  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-1, @Mon Aug 23 23:56:06 CST 2021
    2021-08-23 23:56:06.315  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-1, @Mon Aug 23 23:56:06 CST 2021
    2021-08-23 23:56:07.306  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-2
    2021-08-23 23:56:07.311  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-2, @Mon Aug 23 23:56:07 CST 2021
    2021-08-23 23:56:07.311  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-2, @Mon Aug 23 23:56:07 CST 2021
    2021-08-23 23:56:08.307  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-3
    2021-08-23 23:56:08.316  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-3, @Mon Aug 23 23:56:08 CST 2021
    2021-08-23 23:56:08.317  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-3, @Mon Aug 23 23:56:08 CST 2021
    2021-08-23 23:56:09.310  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-4
    2021-08-23 23:56:09.315  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-4, @Mon Aug 23 23:56:09 CST 2021
    2021-08-23 23:56:09.316  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-4, @Mon Aug 23 23:56:09 CST 2021
    2021-08-23 23:56:10.311  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-5
    2021-08-23 23:56:10.318  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-5, @Mon Aug 23 23:56:10 CST 2021
    2021-08-23 23:56:10.320  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-5, @Mon Aug 23 23:56:10 CST 2021
    2021-08-23 23:56:11.312  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-6
    2021-08-23 23:56:11.315  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-6, @Mon Aug 23 23:56:11 CST 2021
    2021-08-23 23:56:11.315  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-6, @Mon Aug 23 23:56:11 CST 2021
    2021-08-23 23:56:12.312  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-7
    2021-08-23 23:56:12.314  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-7, @Mon Aug 23 23:56:12 CST 2021
    2021-08-23 23:56:12.315  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-7, @Mon Aug 23 23:56:12 CST 2021
    2021-08-23 23:56:13.313  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-8
    2021-08-23 23:56:13.318  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-8, @Mon Aug 23 23:56:13 CST 2021
    2021-08-23 23:56:13.318  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-8, @Mon Aug 23 23:56:13 CST 2021
    2021-08-23 23:56:14.315  INFO 13500 --- [           main] org.lib.rabbitmqhello.TestRunner1        : 发送msg-9
    2021-08-23 23:56:14.319  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=default-Simple msg-9, @Mon Aug 23 23:56:14 CST 2021
    2021-08-23 23:56:14.324  INFO 13500 --- [ntContainer#0-1] org.lib.rabbitmqhello.RabbitListener1    : 接收消息:content=ex1-Simple msg-9, @Mon Aug 23 23:56:14 CST 2021
    

    使用队列名作为 routingKey,来自 默认交换机、自定义交换机ext1 的消息都收到了。

    Web管理页面变化:

    监听器收到了 2条消息!

    根据 Web管理页面得到的信息:队列 无法和 默认交换机,因此,如果按照上面的方式 发送消息,那么,默认交换机会转发到 queue1 队列。

    Default exchange
    The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. 
    It is not possible to explicitly bind to, or unbind from the default exchange. 
    It also cannot be deleted.

    试验3:多个Listener接收DirectExchange转发到队列queue1的消息

    注,取消上面的 TestRunner1 的消息发送,改为调用接口。

    测试源码:

    # 调用接口 /try1/send 发送消息
    @RestController
    @RequestMapping(value="/try1")
    @Slf4j
    public class Try1Controller {
    
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    	
    	/**
    	 * 使用 交换机ext1 发送消息到 队列queue1
    	 * @author ben
    	 * @date 2021-08-27 21:24:19 CST
    	 * @return
    	 */
    	@GetMapping(value="/send")
    	public boolean send() {
    		IntStream.range(0, 10).forEach(i->{
    			// ex1交换机=>queue1
    			rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_1, RabbitConfig.QUEUE_1, 
    					"Try1Controller-ex1-Simple msg-" + i + ", @" + new Date());
    			
    			try {
    				TimeUnit.SECONDS.sleep(1L);
    			} catch (InterruptedException e) {
    				log.info("异常:{}", e.getMessage());
    			}
    		});
    		
    		return true;
    	}
    	
    }
    
    # 两个监听器
    @Component
    @Slf4j
    class RabbitListener1 {
    	
    	/**
    	 * Listener1
    	 * @author ben
    	 * @date 2021-08-23 22:31:58 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig.QUEUE_1})
    	public void processMessage1(String content) {
    		log.info("接收消息:processMessage1-content={}", content);
    	}
    	
    	/**
    	 * Listener2
    	 * @author ben
    	 * @date 2021-08-27 21:26:30 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig.QUEUE_1})
    	public void processMessage2(String content) {
    		log.info("接收消息:processMessage2-content={}", content);
    	}
    	
    }

    一个队列,两个监听器。来自博客园

    执行结果:

    调用接口 /try1/send,共发送10条消息,两个监听器都收到了,分别5条——实现了均衡。

    试验4:使用FanoutExchange

    一个Fanout交换机,两个队列,绑定。

    配置:

    @Configuration
    public class RabbitConfig2 {
    	
    	public final static String QUEUE_21 = "queue21";
    	public final static String QUEUE_22 = "queue22";
    	public final static String FAN_EXCHANGE_2 = "fanout2";
    	
    	@Bean
    	public Queue queue21() {
    		Queue queue1 = new Queue(QUEUE_21);
    		return queue1;
    	}
    	
    	@Bean
    	public Queue queue22() {
    		Queue queue1 = new Queue(QUEUE_22);
    		return queue1;
    	}
    	
    	@Bean
    	public FanoutExchange FanoutExchange() {
    		// 最简单的构造函数
    		return new FanoutExchange(FAN_EXCHANGE_2);
    	}
    	
    	@Bean
    	public Binding binding21(Queue queue21, FanoutExchange FanoutExchange) {
    		return BindingBuilder
    				.bind(queue21)
    				// to后面 没有 withXXX 函数,to的结果即返回 Binding对象
    				.to(FanoutExchange);
    	}
    	
    	@Bean
    	public Binding binding22(Queue queue22, FanoutExchange FanoutExchange) {
    		return BindingBuilder
    				.bind(queue22)
    				// to后面 没有 withXXX 函数,to的结果即返回 Binding对象
    				.to(FanoutExchange);
    	}
    	
    }

    发送接口:

    	/**
    	 * 发送消息
    	 * @author ben
    	 * @date 2021-08-27 21:24:19 CST
    	 * @return
    	 */
    	@GetMapping(value="/send")
    	public boolean send() {
    		IntStream.range(0, 10).forEach(i->{
    			rabbitTemplate.convertAndSend(RabbitConfig2.FAN_EXCHANGE_2, RabbitConfig2.QUEUE_21, 
    					"Try2Controller-Simple msg-" + i + ", @" + new Date());
    			
    			try {
    				TimeUnit.SECONDS.sleep(1L);
    			} catch (InterruptedException e) {
    				log.info("异常:{}", e.getMessage());
    			}
    		});
    		
    		return true;
    	}

    监听器:来自博客园

    @Component
    @Slf4j
    public class Try2Listener {
    
    	/**
    	 * Listener1
    	 * @author ben
    	 * @date 2021-08-23 22:31:58 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig2.QUEUE_21})
    	public void processMessage1(String content) {
    		log.info("接收消息:队列{}-processMessage1-content={}", 
    				RabbitConfig2.QUEUE_21, content);
    	}
    	
    	/**
    	 * Listener2
    	 * @author ben
    	 * @date 2021-08-27 21:26:30 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig2.QUEUE_21})
    	public void processMessage2(String content) {
    		log.info("接收消息:队列{}-processMessage2-content={}",
    				RabbitConfig2.QUEUE_21, content);
    	}
    	
    	/**
    	 * Listener3
    	 * @author ben
    	 * @date 2021-08-27 21:26:30 CST
    	 * @param content
    	 */
    	@RabbitListener(queues = {RabbitConfig2.QUEUE_22})
    	public void processMessage3(String content) {
    		log.info("接收消息:队列{}-processMessage3-content={}", RabbitConfig2.QUEUE_22, content);
    	}
    	
    }

    QUEUE_21 两个监听器,QUEUE_22 一个监听器。

    启动后Web管理器展示:

    测试结果:

    调用接口 /try2/send,

    QUEUE_21 两个监听器,均衡地接收并处理了所有消息;来自博客园

    QUEUE_22 一个监听器,独自处理了所有消息。

    虽然发送时使用了 routingKey 参数,但是,绑定时没有使用 routingKey。来自博客园

    FanoutExchange会把收到的消息 发送到所有与其绑定的队列,这就是 实现广播机制的原理吧。

    试验5:使用TopicExchange

    根据主题(routingKey)将消息发送到不同的队列,可以使用通配符。

    配置:

    @Configuration
    public class RabbitConfig3 {
    	
    	public final static String QUEUE_31 = "queue31";
    	public final static String QUEUE_32 = "queue32";
    	public final static String TOPIC_EXCHANGE_3 = "topic3";
    	
    	@Bean
    	public Queue queue31() {
    		return new Queue(QUEUE_31);
    	}
    	
    	@Bean
    	public Queue queue32() {
    		return new Queue(QUEUE_32);
    	}
    	
    	@Bean
    	public TopicExchange topicExchange() {
    		// 最简单的构造函数
    		return new TopicExchange(TOPIC_EXCHANGE_3);
    	}
    	
    	@Bean
    	public Binding binding31(Queue queue31, TopicExchange topicExchange) {
    		return BindingBuilder
    				.bind(queue31)
    				.to(topicExchange)
    				// routingKey 为 QUEUE_31——队列名
    				.with(QUEUE_31);
    	}
    	
    	@Bean
    	public Binding binding32(Queue queue32, TopicExchange topicExchange) {
    		return BindingBuilder
    				.bind(queue32)
    				.to(topicExchange)
    				// routingKey 为 *,通配符,所有消息
    				.with("*");
    	}
    	
    }

    测试接口:/try3/send

    总计发送 30条,分别使用不同的 routingKey。来自博客园

    	@GetMapping(value="/send")
    	public boolean send() {
    		IntStream.range(0, 10).forEach(i->{
    			// routingKey 为队列31的名称
    			rabbitTemplate.convertAndSend(RabbitConfig3.TOPIC_EXCHANGE_3, RabbitConfig3.QUEUE_31, 
    					"Try3Controller-Simple msg 1-" + i + ", @" + new Date());
    
    			// routingKey 为队列32的名称
    			rabbitTemplate.convertAndSend(RabbitConfig3.TOPIC_EXCHANGE_3, RabbitConfig3.QUEUE_32, 
    					"Try3Controller-Simple msg 2-" + i + ", @" + new Date());
    			
    			// routingKey 为topicXyz
    			rabbitTemplate.convertAndSend(RabbitConfig3.TOPIC_EXCHANGE_3, "topicXyz", 
    					"Try3Controller-Simple msg 3-" + i + ", @" + new Date());
    			
    			try {
    				TimeUnit.SECONDS.sleep(1L);
    			} catch (InterruptedException e) {
    				log.info("异常:{}", e.getMessage());
    			}
    		});
    		
    		return true;
    	}

    监听器:两个,分别监听 QUEUE_31、QUEUE_32。

    	@RabbitListener(queues = {RabbitConfig3.QUEUE_31})
    	public void processMessage31(String content) {
    		log.info("接收消息:队列{}-processMessage31-content={}", 
    				RabbitConfig3.QUEUE_31, content);
    	}
    	
    	@RabbitListener(queues = {RabbitConfig3.QUEUE_32})
    	public void processMessage32(String content) {
    		log.info("接收消息:队列{}-processMessage32-content={}", RabbitConfig3.QUEUE_32, content);
    	}

    执行结果(部分):

    两个监听器,总计收到 40条,

    其中,processMessage31 收到了10条,只是 routingKey 为 队列名 queue31 的,来自博客园

    processMessage32 收到30条——所有——因为使用了 通配符 “*”。

    疑问:

    都是通过 routingKey 转发消息到队列,DirectExchange 和 TopicExchange 的区别是什么呢?

    原来,TopicExchange 也叫 通配符交换机,可以使用 通配符 来转发各种消息,而 DirectExchange 则只是根据 routingKey 的字面值 来转发消息到 队列。

    待验证,TODO

    试验6:接收消息手动确认

    在前面的试验中,@RabbitListener 接收消息都没有做更多工作,直接就获取了 消息内容。

    其实,这时的消息是 自动确认的。

    怎么做到 手动确认消息呢?

    @RabbitListener有一个属性——ackMode,将其配置为 MANUAL 即开启 此监听器的手动确认功能(参考类 AcknowledgeMode)。

    注,当然,应该还有 全局配置的方式,暂不清楚

    本节在 试验3-试验DirectExchange 的基础上继续测试:一个交换机、一个队列、两个RabbitListener 。来自博客园

    6-1:配置其中一个为 手动确认

    	// 手动确认
        @RabbitListener(queues = {RabbitConfig.QUEUE_1}, ackMode="MANUAL")
    	public void processMessage1(String content) {
    		log.info("接收消息:processMessage1-content={}", content);
    	}
        
        // 自动确认
    	@RabbitListener(queues = {RabbitConfig.QUEUE_1})
    	public void processMessage2(String content) {
    		log.info("接收消息:processMessage2-content={}", content);
    	}

    执行消息发送接口,初看日志,消息被两个监听器 均衡地消费了。

    实际上呢?queue1队列里面还剩5个待消费。

    关闭应用,此时,应用没有立即停止,而是输出下面的内容:来自博客园

    注意,上图的日志不是 唯一的情景,有时候,队列中的消息 还会被 自动确认的 监听器给消费了——非确定性的

    那么,怎么执行手动确认呢?需要改造被 @RabbitListener 注解的函数

    如下:增加了参数 channel(com.rabbitmq.client.Channel),tag——必须使用 @Header!

    	@RabbitListener(queues = {RabbitConfig.QUEUE_1}, ackMode="MANUAL")
    //	public void processMessage1(String content) {
    	public void processMessage1(String content, Channel channel,
    			// @Header 必须!否则,异常!
    			@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    		log.info("接收消息:processMessage1-content={}", content);
    		log.info("手动确认消息:tag={}, channelNumber={}, channel", tag, channel.getChannelNumber(), channel);
    		
    		// 确认消息
    		channel.basicAck(tag, false);
    	}

    执行结果:

    手动确认成功,RabbitMQ的Web管理器中,也没有剩余消息了。来自博客园

    6-2:配置所有@RabbitListener 手动确认

    可以看到,上面的 channelNumber 等于 1,那么,将两个 @RabbitListener 都配置为 手动确认的时候如何?

    如下:

    	@RabbitListener(queues = {RabbitConfig.QUEUE_1}, ackMode="MANUAL")
    //	public void processMessage1(String content) {
    	public void processMessage1(String content, Channel channel,
    			// @Header 必须!否则,异常!
    			@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    		log.info("接收消息1:content={}", content);
    		log.info("手动确认消息1:tag={}, channelNumber={}, channel", tag, channel.getChannelNumber(), channel);
    		
    		// 确认消息
    		channel.basicAck(tag, false);
    	}
    	
    	@RabbitListener(queues = {RabbitConfig.QUEUE_1})
    //	public void processMessage2(String content) {
    	public void processMessage2(String content, Channel channel,
    			// @Header 必须!否则,异常!
    			@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    		log.info("接收消息2:content={}", content);
    		log.info("手动确认消息2:tag={}, channelNumber={}, channel", tag, channel.getChannelNumber(), channel);
    		
    		// 确认消息
    		channel.basicAck(tag, false);
    	}

    执行结果:

    消息接收成功了。

    注意,

    channelNumber 有 1、2 两个了,对应两个 @RabbitListener;来自博客园

    而且,各个@RabbitListener中的 tag值 是按照顺序增长的。

    6-3:启动2个应用,共4个 @RabbitListener

    Web监控平台可以看到:

    执行消息发送。

    执行结果:

    一个处理了4条、一个处理了6条,总计10条。来自博客园

    关闭其中一个应用,再测试:

    剩下的一个应用处理了所有消息。

    》》》全文完《《《 

    RabbitMQ还有更多内容可以探究的:

    1、消息持久化,默认的交换机、队列都是持久化的——durable=true;什么时候不需要持久化呢?

    2、可以用来做延迟消息队列——消息发送了,不立即处理,指定时间到了后再处理,怎么实现呢?

    3、更多高级的配置,spring.rabbitmq.*;

    4、RabbitMQ服务器优化等;

    5、集群架构,看了一篇博文,居然介绍了4种!

    6、会用之后,是不是要探究下原理呢?

    ……

    参考文档

    1、RabbitMQ学习笔记:4369、5672、15672、25672默认端口号修改

    2、RabbitMQ三种Exchange模式(fanout,direct,topic)简介

    3、RabbitMQ 的4种集群架构

    4、

  • 相关阅读:
    CF763C Timofey and Remoduling
    CF762E Radio Stations
    CF762D Maximum Path
    CF763B Timofey and Rectangles
    URAL1696 Salary for Robots
    uva10884 Persephone
    LA4273 Post Offices
    SCU3037 Painting the Balls
    poj3375 Network Connection
    Golang zip压缩文件读写操作
  • 原文地址:https://www.cnblogs.com/luo630/p/15178354.html
Copyright © 2011-2022 走看看