zoukankan      html  css  js  c++  java
  • RabbitMQ入门_06_深入了解ack

    A. Delivery Tag

    参考资料:https://www.rabbitmq.com/confirms.html

    仔细查看一下 Consumer 的回调方法:

                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    ......
                    consumerChannel1.basicAck(envelope.getDeliveryTag(), false);
                }
    

    当我们需要确认一条消息已经被消费时,我们调用的 basicAck 方法的第一个参数是 Delivery Tag。

    Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。

    RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。

    运行下面的例子可以直观的看到这点:

    gordon.study.rabbitmq.ack.TestAckBasic.java

    public class TestAckBasic {
     
        private static final String QUEUE_NAME = "hello";
     
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            final Channel consumerChannel1 = connection.createChannel();
            consumerChannel1.queueDeclare(QUEUE_NAME, false, false, false, null);
            consumerChannel1.basicQos(3);
            Consumer consumer1 = new DefaultConsumer(consumerChannel1) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.printf("in consumer A (delivery tag is %d): %s
    ", envelope.getDeliveryTag(), message);
                    try {
                        TimeUnit.MILLISECONDS.sleep(200);
                    } catch (InterruptedException e) {
                    }
                    consumerChannel1.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            consumerChannel1.basicConsume(QUEUE_NAME, false, consumer1);
     
            final Channel consumerChannel2 = connection.createChannel();
            consumerChannel2.basicQos(3);
            Consumer consumer2 = new DefaultConsumer(consumerChannel2) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.printf("in consumer B (delivery tag is %d): %s
    ", envelope.getDeliveryTag(), message);
                    try {
                        TimeUnit.MILLISECONDS.sleep(200);
                    } catch (InterruptedException e) {
                    }
                    consumerChannel2.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            consumerChannel2.basicConsume(QUEUE_NAME, false, consumer2);
     
            Channel senderChannel = connection.createChannel();
            for (int i = 0; i < 10;) {
                String message = "NO. " + ++i;
                TimeUnit.MILLISECONDS.sleep(100);
                senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            }
            senderChannel.close();
        }
    }
    

    result:

    in consumer A (delivery tag is 1): NO. 1
    in consumer B (delivery tag is 1): NO. 2
    in consumer A (delivery tag is 2): NO. 3
    in consumer B (delivery tag is 2): NO. 4
    in consumer A (delivery tag is 3): NO. 5
    in consumer B (delivery tag is 3): NO. 6
    in consumer A (delivery tag is 4): NO. 7
    in consumer B (delivery tag is 4): NO. 8
    in consumer A (delivery tag is 5): NO. 9
    in consumer B (delivery tag is 5): NO. 10
    

    可见,两个信道的 delivery tag 分别从 1 递增到 5。(如果修改代码,将两个 Consumer 共享同一个信道,则 delivery tag 是从 1 递增到 10,参考 gordon.study.rabbitmq.ack.TestAckInOneChannel.java

    basicAck 方法的第二个参数 multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认;如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认。(批量确认针对的是整个信道,参考gordon.study.rabbitmq.ack.TestBatchAckInOneChannel.java。)

    对同一消息的重复确认,或者对不存在的消息的确认,会产生 IO 异常,导致信道关闭。

    B. 忘了确认会怎样

    如果我们注释掉22行,让 consumerChannel1 不再确认消息,世界会怎样?

    Unacked messages

    只要程序还在运行,这3条消息就一直是 Unacked 状态,无法被 RabbitMQ 重新投递。更厉害的是,RabbitMQ 消息消费并没有超时机制,也就是说,程序不重启,消息就永远是 Unacked 状态。处理运维事件时不要忘了这些 Unacked 状态的消息

    当程序关闭时(实际只要 Consumer 关闭就行),这3条消息会恢复为 Ready 状态。

    C. 取消确认

    当消费消息出现异常时,我们需要取消确认,这时我们可以使用 Channel 的 basicReject 方法。

        void basicReject(long deliveryTag, boolean requeue) throws IOException;
    

    第一个参数指定 delivery tag,第二个参数说明如何处理这个失败消息。requeue 值为 true 表示该消息重新放回队列头,值为 false 表示放弃这条消息

    一般来说,如果是系统无法处理的异常,我们一般是将 requeue 设为 false,例如消息格式错误,再处理多少次也是异常。调用第三方接口超时这类异常 requeue 应该设为 true。

    从 basicReject 方法参数可见,取消确认不支持批量操作(类似于 basicAck 的 multiple 参数)。所以,RabbitMQ 增加了 basicNack 方法以提供批量取消能力。参考 https://www.rabbitmq.com/nack.html

    PS:Reject 的消息重新推送来时,delivery tag 就是新的值了。

  • 相关阅读:
    Ubuntu下Sublime Text 2优化配置
    Ubuntu14.04 设置wifi热点
    我是如何从程序小白成为码农的
    eclipse 配置黑色主题
    经典面试题(1):统计整数中1的个数
    Matlab一个错误引发的血案:??? Error using ==> str2num Requires string or character array input.
    折腾到死:matlab7.0 安装
    VMware 与Ubuntu通过samba服务器共享文件
    大自然的搬运工:Ubuntu环境下gedit的一些个简单配置
    UML(Unified Model Language)统一建模语言
  • 原文地址:https://www.cnblogs.com/gordonkong/p/6952957.html
Copyright © 2011-2022 走看看