前文中也多次提到消息传输的一些概念,这一篇比较全面的介绍一下,然后补充一些内容。
消息的应答
RabbitMQ有两种应答模式,自动和手动。这也是AMQP协议所推荐的。这在point-to-point和broadcast都是一样的。
自动应答-当RabbitMQ把消息发送到接收端,接收端把消息出队列的时候就自动帮你发应答消息给服务。
手动应答-需要我们开发人员手动去调用ack方法去告诉服务已经收到。
文档推荐在大数据传输中,如果对个别消息的丢失不是很敏感的话选用自动应答比较理想,而对于那些一个消息都不能丢的场景,需要选用手动应答,也就是说在正确处理完以后才应答。如果选择了自动应答,那么消息重发这个功能就没有了。
消息的拒收
拒收,是接收端在收到消息的时候响应给RabbitMQ服务的一种命令,告诉服务器不应该由我处理,或者拒绝处理,扔掉。接收端在发送reject命令的时候可以选择是否要重新放回queue中。如果没有其他接收者监控这个queue的话,要注意一直无限循环发送的危险。
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);
BasicReject方法第一个参数是消息的DeliveryTag,对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等。第二个参数是是否放回queue中,requeue。
BasicReject一次只能拒绝接收一个消息,而BasicNack方法可以支持一次0个或多个消息的拒收,并且也可以设置是否requeue。
channel.BasicNack(3, true, false);
在第一个参数DeliveryTag中如果输入3,则消息DeliveryTag小于等于3的,这个Channel的,都会被拒收。
消息的QoS
QoS = quality-of-service, 顾名思义,服务的质量。通常我们设计系统的时候不能完全排除故障或保证说没有故障,而应该设计有完善的异常处理机制。在出现错误的时候知道在哪里出现什么样子的错误,原因是什么,怎么去恢复或者处理才是真正应该去做的。在接收消息出现故障的时候我们可以通过RabbitMQ重发机制来处理。重发就有重发次数的限制,有些时候你不可能不限次数的重发,这取决于消息的大小,重要程度和处理方式。
甚至QoS是在接收端设置的。发送端没有任何变化,接收端的代码也比较简单,只需要加如下代码:
channel.BasicQos(0, 1, false);
代码第一个参数是可接收消息的大小的,但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运行到这一行的时候报错,说还没有实现不为0的情况。第二个参数是处理消息最大的数量。举个例子,如果输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息,消息只会在队列中阻塞。如果输入3,那么可以最多有3个消息不应答,如果到达了3个,则发送端发给这个接收方得消息只会在队列中,而接收方不会有接收到消息的事件产生。总结说,就是在下一次发送应答消息前,客户端可以收到的消息最大数量。第三个参数则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的。
这种数量的设置,也为我们在多个客户端监控同一个queue的这种负载均衡环境下提供了更多的选择。