zoukankan      html  css  js  c++  java
  • asp.net 使用rabbitmq事例

    本例asp.net 使用rabbitmq需求背景:为了提升用户体验,用户点击下单按钮,后台先做一些简单必要的操作,返回给用户一个友好提示(比如提示处理中,或者订单状态为处理中),然后发通过发消息给队列,把耗时久的操作留给rabbitmq队列处理。

     1、生产者封装类:

    public class Publisher
        {
            private readonly string _exchange;
            private readonly string _hostName;
            private readonly string _password;
            private readonly Uri _uri;
            private readonly string _userName;
            private readonly string _virtualHost;
    
            /// <param name="exchange"></param>
            /// <param name="hostName"></param>
            /// <param name="userName"></param>
            /// <param name="password"></param>
            /// <param name="virtualHost"></param>
            /// <param name="uri">AMQP Address</param>
            public Publisher(string exchange, string hostName, string userName, string password, string virtualHost, Uri uri)
            {
                _hostName = hostName;
                _exchange = exchange;
                _userName = userName;
                _password = password;
                _virtualHost = virtualHost;
                _uri = uri;
    
                Factory = new ConnectionFactory
                {
                    HostName = _hostName,
                    UserName = _userName,
                    Password = _password,
                    VirtualHost = _virtualHost,
                    Endpoint = new AmqpTcpEndpoint(_uri),
                    RequestedHeartbeat = 0
                };
                Factory.RequestedHeartbeat = 0;
            }
    
            public string HostName
            {
                get { return _hostName; }
            }
    
            public string Exchange
            {
                get { return _exchange; }
            }
    
            public string UserName
            {
                get { return _userName; }
            }
    
            public string Password
            {
                get { return _password; }
            }
    
            public string VirtualHost
            {
                get { return _virtualHost; }
            }
    
            public Uri Uri
            {
                get { return _uri; }
            }
    
            public ConnectionFactory Factory { get; private set; }
    
            /// <summary>
            /// 直连式交换机,发消息
            /// </summary>
            /// <param name="queueName">队列名</param>
            /// <param name="message">消息</param>
            /// <param name="durable">消息是否持久化</param>
            public void PublishDirectMessage(string queueName, string message, bool durable=false)
            {
                if (null == Factory)
                {
                    throw new ArgumentException("connection factory initialization error");
                }
    
                if (string.IsNullOrWhiteSpace(message))
                {
                    throw new ArgumentNullException("message can not be null.");
                }
    
                if (string.IsNullOrWhiteSpace(Exchange))
                {
                    throw new ArgumentNullException("exchange can not be null.");
                }
    
                try
                {
                    using (var connection = Factory.CreateConnection())
                    {
                        //通道 (Channel),在C#客户端里叫Model(不明白为什么这么取名字),其他客户端基本都叫Channel
                        using (var channel = connection.CreateModel())
                        {
                            //定义交换机
                            channel.ExchangeDeclare(Exchange, ExchangeType.Direct, durable: durable,
                                autoDelete: false, arguments: null);
    
                            //定义队列,如果名称相同不会重复创建
                            channel.QueueDeclare(queueName, durable: durable, exclusive: false,
                                autoDelete: false, arguments: null);
    
                            //绑定
                            channel.QueueBind(queueName, Exchange, routingKey: queueName);
    
                            //消息可持久化
                            IBasicProperties props = null;
                            if (durable) {
                                props = channel.CreateBasicProperties();
                                props.SetPersistent(true);
                            }                        
    
                            //发送消息到队列
                            var msgBody = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(Exchange, routingKey: queueName, basicProperties: props,
                                body: msgBody);
                        }
                    }
                }
                catch (Exception ex)
                {
                    LogHelper.Log(LogCategory.Error, ex.Message, ex);
                }
            }
    
            /// <summary>
            /// Fanout(广播)式交换机
            /// </summary>
            /// <param name="message"></param>
            public void PublishFanoutMessage(string message)
            {
                if (null == Factory)
                {
                    throw new ArgumentException("connection factory initialization error");
                }
    
                if (string.IsNullOrWhiteSpace(message))
                {
                    throw new ArgumentNullException("message can not be null.");
                }
    
                if (string.IsNullOrWhiteSpace(Exchange))
                {
                    throw new ArgumentNullException("exchange can not be null.");
                }
    
                try
                {
                    using (var connection = Factory.CreateConnection())
                    {
                        using (var channel = connection.CreateModel())
                        {
                            channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(Exchange, "", null, body);
                        }
                    }
                }
                catch (Exception ex)
                {
                    LogHelper.Log(LogCategory.Error, ex.Message, ex);
                }
            }
        }
    生产者

    2、消费者封装类:

    public delegate void ReceiveMessageHandle(string inputStr);
        public delegate bool ReceiveAnswerMessageHandle(string inputStr);
    
        /// <typeparam name="T">要接收的数据类型</typeparam>
        public class Subscriber<T> : IDisposable
        {
            private readonly string _exchange;
            private readonly string _hostName;
            private readonly string _password;
            private readonly Uri _uri;
            private readonly string _userName;
            private readonly string _virtualHost;
    
            /// <param name="exchange"></param>
            /// <param name="hostName"></param>
            /// <param name="userName"></param>
            /// <param name="password"></param>
            /// <param name="virtualHost"></param>
            /// <param name="uri">AMQP Address</param>
            public Subscriber(string exchange, string hostName, string userName, string password, string virtualHost,
                Uri uri)
            {
                _hostName = hostName;
                _exchange = exchange;
                _userName = userName;
                _password = password;
                _virtualHost = virtualHost;
                _uri = uri;
    
                Factory = new ConnectionFactory
                {
                    HostName = _hostName,
                    UserName = _userName,
                    Password = _password,
                    VirtualHost = _virtualHost,
                    Endpoint = new AmqpTcpEndpoint(_uri),
                    RequestedHeartbeat = 0
                };
    
                Connection = Factory.CreateConnection();
                Channel = Connection.CreateModel();
            }
    
            public string HostName
            {
                get { return _hostName; }
            }
    
            public string Exchange
            {
                get { return _exchange; }
            }
    
            public string UserName
            {
                get { return _userName; }
            }
    
            public string Password
            {
                get { return _password; }
            }
    
            public string VirtualHost
            {
                get { return _virtualHost; }
            }
    
            public Uri Uri
            {
                get { return _uri; }
            }
    
            public ConnectionFactory Factory { get; private set; }
    
            public IModel Channel { get; private set; }
            public IConnection Connection { get; private set; }
            public EventingBasicConsumer Consumer { get; private set; }
    
            private string QueueName { get; set; }
    
            public string Message { get; set; }
    
            //public delegate string MessageHandle();
    
            /// <summary>
            /// 手动释放
            /// </summary>
            void IDisposable.Dispose()
            {
                if (Channel != null)
                {
                    Consumer = null;
                    Channel.Close();
                    Channel.Dispose();
                }
    
                if (Connection != null)
                {
                    Consumer = null;
                    Connection.Close();
                    Connection.Dispose();
                }
    
                GC.SuppressFinalize(this);
            }
    
            /// <summary>
            /// 托管释放
            /// </summary>
            ~Subscriber()
            {
                if (Channel != null)
                {
                    Consumer = null;
                    Channel.Close();
                    Channel.Dispose();
                }
    
                if (Connection != null)
                {
                    Consumer = null;
                    Connection.Close();
                    Connection.Dispose();
                }
            }
    
            public ReceiveMessageHandle ReceiveMessageHandler { get; set; }
            public ReceiveAnswerMessageHandle ReceiveAnswerMessageHandle { get; set; }
    
            /// <summary>
            /// 接受广播消息
            /// <param name="tryTimes">消费失败后,继续尝试消费的次数</param>
            /// </summary>
            public void ReceiveFanoutMessage(int tryTimes = 3)
            {
                try
                {
                    //Channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);
                    //QueueName = Channel.QueueDeclare().QueueName;
                    //Channel.QueueBind(QueueName, Exchange, "");
                    //Consumer = new EventingBasicConsumer(Channel);
                    //Consumer.Received += (model, dlvrArgs) =>
                    //{
                    //    byte[] body = dlvrArgs.Body;
                    //    Message = Encoding.UTF8.GetString(body);
                    //    ReceiveMessageHandler(Message);
                    //};
                    //Channel.BasicConsume(QueueName, true, Consumer);
    
                    Channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);
                    QueueName = Channel.QueueDeclare().QueueName;
                    Channel.QueueBind(QueueName, Exchange, "");
                    Consumer = new EventingBasicConsumer(Channel);
                    Consumer.Received += (model, dlvrArgs) =>
                    {
                        byte[] body = dlvrArgs.Body;
                        if (body != null && body.Length > 0)
                        {
                            Message = Encoding.UTF8.GetString(body);
                            if (!string.IsNullOrWhiteSpace(Message))
                            {
                                bool isConsumeSuccess = false;// 是否消费成功
                                int consumeCount = 0;//尝试消费次数
                                while (!isConsumeSuccess)
                                {
                                    consumeCount++;
                                    isConsumeSuccess = ReceiveAnswerMessageHandle(Message);
                                    if (isConsumeSuccess || consumeCount >= tryTimes)
                                    {
                                        Channel.BasicAck(dlvrArgs.DeliveryTag, false);//将队列里面的消息进行释放
                                        isConsumeSuccess = true;
                                    }
                                    else
                                    {
                                        //重新放入队列,等待再次消费
                                        Channel.BasicAck(dlvrArgs.DeliveryTag, true);
                                    }
                                }
                            }
                        }
                    };
                    Channel.BasicConsume(QueueName, false, Consumer);
                }
                catch (Exception ex)
                {
                    LogHelper.Log(LogCategory.Error, ex.Message, ex);
                }
            }
    
            /// <summary>
            /// 接受直连交换机消息
            /// </summary>
            /// <param name="queueName">队列名</param>
            /// <param name="durable">消息是否持久化</param>
            /// <param name="tryTimes">消费失败后,继续尝试消费的次数</param>
            /// <returns></returns>
            public void ReceiveDirectMessage(string queueName, bool durable = false, int tryTimes = 3)
            {
                try
                {
                    Channel.ExchangeDeclare(Exchange, ExchangeType.Direct, durable: durable, autoDelete: false, arguments: null);
                    Channel.QueueDeclare(queueName, durable: durable, exclusive: false, autoDelete: false, arguments: null);
                    Channel.QueueBind(queueName, Exchange, routingKey: queueName);
                    //订阅模式 (有消息到达将被自动接收) 消费者 
                    Consumer = new EventingBasicConsumer(Channel);
                    //绑定消息接收后的事件委托 
                    Consumer.Received += (model, dlvrArgs) =>
                    {
                        byte[] body = dlvrArgs.Body;
                        if (body != null && body.Length > 0)
                        {
                            Message = Encoding.UTF8.GetString(body);
                            if (!string.IsNullOrWhiteSpace(Message))
                            {
                                bool isConsumeSuccess = false;// 是否消费成功
                                int consumeCount = 0;//尝试消费次数
                                while (!isConsumeSuccess)
                                {
                                    consumeCount++;
                                    isConsumeSuccess = ReceiveAnswerMessageHandle(Message);
                                    if (isConsumeSuccess || consumeCount >= tryTimes)
                                    {
                                        Channel.BasicAck(dlvrArgs.DeliveryTag, false);//将队列里面的消息进行释放
                                        isConsumeSuccess = true;
                                    }
                                    else
                                    {
                                        //重新放入队列,等待再次消费
                                        Channel.BasicAck(dlvrArgs.DeliveryTag, true);
                                    }
                                }
                            }
                        }
                    };
                    Channel.BasicConsume(queueName, false, Consumer);
                }
    
                catch (Exception ex)
                {
                    LogHelper.Log(LogCategory.Error, ex.Message, ex);
                }
            }
    
            public T ToJson()
            {
                return JsonConvert.DeserializeObject<T>(Message);
            }
        }
    消费者

    3、新建控制台程序,发送消息:

     class Program
        {
            public static string MqUri = ConfigHelper.GetConfig("RabbitMQ", "MqUri");
            public static string MqExchange = ConfigHelper.GetConfig("RabbitMQ", "MqExchange");
            public static string MqHostName = ConfigHelper.GetConfig("RabbitMQ", "MqHostName");
            public static string MqUserName = ConfigHelper.GetConfig("RabbitMQ", "MqUserName");
            public static string MqPassword = ConfigHelper.GetConfig("RabbitMQ", "MqPassword");
    
            static void Main(string[] args)
            {
                string userCommand = "";  
                while (userCommand != "exit")
                {
                    Console.WriteLine("请输入:");
                    userCommand = Console.ReadLine();
    
                    //发送消息
                    var publisher = new Publisher(MqExchange, MqHostName,
                                    MqUserName, MqPassword, "/", new Uri(MqUri));
    
                    publisher.PublishFanoutMessage(userCommand);
                }           
            }
        }
    发送消息控制台应用

    4、新建控制台程序,接受消息:

    internal class Program
        {
            public static string MqUri = ConfigHelper.GetConfig("RabbitMQ", "MqUri");
            public static string MqExchange = ConfigHelper.GetConfig("RabbitMQ", "MqExchange");
            public static string MqHostName = ConfigHelper.GetConfig("RabbitMQ", "MqHostName");
            public static string MqUserName = ConfigHelper.GetConfig("RabbitMQ", "MqUserName");
            public static string MqPassword = ConfigHelper.GetConfig("RabbitMQ", "MqPassword");
    
            static void Main(string[] args)
            {
                Console.WriteLine("Start process data {0}", DateTime.Now);
                try
                {
                    var subscriber = new Subscriber<string>(
                        MqExchange, MqHostName, MqUserName, MqPassword, "/",
                        new Uri(MqUri))
                    {
                        ReceiveAnswerMessageHandle = SubscriberHandler1
                    };
    
                    subscriber.ReceiveFanoutMessage();
    
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
            }
    
            private static bool SubscriberHandler1(string msg)
            {
                Console.WriteLine(msg);
                return true;
            }
        }
    接受消息控制台应用

    5、配置文件:

    上面两个控制台的配置文件一样,如下:

    <RabbitMQ>
    <add key="MqUri" value="amqp://localhost/" />
    <add key="MqExchange" value="MyExchange" />
    <add key="MqHostName" value="localhost" />
    <add key="MqUserName" value="root" />
    <add key="MqPassword" value="root" />
    </RabbitMQ>

    6、结果图:

  • 相关阅读:
    tableau学习笔记—1
    sql学习笔记1
    rpy2的安装问题?【解决】
    python_广州房价热力图
    数据清洗记录——.图书馆学号去敏
    python argparse
    利用torch.nn实现前馈神经网络解决 多分类 任务
    R7000P Ubuntu20.04 安装 Realtek 8852 无线网卡驱动
    邻接矩阵的相乘的意义
    分类问题中评价指标
  • 原文地址:https://www.cnblogs.com/qk2014/p/9173269.html
Copyright © 2011-2022 走看看