zoukankan      html  css  js  c++  java
  • 【c#】队列(Queue)和MSMQ(消息队列)的基础使用

    原文:https://www.cnblogs.com/yanbigfeg/p/9674238.html

        首先我们知道队列是先进先出的机制,所以在处理并发是个不错的选择。然后就写两个队列的简单应用。

    Queue

    命名空间

        命名空间:System.Collections,不在这里做过多的理论解释,这个东西非常的好理解。

        可以看下官方文档:https://docs.microsoft.com/zh-cn/dotnet/api/system.collections.queue?view=netframework-4.7.2

    示例代码

    我这里就是为了方便记忆做了一个基本的例子,首先创建了QueueTest类:

    包含了获取队列的数量,入队和出队的实现

    复制代码
     1  public class QueueTest
     2     {
     3         public static Queue<string> q = new Queue<string>();
     4 
     5         #region 获取队列数量
     6         public int GetCount()
     7         {
     8 
     9             return q.Count;
    10         }
    11         #endregion
    12 
    13         #region 队列添加数据
    14         public void IntoData(string qStr)
    15         {
    16             string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
    17             q.Enqueue(qStr);
    18             Console.WriteLine($"队列添加数据: {qStr};当前线程id:{threadId}");
    19         }
    20         #endregion
    21 
    22         #region 队列输出数据
    23 
    24         public string OutData()
    25         {
    26             string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
    27             string str = q.Dequeue();
    28             Console.WriteLine($"队列输出数据: {str};当前线程id:{threadId}");
    29             return str;
    30         }
    31         #endregion
    32 
    33     }
    复制代码

    为了模拟并发情况下也不会出现重复读取和插入混乱的问题所以写了TaskTest类里面开辟了两个异步线程进行插入和读取:

    这里只是证明了多线程插入不会造成丢失。无忧证明并发的先进先出

    复制代码
     1     class TaskTest
     2     {
     3 
     4         #region 队列的操作模拟
     5         public static void QueueMian()
     6         {
     7             QueueA();
     8             QueueB();
     9         }
    10         private static async void QueueA()
    11         {
    12             QueueTest queue = new QueueTest();
    13             var task = Task.Run(() =>
    14             {
    15                 for (int i = 0; i < 20; i++)
    16                 {
    17                     queue.IntoData("QueueA" + i);
    18                 }
    19             });
    20             await task;
    21             Console.WriteLine("QueueAA插入完成,进行输出:");
    22 
    23             while (queue.GetCount() > 0)
    24             {
    25                 queue.OutData();
    26             }
    27         }
    28 
    29         private static async void QueueB()
    30         {
    31             QueueTest queue = new QueueTest();
    32             var task = Task.Run(() =>
    33             {
    34                 for (int i = 0; i < 20; i++)
    35                 {
    36                     queue.IntoData("QueueB" + i);
    37                 }
    38             });
    39             await task;
    40             Console.WriteLine("QueueB插入完成,进行输出:");
    41 
    42             while (queue.GetCount() > 0)
    43             {
    44                 queue.OutData();
    45             }
    46         }
    47         #endregion
    48 
    49     }
    复制代码

    效果展示

    然后在main函数直接调用即可:

    通过上面的截图可以看出插入线程是无先后的。

    这张图也是线程无先后。

    补充:通过园友的提问,我发现我一开始测试的不太仔细,只注意多线程下的插入,没有注意到输出其实不是跟插入的顺序一致,对不起,这说明queue不是线程安全的,所以这个就当是入队,出队的基础例子并不能说明并发。后面有一个补充的ConcurrentQueue队列是说明了并发线程的先进先出。

    MSMQ

    msmq是微软提供的消息队列,本来在windows系统中就存在,但是默认没有开启。需要开启。

    开启安装

    打开控制面板=>程序和功能=> 启动或关闭windows功能 => Microsoft Message Queue(MSMQ)服务器=>Microsoft Message Queue(MSMQ)服务器核心

    一般选择:MSMQ Active Directory域服务继承和MSMQ HTTP支持即可。

    点击确定等待安装成功。

    命名空间

    需要引用System.Messaging.DLL

    命名空间:System.Messaging

    官方资料文档:https://docs.microsoft.com/zh-cn/dotnet/api/system.messaging.messagequeue?view=netframework-4.7.2

    示例代码

    与上面queue同样的示例方式,创建一个MSMQ类,实现创建消息队列,查询数据,入列,出列功能:

    复制代码
      1  /// <summary>
      2     /// MSMQ消息队列
      3     /// </summary>
      4     class MSMQ
      5     {
      6         static string path = ".\Private$\myQueue";
      7         static MessageQueue queue;
      8         public static void Createqueue(string queuePath)
      9         {
     10             try
     11             {
     12                 if (MessageQueue.Exists(queuePath))
     13                 {
     14                     Console.WriteLine("消息队列已经存在");
     15                     //获取这个消息队列
     16                     queue = new MessageQueue(queuePath);
     17                 }
     18                 else
     19                 {
     20                     //不存在,就创建一个新的,并获取这个消息队列对象
     21                     queue = MessageQueue.Create(queuePath);
     22                     path = queuePath;
     23                 }
     24             }
     25             catch (Exception e)
     26             {
     27                 Console.WriteLine(e.Message);
     28             }
     29 
     30         }
     31 
     32 
     33         #region 获取消息队列的数量
     34         public static int GetMessageCount()
     35         {
     36             try
     37             {
     38                 if (queue != null)
     39                 {
     40                     int count = queue.GetAllMessages().Length;
     41                     Console.WriteLine($"消息队列数量:{count}");
     42                     return count;
     43                 }
     44                 else
     45                 {
     46                     return 0;
     47                 }
     48             }
     49             catch (MessageQueueException e)
     50             {
     51 
     52                 Console.WriteLine(e.Message);
     53                 return 0;
     54             }
     55 
     56 
     57         }
     58         #endregion
     59 
     60         #region 发送消息到队列
     61         public static void SendMessage(string qStr)
     62         {
     63             try
     64             {
     65                 //连接到本地队列
     66 
     67                 MessageQueue myQueue = new MessageQueue(path);
     68 
     69                 //MessageQueue myQueue = new MessageQueue("FormatName:Direct=TCP:192.168.12.79//Private$//myQueue1");
     70 
     71                 //MessageQueue rmQ = new MessageQueue("FormatName:Direct=TCP:121.0.0.1//private$//queue");--远程格式
     72 
     73                 Message myMessage = new Message();
     74 
     75                 myMessage.Body = qStr;
     76 
     77                 myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
     78 
     79                 //发生消息到队列中
     80 
     81                 myQueue.Send(myMessage);
     82 
     83                 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
     84                 Console.WriteLine($"消息发送成功: {qStr};当前线程id:{threadId}");
     85             }
     86             catch (MessageQueueException e)
     87             {
     88                 Console.WriteLine(e.Message);
     89             }
     90         }
     91         #endregion
     92 
     93         #region 连接消息队列读取消息
     94         public static void ReceiveMessage()
     95         {
     96             MessageQueue myQueue = new MessageQueue(path);
     97 
     98 
     99             myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
    100 
    101             try
    102 
    103             {
    104 
    105                 //从队列中接收消息
    106 
    107                 Message myMessage = myQueue.Receive(new TimeSpan(10));// myQueue.Peek();--接收后不消息从队列中移除
    108                 myQueue.Close();
    109 
    110                 string context = myMessage.Body.ToString();
    111                 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
    112                 Console.WriteLine($"--------------------------消息内容: {context};当前线程id:{threadId}");
    113 
    114             }
    115 
    116             catch (System.Messaging.MessageQueueException e)
    117 
    118             {
    119 
    120                 Console.WriteLine(e.Message);
    121 
    122             }
    123 
    124             catch (InvalidCastException e)
    125 
    126             {
    127 
    128                 Console.WriteLine(e.Message);
    129 
    130             }
    131 
    132         }
    133         #endregion
    134     }
    复制代码

    这里说明一下path这个字段,这是消息队列的文件位置和队列名称,我这里写的“.”(点)就是代表的位置MachineName字段,,代表本机的意思

    然后TaskTest类修改成这个样子:

    复制代码
     1 class TaskTest
     2     {
     3 
     4         #region 消息队列的操作模拟
     5         public static void MSMQMian()
     6         {
     7             MSMQ.Createqueue(".\Private$\myQueue");
     8             MSMQA();
     9             MSMQB();
    10             Console.WriteLine("MSMQ结束");
    11         }
    12         private static async void MSMQA()
    13         {
    14             var task = Task.Run(() =>
    15             {
    16                 for (int i = 0; i < 20; i++)
    17                 {
    18                     MSMQ.SendMessage("MSMQA" + i);
    19                 }
    20             });
    21             await task;
    22             Console.WriteLine("MSMQA发送完成,进行读取:");
    23 
    24             while (MSMQ.GetMessageCount() > 0)
    25             {
    26                 MSMQ.ReceiveMessage();
    27             }
    28         }
    29 
    30         private static async void MSMQB()
    31         {
    32             var task = Task.Run(() =>
    33             {
    34                 for (int i = 0; i < 20; i++)
    35                 {
    36                     MSMQ.SendMessage("MSMQB" + i);
    37                 }
    38             });
    39             await task;
    40             Console.WriteLine("MSMQB发送完成,进行读取:");
    41 
    42             while (MSMQ.GetMessageCount() > 0)
    43             {
    44                 MSMQ.ReceiveMessage();
    45             }
    46         }
    47         #endregion
    复制代码

     效果展示

    本机查看消息队列

    创建成功的消息队列我们可以在电脑上查看:我的电脑=>管理 =>计算机管理 =>服务与应用程序 =>消息队列 =>专用队列就看到我刚才创建的消息队列

     补充感谢

    感谢 virtual1988 提出的queue不是线程安全这个问题,是我没搞清楚。线程安全要使用ConcurrentQueue队列。

    谢谢提出的宝贵意见。

    ConcurrentQueue

    所以我有修改了一下写了个ConcurrentQueue队列的:

    修改代码如下:

     //public static Queue<string> q = new Queue<string>();
            public static ConcurrentQueue<string> q = new ConcurrentQueue<string>();
            //public static Queue q =Queue.Synchronized(new Queue());
    
            #region 获取队列数量
            public static int GetCount()
            {
    
                return q.Count;
            }
            #endregion
    
            #region 队列添加数据
            public static void IntoData(string qStr)
            {
                string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
                q.Enqueue(qStr);
                System.Threading.Thread.Sleep(10);
                Console.WriteLine($"队列添加数据: {qStr};当前线程id:{threadId}");
            }
            #endregion
    
            #region 队列输出数据
            public static string OutData2()
            {
                string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
                foreach (var item in q)
                {
    
                    Console.WriteLine($"------队列输出数据: {item};当前线程id:{threadId}");
                    string d="";
                    q.TryDequeue( out d);
                }
    
                return "211";
            }
            #endregion
    View Code

    task类:

     #region 队列的操作模拟
            public static async void QueueMian()
            {
                QueueA();
                QueueB();
            }
            private static async void QueueA()
            {
                var task = Task.Run(() =>
                {
                    for (int i = 0; i < 20; i++)
                    {
                        QueueTest.IntoData("QueueA" + i);
                    }
                });
                await task;
                Console.WriteLine("QueueA插入完成,进行输出:");
            }
    
            private static async void QueueB()
            {
                var task = Task.Run(() =>
                {
                    for (int i = 0; i < 20; i++)
                    {
                        QueueTest.IntoData("QueueB" + i);
                    }
                });
                await task;
                Console.WriteLine("QueueB插入完成,进行输出:");
    
            }
    
            public static void QueueC()
            {
                Console.WriteLine("Queue插入完成,进行输出:");
                while (QueueTest.GetCount() > 0)
                {
                    QueueTest.OutData2();
                }
            }
            #endregion
    View Code

    Main函数调用:

    复制代码
     static void Main(string[] args)
            {
    
    
                try
                {
                    Stopwatch stopWatch = new Stopwatch();
                    TaskTest.QueueMian();
                    Console.ReadLine();
                    TaskTest.QueueC();
                    Console.ReadLine();
                }
                catch (Exception e)
                {
    
                    throw;
                }
            }
    复制代码

    插入效果:

    输出效果:

  • 相关阅读:
    模拟城市:我是市长
    IOTA私有链简单搭建
    SOUL软件小结
    ubuntu 16.04 安装node.js 8.x
    Ubuntu下Hyperledger Fabric v0.6安装部署
    区块链关键术语与概念
    Windows Server 2019安装OpenSSH Server简明教程
    Windows10和Windows Server 2019支持OpenSSH
    TypeError: __init__() got an unexpected keyword argument 'serialized_options'
    无法从路径’NuGet.CommandLine.2.7.1.nupkg’读取包
  • 原文地址:https://www.cnblogs.com/zhang1f/p/12902750.html
Copyright © 2011-2022 走看看