zoukankan      html  css  js  c++  java
  • C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率

        一、引言

        使用工作队列的一个好处就是它能够并行地处理队列中的任务。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。本例使用多线程在同一个连接上创建多个信道,并让每个信道都订阅同一个队列,从而达到多workers的目的。

        二、示例

        2.1、环境准备

        在NuGet上安装RabbitMQ.Client。

        2.2、工厂类

        添加一个工厂类RabbitMQFactory:

        /// <summary>
        /// Provides a single, lazily created RabbitMQ connection that callers share.
        /// Multiplexing many channels over one TCP connection avoids the resource
        /// waste and overload of opening a separate connection per worker.
        /// </summary>
        public static class RabbitMQFactory
        {
            /// <summary>
            /// Counter value at which the shared connection is closed and recreated.
            /// </summary>
            private const int MaxChannelCount = 1000;

            private static readonly object _locker = new object();
            private static IConnection sharedConnection;

            // NOTE(review): despite its name this counter is only incremented when a
            // connection is created (see the getter below), so with the code visible
            // here it does not reach MaxChannelCount. Confirm whether it was meant to
            // count channels handed out before relying on the recycle branch.
            private static int ChannelCount { get; set; }

            /// <summary>
            /// Gets the shared connection, creating it on first use.
            /// </summary>
            /// <remarks>
            /// The whole getter runs under one lock. Previously the recycle check and
            /// the first null check ran unlocked, so one thread could close or clear
            /// the connection while another thread was in the middle of returning it.
            /// </remarks>
            public static IConnection SharedConnection
            {
                get
                {
                    lock (_locker)
                    {
                        if (ChannelCount >= MaxChannelCount)
                        {
                            // Recycle: drop the current connection and start counting again.
                            if (sharedConnection != null && sharedConnection.IsOpen)
                            {
                                sharedConnection.Close();
                            }
                            sharedConnection = null;
                            ChannelCount = 0;
                        }

                        if (sharedConnection == null)
                        {
                            sharedConnection = GetConnection();
                            ChannelCount++;
                        }

                        return sharedConnection;
                    }
                }
            }

            /// <summary>
            /// Opens a new connection to the broker using the demo's fixed settings.
            /// </summary>
            /// <returns>The newly created connection.</returns>
            private static IConnection GetConnection()
            {
                // NOTE(review): host and credentials are hard-coded for the demo; move
                // them to configuration before using this outside a sample.
                var factory = new ConnectionFactory
                {
                    HostName = "192.168.2.242",
                    UserName = "hello",
                    Password = "world",
                    Port = AmqpTcpEndpoint.UseDefaultPort, // 5672
                    VirtualHost = ConnectionFactory.DefaultVHost, // default value: "/"
                    Protocol = Protocols.DefaultProtocol,
                    AutomaticRecoveryEnabled = true
                };
                return factory.CreateConnection();
            }
        }
    View Code

        2.3、主窗体

        代码如下:

        /// <summary>
        /// Demo form with a producer button that publishes messages to the "hello"
        /// queue and a consumer button that opens a batch of channels, each running
        /// its own consumer, on the shared connection.
        /// </summary>
        public partial class RabbitMQMultithreading : Form
        {
            public delegate void ListViewDelegate<T>(T obj);

            private const string QueueName = "hello";
            private const string TimestampFormat = "yyyy/MM/dd HH:mm:ss ffffff";

            /// <summary>
            /// Running total of channels opened successfully across all clicks of the
            /// consumer button. Only touched on the UI thread.
            /// </summary>
            private int openedChannelTotal;

            public RabbitMQMultithreading()
            {
                InitializeComponent();
            }

            /// <summary>
            /// Prepends a timestamped message to the message list, marshalling to the
            /// UI thread when called from another thread.
            /// </summary>
            /// <param name="msg">Text to display.</param>
            private void ShowMessage(string msg)
            {
                // Consumer callbacks can outlive the form. Skip the update instead of
                // throwing, because an exception here would prevent the caller from
                // acknowledging the message it has just processed.
                if (IsDisposed || Disposing)
                {
                    return;
                }

                if (InvokeRequired)
                {
                    BeginInvoke(new ListViewDelegate<string>(ShowMessage), msg);
                    return;
                }

                var item = new ListViewItem(new[] { DateTime.Now.ToString(TimestampFormat), msg });
                lvwMsg.Items.Insert(0, item);
            }

            /// <summary>
            /// Formats a message with <see cref="string.Format(string, object[])"/> and
            /// displays it through <see cref="ShowMessage(string)"/>.
            /// </summary>
            /// <param name="format">Composite format string.</param>
            /// <param name="args">Values substituted into <paramref name="format"/>.</param>
            private void ShowMessage(string format, params object[] args)
            {
                ShowMessage(string.Format(format, args));
            }

            /// <summary>
            /// Producer: publishes a fixed number of "Hello World" messages to the queue.
            /// </summary>
            /// <param name="sender">Event source.</param>
            /// <param name="e">Event data.</param>
            private void btnSend_Click(object sender, EventArgs e)
            {
                const int messageCount = 100;
                const string message = "Hello World";

                var factory = new ConnectionFactory
                {
                    HostName = "192.168.2.242",
                    UserName = "hello",
                    Password = "world",
                    Port = AmqpTcpEndpoint.UseDefaultPort, // 5672
                    VirtualHost = ConnectionFactory.DefaultVHost, // default value: "/"
                    Protocol = Protocols.DefaultProtocol,
                    AutomaticRecoveryEnabled = true
                };

                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

                    // The queue is durable, so mark the messages persistent as well;
                    // otherwise they are lost when the broker restarts.
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    byte[] body = Encoding.UTF8.GetBytes(message);
                    for (int i = 1; i <= messageCount; i++)
                    {
                        channel.BasicPublish(exchange: "", routingKey: QueueName, basicProperties: properties, body: body);
                        ShowMessage($"Send {message}");
                    }
                }
            }

            /// <summary>
            /// Consumer: opens a batch of channels on the shared connection, each with
            /// its own consumer, and reports how many were opened successfully.
            /// </summary>
            /// <param name="sender">Event source.</param>
            /// <param name="e">Event data.</param>
            private async void btnReceive_Click(object sender, EventArgs e)
            {
                const int workerCount = 10;
                int rallyNumber = new Random().Next(1, 1000);

                try
                {
                    // Creating the shared connection can block on the network, so it is
                    // resolved off the UI thread.
                    IConnection connection = await Task.Run(() => RabbitMQFactory.SharedConnection);

                    var workers = new List<Task<bool>>(workerCount);
                    for (int i = 0; i < workerCount; i++)
                    {
                        workers.Add(Task.Run(() => MessageWorkItemCallback(connection, rallyNumber)));
                    }

                    bool[] results = await Task.WhenAll(workers);

                    int succeeded = results.Count(ok => ok);
                    int failed = results.Length - succeeded;
                    openedChannelTotal += succeeded;

                    ShowMessage($"集结号 {rallyNumber} 已吹起号角--" +
                        $"本次开启信道成功数:{succeeded}," +
                        $"本次开启信道失败数:{failed}," +
                        $"累计开启信道成功数:{openedChannelTotal}");
                }
                catch (Exception ex)
                {
                    ShowMessage($"集结号 {rallyNumber} 消费异常:{ex.Message}");
                }
            }

            /// <summary>
            /// Opens one channel on the given connection and starts a consumer on the
            /// queue that acknowledges each message after processing it.
            /// </summary>
            /// <param name="state">The <c>IConnection</c> to open the channel on.</param>
            /// <param name="rallyNumber">Identifier of the batch, shown in log messages.</param>
            /// <returns><c>true</c> if the consumer was started; otherwise <c>false</c>.</returns>
            private bool MessageWorkItemCallback(object state, int rallyNumber)
            {
                IConnection connection = state as IConnection;
                if (connection == null)
                {
                    ShowMessage("MessageWorkItemCallback: state is not an IConnection.");
                    return false;
                }

                IModel channel = null;
                try
                {
                    // Do not wrap the channel in a using block: the consumer keeps
                    // receiving on it after this method returns, so it must stay open.
                    channel = connection.CreateModel();
                    channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    // Deliver one unacknowledged message at a time to this consumer.
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        Thread.Sleep(1000); // simulated processing time
                        ShowMessage($"集结号 {rallyNumber} Received {message}");
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
                    return true;
                }
                catch (Exception ex)
                {
                    // Setup failed part-way: release the half-configured channel so it
                    // does not linger on the shared connection.
                    channel?.Dispose();
                    ShowMessage(ex.Message);
                    return false;
                }
            }
        }
    View Code

        2.4、运行结果

        多点击几次“消费者”按钮即可增加信道,提升消费能力。

  • 相关阅读:
    vscode clang-format
    MyBatis中比较(大于、小于)符号的转义写法
    byte数组(byte[])与MultipartFile相互转化
    IDEA报错 Error:(24, 35) java: 常量字符串过长
    Nginx中配置反向代理的proxy_pass的不同斜杠的区别
    使用docker-compose一起安装kafka(zookeeper)
    docker启动报错:Failed to Setup IP tables: Unable to enable SKIP DNAT rule
    Xftp设置指定记事本(notepad++)打开文件
    Linux使用docker安装Nginx
    使用openssl生成证书,并通过Nginx配置
  • 原文地址:https://www.cnblogs.com/atomy/p/12680782.html
Copyright © 2011-2022 走看看