zoukankan      html  css  js  c++  java
  • RabbitMQ的高级特性(一)可靠性传递 lq

    可靠性投递可分为两个阶段:
    1. producer --> broker
    2. broker --> consumer

    1. producer 到 mq server又可以分为两个阶段:
      1. producer -> exchange 采用confirm确认模式,即从producer到exchange就会返回一个confirmCallback
      2. exchange -> queue 采用return 退回模式,即从exchange到queue 投递失败才会返回一个returnCallback
        1. pom依赖跟spring集成rabbitmq一样。
        2. rabbitmq.properties 和spring集成rabbitmq一样。
        3. spring-rabbitmq-producer.xml
        <?xml version="1.0" encoding="UTF-8"?>
        <beans xmlns="http://www.springframework.org/schema/beans"
        	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        	   xmlns:context="http://www.springframework.org/schema/context"
        	   xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        	   xsi:schemaLocation="http://www.springframework.org/schema/beans
        	   http://www.springframework.org/schema/beans/spring-beans.xsd
        	   http://www.springframework.org/schema/context
        	   https://www.springframework.org/schema/context/spring-context.xsd
        	   http://www.springframework.org/schema/rabbit
        	   http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        	<!--加载配置文件-->
        	<context:property-placeholder location="classpath:rabbitmq.properties"/>
        
        	<!-- 定义rabbitmq connectionFactory
        		publisher-confirms="true" 确认模式开启:消息发送方是否将消息发送给exchange,如果发送到了,会返回确认消息
        	publisher-returns="true" 返回模式开启:exchange将消息发送给queue时,发送不过去,返回消息发送失败结果。
        	-->
        	<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
        							   port="${rabbitmq.port}"
        							   username="${rabbitmq.username}"
        							   password="${rabbitmq.password}"
        							   virtual-host="${rabbitmq.virtual-host}"
        							   publisher-confirms="true"
        							   publisher-returns="true"
        	/>
        	<!--定义管理交换机、队列-->
        	<rabbit:admin connection-factory="connectionFactory"/>
        
        	<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
        	<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
        
        	<!--消息可靠性投递(生产端)-->
        	<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
        	<rabbit:direct-exchange name="test_exchange_confirm">
        		<rabbit:bindings>
        			<rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
        		</rabbit:bindings>
        	</rabbit:direct-exchange>
        
        </beans>
        
        1. 确认模式代码
        @RunWith(SpringJUnit4ClassRunner.class)
        @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
        public class ProducerTest {
        
        	@Autowired
        	private RabbitTemplate rabbitTemplate;
        	/**
        	 * 确认模式:
        	 * 步骤:
        	 * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
        	 * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
        	 */
        	@Test
        	public void testConfirm() {
        		//2. 定义回调
        		rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        			@Override
        			public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        				if (ack){
        					//接收成功
        					System.out.println("接收成功消息" + cause);
        				}else {
        					//接收失败
        					System.out.println("接收失败消息" + cause);
        					//做一些处理,让消息再次发送。
        				}
        			}
        		});
        
        		//3. 发送消息
        		rabbitTemplate.convertAndSend("test_exchange_confirm000", "confirm", "message confirm....");
        	}
        
        }
        
        1. 回退模式代码:
        @RunWith(SpringJUnit4ClassRunner.class)
        @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
        public class ProducerTest {
        
        	@Autowired
        	private RabbitTemplate rabbitTemplate;
        
        	/**
        	 * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack
        	 * 步骤:
        	 * 1. 开启回退模式:publisher-returns="true"
        	 * 2. 设置ReturnCallBack
        	 * 3. 设置Exchange处理消息的模式:
        	 *      1). 如果消息没有路由到Queue,则丢弃消息(默认)
        	 *      2). 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
        	 */
        	@Test
        	public void testReturn() {
        
        		//设置交换机处理失败消息的模式。在交换机没有发送成功到队列的情况下,设置了,才会调用ReturnCallback()
        		//否则,将会直接将消息丢弃,不调用ReturnCallback()
        		rabbitTemplate.setMandatory(true);
        		//2.设置ReturnCallBack
        		rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        			/**
        			 * @param message   消息对象
        			 * @param replyCode 错误码
        			 * @param replyText 错误信息
        			 * @param exchange  交换机
        			 * @param routingKey 路由键
        			 */
        			@Override
        			public void returnedMessage(Message message, int replyCode,String replyText,String exchange,String routingKey) {
        				System.out.println("return 执行了....");
        
        				System.out.println(message);
        				System.out.println(replyCode);
        				System.out.println(replyText);
        				System.out.println(exchange);
        				System.out.println(routingKey);
        				//处理
        			}
        		});
        		//3. 发送消息
        		rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
        	}
        }
        
      3. 两种模式的小结:
        • 设置 ConnectionFactory的publisher-confirms="true" 开启 确认模式。
        • 使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。(不论成功失败,只要开启了,就会发送消息)
        • 设置 ConnectionFactory 的 publisher-returns="true" 开启 退回模式。
        • 使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从exchange 路由到 queue 失败后,如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer并执行回调函数returnedMessage。(开启模式,需要在java代码中设置Mandatory为true,并且只有发送失败才会调用returnedMessage)
    2. broker -> consumer
      • consumer确认模式有两种:自动(acknowledge="none")和手动(acknowledge="manual")。
      • 自动确认:当消息一旦被Consumer接收到,则自动确认收到,并将相应消息从队列中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
      • 手动确认:则需要在业务处理成功后,调用channel.basicAck(),手动签收;如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
        1. pom依赖相同
        2. rabbitmq.properties 相同
        3. spring-rabbitmq-consumer.xml
        <?xml version="1.0" encoding="UTF-8"?>
        <beans xmlns="http://www.springframework.org/schema/beans"
        	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        	   xmlns:context="http://www.springframework.org/schema/context"
        	   xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        	   xsi:schemaLocation="http://www.springframework.org/schema/beans
        	   http://www.springframework.org/schema/beans/spring-beans.xsd
        	   http://www.springframework.org/schema/context
        	   https://www.springframework.org/schema/context/spring-context.xsd
        	   http://www.springframework.org/schema/rabbit
        	   http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        	<!--加载配置文件-->
        	<context:property-placeholder location="classpath:rabbitmq.properties"/>
        
        	<!-- 定义rabbitmq connectionFactory -->
        	<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
        							   port="${rabbitmq.port}"
        							   username="${rabbitmq.username}"
        							   password="${rabbitmq.password}"
        							   virtual-host="${rabbitmq.virtual-host}"/>
        	<!--扫描包-->
        	<context:component-scan base-package="com.atguigu.listener" />
        
        	<!--定义监听器容器
        	acknowledge="none":默认的签收方式,自动签收
        	acknowledge="manual":手动签收
        	Ctrl + alt + 空格:属性值提示
        	-->
        	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        		<!--延迟队列效果实现:  一定要监听的是 死信队列!!!-->
        		<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
        	</rabbit:listener-container>
        
        </beans>
        
        1. 自动确认代码:
        @Component
        public class AckListener implements MessageListener {
        	@Override
        	public void onMessage(Message message) {
        		System.out.println(new String(message.getBody()));
        	}
        }
        
        1. 手动确认代码:
        /**
         * Consumer ACK机制:
         *  1. 设置手动签收。acknowledge="manual"
         *  2. 让监听器类实现ChannelAwareMessageListener接口
         *  3. 如果消息成功处理,则调用channel的 basicAck()签收
         *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
        */
        @Component
        public class AckListener implements ChannelAwareMessageListener {
        	@Override
        	public void onMessage(Message message, Channel channel) throws Exception {
        		Thread.sleep(1000);
        		// 获取消息传递标记
        		long deliveryTag = message.getMessageProperties().getDeliveryTag();
        		try {
        			// ① 接收消息
        			System.out.println(new String(message.getBody()));
        			// ② 处理业务逻辑
        			System.out.println("处理业务逻辑");
        			int i = 3/0;//出现错误
        			// ③ 手动签收
        			/**
        			 * 第一个参数:表示收到的标签
        			 * 第二个参数mutiple:如果为true表示可以签收所有的消息
        			 */
        			channel.basicAck(deliveryTag,true);
        		} catch (Exception e) {
        			e.printStackTrace();
        			// ④ 拒绝签收
        			 /*
        			第三个参数:requeue:重回队列。设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
        			 */
        			channel.basicNack(deliveryTag,true,true);
        		}
        	}
        }
        
        1. 单元测试代码:
        @RunWith(SpringJUnit4ClassRunner.class)
        @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
        public class ConsumerTest {
        	@Test
        	public void test(){
        		while (true){
        
        		}
        	}
        }
        
      • consumer 确认模式小结:
        1. 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
        2. 消费监听类实现的不是MessageListener接口,而是带有信道的ChannelAwareMessageListener接口。并且实现方法onMessage(Message message,Channel channel),多了channel参数。
        3. 在消费端没有出现异常,调用channel.basicAck(deliveryTag,true);方法确认签收消息
        4. 出现异常,则在catch中调用 basicNack,拒绝消息,让MQ重新发送消息。
  • 相关阅读:
    [ZZ]终极期望
    推荐一部好电影
    读书笔记:《Java2 教程》(五)
    波音飞机的消息
    雪景
    [ZZ]候捷谈Java反射机制
    关于J2ME开发的感想(20060505更新)
    读书笔记:《Java2 教程》(七)
    读书笔记:《Java2 教程》(六)
    注册了Bloglines
  • 原文地址:https://www.cnblogs.com/rbwbear/p/15557911.html
Copyright © 2011-2022 走看看