zoukankan      html  css  js  c++  java
  • 第四节:RabbitMq剖析生产者、消费者的几种消息确认机制(Conform、事务、自动、手动)

    一. 生产者-确认机制

    1. Confirm模式

    (1). 含义:就是应答模式,生产者发送一条消息之后,Rabbitmq服务器做了个响应,表示收到了。

    (2). 特点:异步模式,在应之前,可以继续发送消息,单条消息、批量消息均可继续发送。

    (3). 核心代码:单条消息确认: channel.waitForConfirms()

             批量消息确认: channel.waitForConfirmsOrDie()

             异步监听消息确认:channel.addConfirmListener()

    PS: 大致流程:channel.ConfirmSelect();开启确认模式→发送消息→提供一个回执方法WaitForConfirms(); 返回一个bool 值

    代码分享:

        /// <summary>
        /// 生产者-Confirm模式
        /// </summary>
        public class ProductionConfirm
        {
            public static void Show()
            {
                Console.ForegroundColor = ConsoleColor.Red;
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    //创建通道channel
                    using (var channel = connection.CreateModel())
                    {
                        Console.WriteLine("-------------------------生产者准备就绪-----------------------------");
                        channel.QueueDeclare(queue: "ConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.ExchangeDeclare(exchange: "ConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        channel.QueueBind(queue: "ConfirmQueue", exchange: "ConfirmExchange", routingKey: "ConfirmSelectKey");
                        string message = "";
                        //在控制台输入消息,按enter键发送消息              
                        while (!message.Equals("stop", StringComparison.CurrentCultureIgnoreCase))
                        {
                            Console.WriteLine("请输入要发送的消息:");
                            message = Console.ReadLine();
                            var body = Encoding.UTF8.GetBytes(message);
                            try
                            {
                                //开启消息确认模式
                                channel.ConfirmSelect();
                                //发送消息
                                channel.BasicPublish(exchange: "ConfirmExchange", routingKey: "ConfirmSelectKey", basicProperties: null, body: body);
                                if (channel.WaitForConfirms())   //单条消息确认
                                {
                                    //表示消息发送成功(已经存入队列)
                                    Console.WriteLine($"【{message}】发送到Broke成功!");
                                }
                                else
                                { 
                                    //表示消息发送失败
                                }
                                //channel.WaitForConfirmsOrDie();//如果所有消息发送成功 就正常执行, 如果有消息发送失败;就抛出异常;
                            }
                            catch (Exception)
                            {
                                //表示消息发送失败
                                Console.WriteLine($"【{message}】发送到Broker失败!");
                            }
                        }                   
                    }
                }
                Console.ReadKey();
            }
        }
    View Code

    运行效果:

    2. TX事务模式

    (1). 含义:基于AMPQ协议;可以让信道设置成一个带事务的信道,分为三步:开启事务、提交事务、事务回滚

    (2). 特点:同步模式,在事务提交之前不能继续发送消息,该模式相比Confirm模式效率差一点。

    (3). 核心代码:channel.TxSelect(); 开启一个事务

             channel.TxCommit();提交事务, 这一步成功后,消息才真正的写入队列

           channel.TxRollback(); //事务回滚

    代码分享:

      /// <summary>
        ///生产者-事务模式
        /// </summary>
        public class ProductionTx
        {
            public static void Show()
            {
                Console.ForegroundColor = ConsoleColor.Red;
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                { 
                    //创建通道channel
                    using (var channel = connection.CreateModel())
                    {
                        Console.WriteLine("---------------生产者准备就绪-------------------");
                        channel.QueueDeclare(queue: "MessageTxQueue01", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queue: "MessageTxQueue02", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //声明交换机exchang
                        channel.ExchangeDeclare(exchange: "MessageTxQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        //绑定exchange和queue
                        channel.QueueBind(queue: "MessageTxQueue01", exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey01");
                        channel.QueueBind(queue: "MessageTxQueue02", exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey02");
                        string message = "";
                        //发送消息,在控制台输入消息,按enter键发送消息
                        while (!message.Equals("stop", StringComparison.CurrentCultureIgnoreCase))
                        {
                            Console.WriteLine("请输入要发送的消息:");
                            message = Console.ReadLine();
                            var body = Encoding.UTF8.GetBytes(message);
                            try
                            {
                                //① 开启事务机制
                                channel.TxSelect(); //事务是协议支持的
                                //发送消息,同时给多个队列发送消息;要么都成功;要么都失败;
                                channel.BasicPublish(exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey01", basicProperties: null, body: body);
                                channel.BasicPublish(exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey02", basicProperties: null, body: body);
    
                                //int.Parse("dfdsfdf"); //模拟错误,消息发送不成功,进入catch,事务回滚
    
                                //② 事务提交
                                channel.TxCommit(); //只有事务提交成功以后,才会真正的写入到队列里面去
                                Console.WriteLine($"【{message}】发送到Broke成功!");
                            }
                            catch (Exception ex)
                            {
                                Console.WriteLine($"【{message}】发送到Broker失败!{ex.Message}");
                                //③ 事务回滚
                                channel.TxRollback(); 
                            }
                        }
                        Console.Read();
                    }
                }
            }
        }
    View Code

    运行效果:

    二. 消费者-确认机制

    1. 自动确认

    (1) 含义:是消费消息的时候,只有收到消息,就直接给RabbitMQ应答,直接总览该队列中所有消息了

    (2) 特点:只是消费成功了一条消息,RabbitMQ也会认为你是全部成功了,会把该队列中所有消息从队列中移除,这样会导致消息的丢失

    (3) 核心代码:autoAck: true ,表示自动确认

    (4) 案例演示:生产者先启动,3s后启动消费者,然后消费者消费1条消息的时候,进行停顿一段时间,发现队列中不止少了1条消息,少了很多,说明第1次应答,将当时队列中的消息全部删除了。

    配套生产者代码: 

        /// <summary>
        /// 普通的生产者
        /// (用于配合演示消费者的两种应答机制)
        /// </summary>
        public class ProductionConfirmPh
        {
            public static void Show()
            {
                Console.ForegroundColor = ConsoleColor.Red;
    
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        Console.WriteLine("---------------------生产者准备就绪---------------------------------");
                        channel.QueueDeclare(queue: "ConsumerConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //声明交换机exchang
                        channel.ExchangeDeclare(exchange: "ConsumerConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        //绑定exchange和queue
                        channel.QueueBind(queue: "ConsumerConfirmQueue", exchange: "ConsumerConfirmExchange", routingKey: "ConsumerConfirmKey");
    
                        for (int i = 1; i <= 1000; i++)
                        {
                            string message = $"消息{i}";
                            channel.BasicPublish(exchange: "ConsumerConfirmExchange",
                                             routingKey: "ConsumerConfirmKey",
                                             basicProperties: null,
                                             body: Encoding.UTF8.GetBytes(message));                      
                            Console.WriteLine($"【{message}】 已发送~~~");
                            Thread.Sleep(500);
                        }
                        Console.Read();
                    }
                }
            }
        }
    View Code

    消费者代码

    /// <summary>
        /// 消费者-自动确认
        /// (需要配合生产者,让生产者03/ProductionConfirmPh先启动)
        /// </summary>
        public class ConsumerAutoConfirm
        {
            public static void Show1()
            {
                Console.ForegroundColor = ConsoleColor.Green;
    
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {                   
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            Console.WriteLine(Encoding.UTF8.GetString(ea.Body.ToArray()));
                        };
                        Console.WriteLine("消费者准备就绪....");
                        //autoAck: true 自动确认
                        channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: false, consumer: consumer);
                        Console.ReadKey();
                    }
                }
            }
    
    
            public static void Show2()
            {
                Console.ForegroundColor = ConsoleColor.Green;
                var factory = new ConnectionFactory
                {
                    HostName = "localhost",//RabbitMQ服务在本地运行
                    UserName = "guest",//用户名
                    Password = "guest"//密码 
                };
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        int i = 0;
                        consumer.Received += (model, ea) =>
                        {
                            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                            if (i < 50)
                            {
                                Console.WriteLine($"第{i+1}次消费,内容为:{message}");
                                Thread.Sleep(10000000);
                            }
                            else
                            {
                                Console.WriteLine($"第{i + 1}次消费,内容为:{message}");
                            }
                            i++;
                        };
                        Console.WriteLine("------------------消费者准备就绪-------------------------");
    
                        //autoAck: true 自动确认
                        channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: true, consumer: consumer);
                        Console.ReadKey();
                    }
                }
            }
        }
    View Code

    结果现象截图

    2. 显式确认(手动确认)

    (1) 含义:消费者消费一条,回执给RabbitMq一条消息,Rabbitmq 只删除当前这一条消息;相当于是一条消费了,删除一条消息,性能稍微低一些;

    (2) 特点:消费1条应答一次,可以告诉RabbitMq消费成功or失败,消费成功,服务器删除该条消息,消费失败,可以删除也可以重新写入。

    (3) 核心代码:autoAck: false,表示不自动确认

          然后:hannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 表示消费成功

          channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 表示消费失败, 可以配置:requeue: true:重新写入到队列里去; false: 删除消息

    (4) 案例演示:生产者先启动,3s后启动消费者,然后消费者消费1条消息的时候,进行停顿一段时间,发现队列中仅少了1条消息,说明应答一次,删除1条。

    配套生产者代码: 

        /// <summary>
        /// 普通的生产者
        /// (用于配合演示消费者的两种应答机制)
        /// </summary>
        public class ProductionConfirmPh
        {
            public static void Show()
            {
                Console.ForegroundColor = ConsoleColor.Red;
    
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服务在本地运行
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        Console.WriteLine("---------------------生产者准备就绪---------------------------------");
                        channel.QueueDeclare(queue: "ConsumerConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //声明交换机exchang
                        channel.ExchangeDeclare(exchange: "ConsumerConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        //绑定exchange和queue
                        channel.QueueBind(queue: "ConsumerConfirmQueue", exchange: "ConsumerConfirmExchange", routingKey: "ConsumerConfirmKey");
    
                        for (int i = 1; i <= 1000; i++)
                        {
                            string message = $"消息{i}";
                            channel.BasicPublish(exchange: "ConsumerConfirmExchange",
                                             routingKey: "ConsumerConfirmKey",
                                             basicProperties: null,
                                             body: Encoding.UTF8.GetBytes(message));                      
                            Console.WriteLine($"【{message}】 已发送~~~");
                            Thread.Sleep(500);
                        }
                        Console.Read();
                    }
                }
            }
        }
    View Code

    消费者代码

    /// <summary>
        /// 消费者-手动确认
        /// (需要配合生产者,让生产者03/ProductionConfirmPh先启动)
        /// </summary>
        public class ConsumerNoAutoConfirm
        {
            public static void Show1()
            {
                Console.ForegroundColor = ConsoleColor.Green;
                var factory = new ConnectionFactory
                {
                    HostName = "localhost",//RabbitMQ服务在本地运行
                    UserName = "guest",//用户名
                    Password = "guest"//密码 
                };
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        int i = 0;
                        consumer.Received += (model, ea) =>
                        {
                            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                            if (i < 50)
                            {
                                //手动确认  (下面模拟消息正常消费,告诉rabbitmq,可以删除该条消息)
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                                Console.WriteLine(message);
                            }
                            else
                            {
                                //手动确认  (下面模拟消息异常消费,告诉rabbitmq,可以删除该条消息,也可以重新写入队列)
                                // requeue: true:重新写入到队列里去; false: 删除消息
                                channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                            }
                            i++;
                        };
                        Console.WriteLine("------------------消费者准备就绪-------------------------");
    
                        //处理消息 autoAck: false  显示确认(手动确认) 
                        channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: false, consumer: consumer);
                        Console.ReadKey();
                    }
                }
            }
    
    
            public static void Show2()
            {
                Console.ForegroundColor = ConsoleColor.Green;
                var factory = new ConnectionFactory
                {
                    HostName = "localhost",//RabbitMQ服务在本地运行
                    UserName = "guest",//用户名
                    Password = "guest"//密码 
                };
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        int i = 0;
                        consumer.Received += (model, ea) =>
                        {
                            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                            if (i < 50)
                            {
                                //手动确认  (下面模拟消息正常消费,告诉rabbitmq,可以删除该条消息)
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                                Console.WriteLine($"第{i + 1}次消费,内容为:{message}");
    
                                Thread.Sleep(10000000);
                            }
                            else
                            {
                                //手动确认  (下面模拟消息异常消费,告诉rabbitmq,可以删除该条消息,也可以重新写入队列)
                                // requeue: true:重新写入到队列里去; false: 删除消息
                                channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                            }
                            i++;
                        };
                        Console.WriteLine("------------------消费者准备就绪-------------------------");
    
                        //处理消息 autoAck: false  显示确认(手动确认) 
                        channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: false, consumer: consumer);
                        Console.ReadKey();
                    }
                }
            }
        }
    View Code

    结果现象截图

     

    !

    • 作       者 : Yaopengfei(姚鹏飞)
    • 博客地址 : http://www.cnblogs.com/yaopengfei/
    • 声     明1 : 如有错误,欢迎讨论,请勿谩骂^_^。
    • 声     明2 : 原创博客请在转载时保留原文链接或在文章开头加上本人博客地址,否则保留追究法律责任的权利。
     
  • 相关阅读:
    八. 输入输出(IO)操作2.面向字符的输入流
    八. 输入输出(IO)操作1.输入输出基本概念
    七. 多线程编程11.线程的挂起、恢复和终止
    七. 多线程编程10.线程死锁
    nginx 配置身份验证 http_auth_basic_module
    liunx mysql 备份
    8080 端口只允许内网访问配置
    nginx 配置白名单
    liunx tomcat 运行模式apr
    liunx contos 7.4 安装redis集群
  • 原文地址:https://www.cnblogs.com/yaopengfei/p/14679576.html
Copyright © 2011-2022 走看看