zoukankan      html  css  js  c++  java
  • rabbitmq系列——(5 消息确认 -- 消费者 自动确认和手动确认)

      消费者消息确认分两种:自动确认、手动确认。

      自动确认,消费者消费消息时,只要收到消息就回馈rabbitmq服务,

        并且消费成功一条消息后,rabbitmq会认为所有消息全部成功消费,队列中移除所有消息,会导致消息的丢失;

      手动确认,消费一条消息,回馈rabbitmq服务,rabbitmq只移除队列中消费了的消息;

    1. 生产者

    using RabbitMQMsgProducer.MessageProducer;
    using Microsoft.Extensions.Configuration;
    using System;
    using System.IO;
    using RabbitMQMsgProducer.ExchangeDemo;
    using RabbitMQMsgProducer.MessageConfirm;
    
    namespace RabbitMQMsgProducer
    {
        class Program
        {
            static void Main(string[] args)
            {
                try
                {
                    {
                        // 消费者 确认消息
                        ConsumerMsgConfirm.Send();
                    }
                    Console.ReadLine();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    }
    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    namespace RabbitMQMsgProducer.MessageConfirm
    {
        public class ConsumerMsgConfirm
        {
            public static void Send()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//服务地址
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                string queueName = "ConsumerMsgConfirmQueue";
                string exchangeName = "ConsumerMsgConfirmExchange";
                string routingKeyName = "ConsumerMsgConfirmRoutingKey";
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        // 声明队列
                        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        // 声明交换机exchange
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        // 绑定exchange和queue
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName);
    
                        for (int i = 1; i <= 100; i++)
                        {
                            string message = $"the message is : {i} .";
                            channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: null, body: Encoding.UTF8.GetBytes(message));
    
                            Thread.Sleep(300);
                            Console.WriteLine($"the message is : {i} . is send .");
                        }
                        Console.Read();
    
                    }
                }
            }
        }
    }

    2. 消费者

    using RabbitMQMsgConsumer001.ExchangeDemo;
    using RabbitMQMsgConsumer001.MessageConfirm;
    using RabbitMQMsgConsumer001.MessageConsumer;
    using System;
    using System.Threading.Tasks;
    
    namespace RabbitMQMsgConsumer001
    {
        class Program
        {
            static void Main(string[] args)
            {
                try
                {
                    {
                        // 消费者 确认消息
                        ConsumerMsgConfirm.Receive();
                    }
                    Console.ReadLine();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    }
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    
    namespace RabbitMQMsgConsumer001.MessageConfirm
    {
        public class ConsumerMsgConfirm
        {
            public static void Receive()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//服务地址
                factory.UserName = "guest";//用户名
                factory.Password = "guest";//密码 
                string queueName = "ConsumerMsgConfirmQueue";
                string exchangeName = "ConsumerMsgConfirmExchange";
                string routingKeyName = "ConsumerMsgConfirmRoutingKey";
                using (var connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        Console.WriteLine("the consumer is ready !");
                        // 声明队列
                        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        // 声明交换机exchange
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        // 绑定exchange和queue
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName);
    
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        int i = 0;
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var msg = Encoding.UTF8.GetString(body.ToArray());
                            #region 自动确认
                            // 调试运行 消费第一条消息时,rabbitmq中的队列已经显示全部消费了
                            // Console.WriteLine($"the consumer received : {msg} over.");
                            #endregion
                            #region 手动确认
                            if (i < 20)
                            {
                                // 手动确认 消息已消费,通知broker 可从队列中移除这条消息
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                                Console.WriteLine($"the consumer received : {msg} over.");
                            }
                            else
                            {
                                // 模拟未消费此消息或消费失败,通知broker;
                                // requeue: true 重新写入队列; false 不重新写入,直接移除掉此消息
                                channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                            }
                            #endregion
                            i++;
                        };
                        {
                            // 1.消费者自动确认 autoAck:true
                            //channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                        }
                        {
                            // 2.消费者手动确认 autoAck:false
                            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                        }
    
                        Console.Read();
                    }
                }
            }
        }
    }

    3. 结果

     生产者,停止生产, 重新启动消费者,将继续消费20条消息:

  • 相关阅读:
    基本二叉搜索树的第K小元素
    sklearn常见分类器(二分类模板)
    python图论包networks(最短路,最小生成树带包)
    PAT 甲级 1030 Travel Plan (30 分)(dijstra,较简单,但要注意是从0到n-1)
    PAT 甲级 1029 Median (25 分)(思维题,找两个队列的中位数,没想到)*
    Oracle 10g ORA-12154: TNS: could not resolve the connect identifier specified 问题解决! 我同事遇到的问题。 username/
    JavaScritpt的DOM初探之Node(一)
    怎样实现动态加入布局文件(避免 The specified child already has a parent的问题)
    Ubuntu 14.04下单节点Ceph安装(by quqi99)
    卡片游戏
  • 原文地址:https://www.cnblogs.com/Fletcher/p/14241994.html
Copyright © 2011-2022 走看看