zoukankan      html  css  js  c++  java
  • RabbitMQ(八)——消息确认

    RabbitMQ系列

    RabbitMQ(一)——简介

    RabbitMQ(二)——模式类型

    RabbitMQ(三)——简单模式

    RabbitMQ(四)——工作队列模式

    RabbitMQ(五)——发布订阅模式

    RabbitMQ(六)——路由模式

    RabbitMQ(七)——主题模式

    RabbitMQ(八)——消息确认

    RabbitMQ(九)——消息持久化

    RabbitMQ(十)——消息优先级

     前言

      为什么要使用消息确认?

        某些场景中需要确保每条消息都要被成功处理,消息确认分为两种:

      • 一种是生产者发送消息到Broker时,Broker给生产者发送消息确认回执,告诉生产者消息已被成功发送到Broker。
      • 另外一种是消费者接收到Broker发送的消息时,消费者给Broker发送确认回执,用于通知消息已被成功消费。

    第一种:生产者端消息确认

      生产者向Broker发送消息,Broker接收到消息后给生产者发送确认回执。主要有两种方式:Tx机制模式和Confirm模式

      Tx机制模式:

        Tx机制可以叫事务机制,RabbitMQ中有三个Tx机制的方法:TxSelect(),TxCommit(),TxRollback()。

        channel.TxSelect()将当前channel设置成transaction模式开启事务,channel.TxCommit()提交事务,channel.TxRollback()回滚事务使用Tx机制,首先使用channel.TxSelect()开启事务,然后发布消息给Broker服务器,如果执行channel.TxCommit()成功了表示消息被Broker接收了。当channel.TxCommit()提交时失败,可以捕获异常然后channel.TxRollback()回滚事务。

    channel.TxSelect();
                try {
                    for (int i = 0; i < 10; i++)
                    {
                        string msg = $"第{i + 1}条消息";
                        //5.发布消息
                        channel.BasicPublish("", "simple", null, Encoding.UTF8.GetBytes(msg));
                        Console.WriteLine($"已发送消息:{msg}");
                    }
                    channel.TxCommit();
                } 
                catch
                {
                    channel.TxRollback();
                }
    View Code

       

      Confirm模式:

        Confirm主要方法:ConfirmSelect() , WaitForCnofirms()和WaitForCnofirmOrDie()。

        channel.ConfirmSelect()表示开启Confirm模式。channel.WaitForConfirms()等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false。channel.WaitForConfirmsOrDie()与channel.WaitForConfirms()相似,也是等待消息确认,区别在于该方法没有返回值,只要有任意一条消息没被成功接收,会抛出一个OperationInterrupedException类型异常; 

            //
            // 摘要:
            //     Wait until all published messages have been confirmed.
            //
            // 参数:
            //   timeout:
            //     How long to wait (at most) before returning whether or not any nacks were returned.
            //
            //   timedOut:
            //     True if the method returned because the timeout elapsed, not because all messages
            //     were ack'd or at least one nack'd.
            //
            // 返回结果:
            //     True if no nacks were received within the timeout, otherwise false.
            //
            // 言论:
            //     Waits until all messages published since the last call have been either ack'd
            //     or nack'd by the broker. Returns whether all the messages were ack'd (and none
            //     were nack'd). Note, throws an exception when called on a non-Confirm channel.
            [AmqpMethodDoNotImplementAttribute(null)]
            bool WaitForConfirms(TimeSpan timeout, out bool timedOut);
            //
            // 摘要:
            //     Wait until all published messages have been confirmed.
            //
            // 参数:
            //   timeout:
            //     How long to wait (at most) before returning whether or not any nacks were returned.
            //
            // 返回结果:
            //     True if no nacks were received within the timeout, otherwise false.
            //
            // 言论:
            //     Waits until all messages published since the last call have been either ack'd
            //     or nack'd by the broker. Returns whether all the messages were ack'd (and none
            //     were nack'd). Note, throws an exception when called on a non-Confirm channel.
            [AmqpMethodDoNotImplementAttribute(null)]
            bool WaitForConfirms(TimeSpan timeout);
            //
            // 摘要:
            //     Wait until all published messages have been confirmed.
            //
            // 言论:
            //     Waits until all messages published since the last call have been either ack'd
            //     or nack'd by the broker. Returns whether all the messages were ack'd (and none
            //     were nack'd). Note, throws an exception when called on a non-Confirm channel.
            [AmqpMethodDoNotImplementAttribute(null)]
            bool WaitForConfirms();
            //
            // 摘要:
            //     Wait until all published messages have been confirmed.
            //
            // 言论:
            //     Waits until all messages published since the last call have been ack'd by the
            //     broker. If a nack is received, throws an OperationInterrupedException exception
            //     immediately.
            [AmqpMethodDoNotImplementAttribute(null)]
            void WaitForConfirmsOrDie();
            //
            // 摘要:
            //     Wait until all published messages have been confirmed.
            //
            // 言论:
            //     Waits until all messages published since the last call have been ack'd by the
            //     broker. If a nack is received or the timeout elapses, throws an OperationInterrupedException
            //     exception immediately.
            [AmqpMethodDoNotImplementAttribute(null)]
            void WaitForConfirmsOrDie(TimeSpan timeout);        
    Confirm API
    //开启事务
                //channel.TxSelect();
                channel.ConfirmSelect();
                try {
                    for (int i = 0; i < 10; i++)
                    {
                        string msg = $"第{i + 1}条消息";
                        //5.发布消息
                        channel.BasicPublish("", "simple", null, Encoding.UTF8.GetBytes(msg));
                        Console.WriteLine($"已发送消息:{msg}");
                    }
                    //channel.TxCommit();
                    channel.WaitForConfirmsOrDie();
                    Console.WriteLine("发送完成");
                } 
                catch(Exception ex)
                {
                    //channel.TxRollback();
                    Console.WriteLine(ex.Message);
                }
    View Code

     

    第二种:消费者端消息确认

      Broker发送到消费者时,提供了两种消息确认方式:自动确认和显示确认。

      自动确认:

        当Broker向消费者发送消息后,不等消息处理结束,立即自动返回一个确认回执。使用:设置消费方法的参数autoAck为true。当Broker接收到确认回执时会删除消息,如果消费者在接收到消息并返回了确认回执后,然后消息还没处理消费者挂了,这条消息就找不回了。

    channel.BasicConsume("simple", true, consumer);

      显示确认:

        自动确认会出现消息丢失,如果让消息处理完再返回确认回执那么久不存在消息丢失问题了,这就是显示确认的思路。使用:先设置消费方法的参数autoAck为false,再使用channel.BasicAck()或channel.BasicReject()方法确认、拒绝消息。

    //确认消息:deliveryTag参数是分发的标记,multiple表示是否确认多条
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 
    //拒绝消息:deliveryTag参数也是分发的标记,requeue表示消息被拒绝后是否重新入队
    channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);

        当使用显示确认时,如果消费者处理完消息不发送回执,那么消息不会被删除,消息的状态一直是Unacked,这条消息也不会再发送给其他消费者。如果一个消费者在处理消息时尚未发送确认回执的情况下挂了,那么消息会被重新放入队列(状态从Unacked变成Ready),有其他消费者存在时,消息会发送给其他消费者。

    consumer.Received += (model, e) =>
                        {
                            byte[] message = e.Body.ToArray();
                            
                            //返回消息确认(true/false,自动/手动确认),没确认就不会消费掉消息
                            if (Encoding.UTF8.GetString(message).Contains("1"))
                            {
                                Console.WriteLine("接收消息:" + Encoding.UTF8.GetString(message));
                                channel.BasicAck(e.DeliveryTag, false);
                            }
                            else 
                            {
                                Console.WriteLine("拒绝消息:" + Encoding.UTF8.GetString(message));
                                //拒绝消息 false:拒绝后丢弃  true:拒绝后重新入队
                                channel.BasicReject(e.DeliveryTag, false);
                            }
                            
                        };
                        //消费者开启监听
                        channel.BasicConsume("simple", false, consumer);
    View Code

  • 相关阅读:
    免费第三方API平台整合
    接口使用数据库缓存考虑的不周到之处
    找了两个小时的错误,net.sf.json.JSONException: JSON keys cannot be null.
    jsp动态页面访问报错:HTTP Status 500
    JAVA中json转换为集合(对象)之间的相互转换
    听头条
    使用DataOutputStream输出流的read方法出现读取字节不一致解决办法,本地和测试环境不一致
    ibatis中的xml配置文件
    poj 1325 Machine Schedule 题解
    poj 1469 COURSES 题解
  • 原文地址:https://www.cnblogs.com/zousc/p/12835979.html
Copyright © 2011-2022 走看看