1.前言
上一随笔详细记录了直连交换机的方法,发送的消息是异步的,如果消息未被消费者消费,那么可以一直存在消息队列中。
那么有没有办法做一个回调,当消息被消费后,被通知消息成功被消费者消费啦?
答案是有的。
需要在消息生产者的消息生产类实现 RabbitTemplate.ConfirmCallback 接口,重写 回调方法confirm(),
同时 RabbitTemplate 模板工具需要自定义注入连接rabbitmq的连接工厂对象才可以正常执行回调操作。
而消费者端的代码不需要修改。
下面演示,以上一节随笔为基础,修改消息生产者部分代码实现演示,随笔地址https://www.cnblogs.com/c2g5201314/p/13156932.html
总结: (1)异步操作,获取回调消费结果,需要实现RabbitTemplate.ConfirmCallback 接口,然后重写 confirm()方法。 (2)获取回调结果,指的是获取消息是否被消费端正常消费而返回的结果,并不是消费端返回 的处理结果,这一点得注意,如果需要等待消费端返回处理结果,则需要做同步操作, 而不是做回调操作。 (3)需要做同步操作时,应该rabbitTemplate.convertSendAndReceive()方法,返回结果类型是Object,需要根据消费端返回的数据类型来决定强转的类型。 (4)异步则使用rabbitTemplate.convertAndSend()方法。
2.操作
(1)修改配置类,添加自定义RabbitTemplate模板
完整源码
package com.example.rabbitmqproducer1004.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitmq配置类---消息生产者 */ @Configuration public class RabbitmqConfig { //日志记录器 Logger logger = LoggerFactory.getLogger(getClass()); //=========================================================== /** * 手动配置RabbitTemplate 是为了获得回调操作,否则无法执行获取消费结果 */ /** * 获取rabbitmq的登录信息 */ //ip地址 @Value("${spring.rabbitmq.host}") private String host; //端口号 @Value("${spring.rabbitmq.port}") private int port; //账号 @Value("${spring.rabbitmq.username}") private String username; //密码 @Value("${spring.rabbitmq.password}") private String password; /** * 设置连接工厂 */ @Bean public ConnectionFactory connectionFactory() { //实例缓存连接工厂,参数是 rabbitmq的ip和端口 CachingConnectionFactory factory = new CachingConnectionFactory(host, port); //登录用户名 factory.setUsername(username); //登录密码 factory.setPassword(password); //设置主机的虚拟路径 factory.setVirtualHost("/"); //确认是否发布 factory.setPublisherConfirms(true); return factory; } /** * 设置rabbitmq模板 */ @Bean public RabbitTemplate rabbitTemplate() { //将连接工程工厂对象注入模板里,然后返回一个模板对象 return new RabbitTemplate(this.connectionFactory()); } //===================================================================== /** * 定义 交换机、消息队列、路由键 的名字 */ //定义交换机名字 exchange public static final String EXCHANG_1 = "exchange_1"; //定义消息队列名字 queue public static final String QUEUE_1 = "queu_1"; //定义路由键 routingkey public static final String ROUTINGKEY_1 = "routing_1"; //=============================================================== /** * 下面的是 直连交换机 设置 绑定 消息队列 到 交换机 * * DirectExchange:直连交换机,按照routingkey分发到指定队列 */ //============================================== /** * 设置交换机类型 */ @Bean public DirectExchange directExchange() { logger.warn("设置交换机类型"); //实例交换机对象,然后注入该交换机的名字 return new DirectExchange(EXCHANG_1); } /** * 创建消息队列 */ @Bean public Queue queue1() { logger.warn("创建消息队列"); //实例消息队列对象,输入该队列名字,如果需要该队列持久化,则设为true,默认是false // return new Queue(QUEUE_1, true); return new Queue(QUEUE_1); } /** * 绑定 消息队列 到 交换机【一个 交换机 允许被多个 消息队列 绑定】 */ @Bean public Binding binding() { logger.warn("绑定 消息队列 到 交换机"); //使用绑定构造器将 指定的队列 绑定到 指定的交换机上 ,Direct交换机需要携带 路由键 return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTINGKEY_1); } }
(2)修改消息生产类
实现接口
重写回方法
使用构造注入RabbitTemplate模板对象
完整源码
package com.example.rabbitmqproducer1004.rabbitmqFactory; import com.example.rabbitmqproducer1004.config.RabbitmqConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.UUID; /** * 消息生产类 */ @Component //实现接口 //public class SendMessage { //需要设置回调方法,获取消费结果才需要实现 RabbitTemplate.ConfirmCallback 接口, public class SendMessage implements RabbitTemplate.ConfirmCallback { Logger logger = LoggerFactory.getLogger(this.getClass()); //====================================================================== /** * 方法一:设置回调方法,获取消费结果, * <p> * 缺点是:必须手动配置RabbitTemplate模板 ,代码量大 */ //存储 rabbitmq模板的临时变量 private final RabbitTemplate rabbitTemplate; /** * 构造注入rabbitmq模板,这样可以设置回调方法,获取消费结果,但是必须手动配置RabbitTemplate模板 */ @Autowired public SendMessage(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; //设置确认回调的方法,参数类型为ConfirmCallback this.rabbitTemplate.setConfirmCallback(this); } /** * 回调方法,获取消费结果 * * @param correlationData 关联数据 * @param b 消息是否被消费成功,成功为true ,失败为false * @param s 原因 ,消费成功则返回null,否则返回失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { logger.warn("回调的连接数据:" + correlationData); if (correlationData != null) { //CorrelationData [id=1bcab025-2b4c-4f74-a22d-41007e30f551] logger.warn("获取correlationData的id值:" + correlationData.getId()); } //1bcab025-2b4c-4f74-a22d-41007e30f551 if (b) { logger.warn("回调结果:消息消费成功"); } else { logger.warn("回调结果:失败。原因:" + s); } } //======================================================================== // /** // * 方法二 :不需要获取获取消费结果,只需要发送即可 // * // * 优点:自动装配,代码量少 // */ // @Autowired // private RabbitTemplate rabbitTemplate; //======================================================================== /** * 发送消息 * <p> * 参数是消息内容 */ public void send(String message) { logger.warn("发送消息,内容:" + message); /** * 方法一:异步操作,不等待消费者端返回处理结果,设置在回调操作的关联数据,用于识别是哪一条消息和确认是否执行成功 */ // 实例关联数据对象,使用UUID随机数 作为 回调id CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 发送消息 ,参数分别是 : 指定的交换机名字 、指定的路由关键字、消息字符串、关联数据 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1,RabbitmqConfig.ROUTINGKEY_1,message,correlationData); /** * 方法二:异步操作,不等待消费者端返回处理结果,且在消息回调操作的关联数据为null,如果不做回调操作,则建议这样使用 */ // //发送消息 ,参数分别是 : 指定的交换机名字 、指定的路由关键字、消息字符串 // rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1, RabbitmqConfig.ROUTINGKEY_1, message); /** * 方法三:同步操作,等待消费者端返回处理结果 */ // Object dd = rabbitTemplate.convertSendAndReceive(RabbitmqConfig.EXCHANG_1, RabbitmqConfig.ROUTINGKEY_1, message); // logger.warn("结果是什么???==" + dd); } }
3.测试
依次启动生产者端、消费者端
访问网址 http://localhost:1004/mq?msg=你大爷,帮我发短信3999
查看生产者控制台打印
对调成功
再看看消费者的打印台
成功!!!撒花