可靠性投递可分为两个阶段:
1. producer --> broker
2. broker --> consumer
- producer 到 mq server又可以分为两个阶段:
- producer -> exchange 采用confirm确认模式,即从producer到exchange就会返回一个confirmCallback
- exchange -> queue 采用return 退回模式,即从exchange到queue 投递失败才会返回一个returnCallback
- pom依赖跟spring集成rabbitmq一样。
- rabbitmq.properties 和spring集成rabbitmq一样。
- spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory publisher-confirms="true" 确认模式开启:消息发送方是否将消息发送给exchange,如果发送到了,会返回确认消息 publisher-returns="true" 返回模式开启:exchange将消息发送给queue时,发送不过去,返回消息发送失败结果。 --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!--消息可靠性投递(生产端)--> <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue> <rabbit:direct-exchange name="test_exchange_confirm"> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> </beans>
- 确认模式代码
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; /** * 确认模式: * 步骤: * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true" * 2. 在rabbitTemplate定义ConfirmCallBack回调函数 */ @Test public void testConfirm() { //2. 定义回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ //接收成功 System.out.println("接收成功消息" + cause); }else { //接收失败 System.out.println("接收失败消息" + cause); //做一些处理,让消息再次发送。 } } }); //3. 发送消息 rabbitTemplate.convertAndSend("test_exchange_confirm000", "confirm", "message confirm...."); } }
- 回退模式代码:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; /** * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack * 步骤: * 1. 开启回退模式:publisher-returns="true" * 2. 设置ReturnCallBack * 3. 设置Exchange处理消息的模式: * 1). 如果消息没有路由到Queue,则丢弃消息(默认) * 2). 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack */ @Test public void testReturn() { //设置交换机处理失败消息的模式。在交换机没有发送成功到队列的情况下,设置了,才会调用ReturnCallback() //否则,将会直接将消息丢弃,不调用ReturnCallback() rabbitTemplate.setMandatory(true); //2.设置ReturnCallBack rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * @param message 消息对象 * @param replyCode 错误码 * @param replyText 错误信息 * @param exchange 交换机 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode,String replyText,String exchange,String routingKey) { System.out.println("return 执行了...."); System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); //处理 } }); //3. 发送消息 rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm...."); } }
- 两种模式的小结:
- 设置 ConnectionFactory的publisher-confirms="true" 开启 确认模式。
- 使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。(不论成功失败,只要开启了,就会发送消息)
- 设置 ConnectionFactory 的 publisher-returns="true" 开启 退回模式。
- 使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从exchange 路由到 queue 失败后,如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer并执行回调函数returnedMessage。(开启模式,需要在java代码中设置Mandatory为true,并且只有发送失败才会调用returnedMessage)
- broker -> consumer
- consumer确认模式有两种:自动(acknowledge="none")和手动(acknowledge="manual")。
- 自动确认:当消息一旦被Consumer接收到,则自动确认收到,并将相应消息从队列中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
- 手动确认:则需要在业务处理成功后,调用channel.basicAck(),手动签收;如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
- pom依赖相同
- rabbitmq.properties 相同
- spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--扫描包--> <context:component-scan base-package="com.atguigu.listener" /> <!--定义监听器容器 acknowledge="none":默认的签收方式,自动签收 acknowledge="manual":手动签收 Ctrl + alt + 空格:属性值提示 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <!--延迟队列效果实现: 一定要监听的是 死信队列!!!--> <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener> </rabbit:listener-container> </beans>
- 自动确认代码:
@Component public class AckListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(new String(message.getBody())); } }
- 手动确认代码:
/** * Consumer ACK机制: * 1. 设置手动签收。acknowledge="manual" * 2. 让监听器类实现ChannelAwareMessageListener接口 * 3. 如果消息成功处理,则调用channel的 basicAck()签收 * 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer */ @Component public class AckListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(1000); // 获取消息传递标记 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // ① 接收消息 System.out.println(new String(message.getBody())); // ② 处理业务逻辑 System.out.println("处理业务逻辑"); int i = 3/0;//出现错误 // ③ 手动签收 /** * 第一个参数:表示收到的标签 * 第二个参数mutiple:如果为true表示可以签收所有的消息 */ channel.basicAck(deliveryTag,true); } catch (Exception e) { e.printStackTrace(); // ④ 拒绝签收 /* 第三个参数:requeue:重回队列。设置为true,则消息重新回到queue,broker会重新发送该消息给消费端 */ channel.basicNack(deliveryTag,true,true); } } }
- 单元测试代码:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest { @Test public void test(){ while (true){ } } }
- consumer 确认模式小结:
- 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
- 消费监听类实现的不是MessageListener接口,而是带有信道的ChannelAwareMessageListener接口。并且实现方法onMessage(Message message,Channel channel),多了channel参数。
- 在消费端没有出现异常,调用channel.basicAck(deliveryTag,true);方法确认签收消息
- 出现异常,则在catch中调用 basicNack,拒绝消息,让MQ重新发送消息。