zoukankan      html  css  js  c++  java
  • Rabbit--ack机制

      消息应答时执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。
      一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。

      但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。

      为了确保消息不会丢失,RabbitMQ支持消息应答

      消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。

      如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。

      没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。

      消息应答是默认打开的。我们通过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。

    代码示例:

    生产者端代码不变,消费者端代码这部分就是用于开启手动应答模式的。

    // 监听队列,手动返回完成  
    channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); 

    第二个参数为false则表示关闭自动应答机制,改为手动应答

    // 返回确认状态  
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

    在处理完消息时,返回应答状态。

    消费者端完整代码:

     1 static void Main(string[] args)
     2         {
     3             var factory = new ConnectionFactory();
     4             factory.HostName = " ";
     5             factory.Port = 5672;
     6             factory.UserName = " ";
     7             factory.Password = " ";
     8             factory.AutomaticRecoveryEnabled = true;
     9             using (var connection = factory.CreateConnection())
    10             {
    11                 using (var channel = connection.CreateModel())
    12                 {
    13                     channel.QueueDeclare(queue: "Test",
    14                         durable: true,
    15                         exclusive: false,
    16                         autoDelete: false,
    17                         arguments: null);
    18                  //count设置等待数量,size:消息大小,global设置channel是否与connetion同级
    19                     channel.BasicQos(prefetchCount: 3, prefetchSize: 0, global: false);
    20                     var property= channel.CreateBasicProperties();
    21                     property.Persistent = true;
    22                     Console.WriteLine(" [*] waiting for msg ");
    23                     var consumer = new EventingBasicConsumer(channel);
    24                     consumer.Received += (model, ea) =>
    25                     {
    26                         var body = ea.Body;
    27                         var msg = Encoding.UTF8.GetString(body);
    28                         Console.WriteLine("[x] reciverd {0} ", msg);
    29                         int dots = msg.Split('.').Length - 1;
    30                         Thread.Sleep(dots * 3000);
    31 
    32                         Console.WriteLine(" [x] Done ");
    33                       //手动的消息回执
    34                         channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    35                     };
    36                    //改为手动消息应答
    37                     channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
    38 
    39                     Console.WriteLine(" Press [enter] to exit ");
    40                     Console.ReadLine();
    41                 }
    42             }
    43         }    
  • 相关阅读:
    CF981D
    CF883H
    Hdu 5884
    全排列
    二叉搜索树实现
    my.ini配置详解
    主元素问题
    排序算法(还需补充)
    迷宫问题(DFS,BFS)
    算法导论4--求最大和数组
  • 原文地址:https://www.cnblogs.com/cuijl/p/8072396.html
Copyright © 2011-2022 走看看