RabbitMQ--消息确认机制(confirm)
Confirm模式
RabbitMQ为了解决生成者不知道消息是否真正到达broker这个问题,采用通过AMQP协议层面为我们提供了事务机制方案,但是采用事务机制实现会降低RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用Confirm模式。
producer端confirm模式的实现原理
- 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
- confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
- 在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。
编程优化逻辑
对于固定消息体大小和线程数,如果消息持久化,生产者confirm(或者采用事务机制),消费者ack,那么对性能有很大的影响。消息持久化的优化没有太好方法,用更好的物理存储(SAS, SSD, RAID卡)总会带来改善。生产者confirm这一环节的优化则主要在于客户端程序的优化之上。归纳起来,客户端实现生产者confirm有三种编程方式:
- 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
- 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
- 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。
第1种
普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。
关键代码如下:
1 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); 2 if(!channel.waitForConfirms()){ 3 System.out.println("send message failed."); 4 }
第二种
批量confirm模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。
关键代码:
1 channel.confirmSelect(); 2 for(int i=0;i<batchCount;i++){ 3 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); 4 } 5 if(!channel.waitForConfirms()){ 6 System.out.println("send message failed."); 7 }
第三种
异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。
关键代码:
1 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); 2 channel.confirmSelect(); 3 channel.addConfirmListener(new ConfirmListener() { 4 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 5 if (multiple) { 6 confirmSet.headSet(deliveryTag + 1).clear(); 7 } else { 8 confirmSet.remove(deliveryTag); 9 } 10 } 11 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 12 System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); 13 if (multiple) { 14 confirmSet.headSet(deliveryTag + 1).clear(); 15 } else { 16 confirmSet.remove(deliveryTag); 17 } 18 } 19 }); 20 21 while (true) { 22 long nextSeqNo = channel.getNextPublishSeqNo(); 23 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); 24 confirmSet.add(nextSeqNo); 25 }
消息确认(Consumer端)
为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。
采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。
当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。
RabbitMQ管理平台界面上可以看到当前队列中Ready状态和Unacknowledged状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到ack信号的消息数。也可以通过命令行来查看上述信息:rabbitmqctl list_queues name messages_ready messages_unacknowledged
代码示例:
1 public static void main(String[] args) throws IOException { 2 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory(); 3 Connection connection = connectionFactory.createConnection(); 4 Channel channel = connection.createChannel(false); 5 /** 6 * 创建队列申明 7 */ 8 boolean durable = true; 9 channel.queueDeclare(RabbitConfig.QUEUE_TOPIC2, durable, false, false, null); 10 11 /** 12 * 绑定队列到交换机 13 */ 14 channel.queueBind(RabbitConfig.QUEUE_TOPIC2, EXCHANGE_TOPIC, "commodity.*"); 15 16 17 /** 18 * 改变分发规则 19 */ 20 channel.basicQos(1); 21 DefaultConsumer consumer = new DefaultConsumer(channel) { 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 24 super.handleDelivery(consumerTag, envelope, properties, body); 25 System.out.println("[2] 接口数据 : " + new String(body, "utf-8")); 26 try { 27 Thread.sleep(200); 28 } catch (InterruptedException e) { 29 e.printStackTrace(); 30 } finally { 31 System.out.println("[2] done! "); 32 //消息应答:手动回执,手动确认消息 33 channel.basicAck(envelope.getDeliveryTag(), false); 34 } 35 } 36 }; 37 //监听队列 38 /** 39 * autoAck 消息应答 40 * 默认轮询分发打开:true :这种模式一旦rabbitmq将消息发送给消费者,就会从内存中删除该消息,不关心客户端是否消费正常。 41 * 使用公平分发需要关闭autoAck:false 需要手动发送回执 42 */ 43 boolean autoAck = false; 44 channel.basicConsume(RabbitConfig.QUEUE_TOPIC2, autoAck, consumer); 45 }
broker将在下面的情况中对消息进行confirm:
- broker发现当前消息无法被路由到指定的queues中(如果设置了mandatory属性,则broker会发送basic.return)
- 非持久属性的消息到达了其所应该到达的所有queue中(和镜像queue中)
- 持久消息到达了其所应该到达的所有queue中(和镜像中),并被持久化到了磁盘(fsync)
- 持久消息从其所在的所有queue中被consume了(如果必要则会被ack)
basicRecover:是路由不成功的消息可以使用recovery重新发送到队列中。
basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。
basicNack:可以一次拒绝N条消息,客户端可以设置basicNack方法的multiple参数为true,服务器会拒绝指定了delivery_tag的所有未确认的消息(tag是一个64位的long值,最大值是9223372036854775807)。