zoukankan      html  css  js  c++  java
  • rabbitmq 消息的状态转换

    tutorial:http://www.rabbitmq.com/tutorials/tutorial-two-java.html

    这里解释接收消息端关于 acknowledge和prefetch的设置问题

    这里有两段代码,sender,负责发送100条消息; recv,负责接收消息,每接收到一条消息sleep 1 秒。

     1 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     2         //channel.basicQos(1,false);
     3         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     4 
     5         Consumer consumer = new DefaultConsumer(channel) {
     6             @Override
     7             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
     8                     throws IOException {
     9                 String message = new String(body, "UTF-8");
    10                 System.out.println(" [x] Received '" + message + "'");
    11                 try{
    12                     Thread.sleep(1000);
    13                 }catch (Exception e){
    14 
    15                 }
    16                 //channel.basicAck(envelope.getDeliveryTag(), false);
    17 
    18             }
    19         };
    20         channel.basicConsume(QUEUE_NAME, true, consumer);

    先展示一个基础版本的recv。

    运行这段代码遇到两个费解的现象

    1. 先启动sender产生100条消息,后启动两个recv,发现所有消息都被第一个启动的recv接收了,第二个recv没有接收到任何消息。但如果先两个recv,再启动sender,那么两个recv是平均分配消息的。

    2. 先启动sender,启动第一个recv,处理10条消息,关掉recv,再启动第二个recv,第二个recv没有消息。11~100条消息丢失了。

    为了说明上面的问题,我们先描述一下一个消息在rmq server端的状态

    在server端,消息进入queue后,首先是ready状态。如果有recv 消费这条消息,那么消息进入 unack 状态,当recv ack这条消息后,server端将删除消息。

    现象1的解释:

    先启动sender,server端有100个ready 消息。启动recv1,虽然recv1还没有来得及处理这些消息,但recv1 接收了100个消息。在server端100个消息都进入unack状态。因为我们设置的ack方式是autoack, line 20

    因此所有消息立刻就从server端删除了。当我再启动recv2时,队列已经没有消息了,所以recv2没有接收到任何消息。反过来,先启动两个recv,sender生产消息的时候,server会按round robbin方式分配消息,因此两个recv各接收50条消息

    现象2的解释:

    因为消息都已经发送给了recv1,server端收到了ack,删除了消息,系统中只有recv1缓存了消息,如果关掉recv1,所有消息都会丢失。recv2无法拿到recv1未处理的消息。

    下面我们看看改进版本。变化在于

    • line 20 autoack 改为false
    • line 16 每条消息处理后ack
     1 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     2         //channel.basicQos(1,false);
     3         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     4 
     5         Consumer consumer = new DefaultConsumer(channel) {
     6             @Override
     7             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
     8                     throws IOException {
     9                 String message = new String(body, "UTF-8");
    10                 System.out.println(" [x] Received '" + message + "'");
    11                 try{
    12                     Thread.sleep(1000);
    13                 }catch (Exception e){
    14 
    15                 }
    16                 channel.basicAck(envelope.getDeliveryTag(), false);
    17 
    18             }
    19         };
    20         channel.basicConsume(QUEUE_NAME, false, consumer);

    可以发现现象1 依旧,但现象2 不同了。我们可以通过命令查看server端的状态

    rabbitmqctl list_queues name messages_unacknowledged messages_ready

    启动sender后 unack=0,ready=100

    启动recv1 后 unack=100, ready=0,所有消息都给了recv1

    等recv1 处理几条消息后 unack=89, ready=0,recv1处理并ack了11条消息,

    关闭recv1 后 unack=0, ready=85,未ack的消息都回到ready状态

    启动recv2,unack=89, ready =0,所有消息都转给了recv2.

    如果消息量很大,那么缓存消息就可能吃掉recv的所有内存导致系统崩溃。因此我们打开 line2。这样recv在ack一个消息后才会领取下一个消息。再来看看按照上面流程queue里消息的状态

    启动sender后 unack=0,ready=100

    启动recv1 后,unack=1, ready=99

    启动recv2后,unack=2, ready=95

    在消息处理完前,unack都是2,recv1和recv2 各持有一个消息

  • 相关阅读:
    [BJOI2019] 光线
    C# 从零开始写 SharpDx 应用 笔刷
    BAT 脚本判断当前系统是 x86 还是 x64 系统
    BAT 脚本判断当前系统是 x86 还是 x64 系统
    win2d 通过 CanvasActiveLayer 画出透明度和裁剪
    win2d 通过 CanvasActiveLayer 画出透明度和裁剪
    PowerShell 拿到显卡信息
    PowerShell 拿到显卡信息
    win10 uwp 如何使用DataTemplate
    win10 uwp 如何使用DataTemplate
  • 原文地址:https://www.cnblogs.com/englefly/p/8435998.html
Copyright © 2011-2022 走看看