zoukankan      html  css  js  c++  java
  • .Net下的MSMQ(微软消息队列)的同步异步调用

    一、MSMQ简介

     MSMQ(微软消息队列)是Windows操作系统中消息应用程序的基础,是用于创建分布式、松散连接的消息通讯应用程序的开发工具。消息队列

    和电子邮件有着很多相似处,他们都包含多个属性,用于保存消息,消息类型中都指出发送者和接收者的地址;然而他们的用处却有着很大的

    区别:消息队列的发送者和接收者是应用程序,而电子邮件的发送者和接收者通常是人。如同电子邮件一样,消息队列的发送和接收也不需要

    发送者和接收者同时在场,可以存储在消息队列或是邮件服务器中。

    二、消息队列的安装

     默认情况下安装操作系统是不安装消息队列的,你可以在控制面板中找到添加/删除程序,然后选择添加/删除Windows组件一项,然后选择应

    用程序服务器,双击它进入详细资料中选择消息队列一项进行安装,如图:







    三、消息队列类型

    消息对列分为3类:
     
    公共队列
     MachineNameQueueName
     能被别的机器所访问,如果你的多个项目中用到消息队列,那么你可以把队列定义为公共队列
     
    专用队列
     MachineNamePrivate$QueueName
     只针对于本机的程序才可以调用的队列,有些情况下为了安全起见定义为私有队列。
    日志队列
     MachineNameQueueNameJournal$
     

    四、消息队列的创建

    MessageQueue Mq=new MessageQueue(“.\private$\Mymq”);

    通过Path属性引用消息队列的代码也十分简单:

    MessageQueue Mq=new MessageQueue();

    Mq.Path=”.\private$\Mymq”;

    使用 Create 方法可以在计算机上创建队列:

    System.Messaging.MessageQueue.Create(@".private$Mymq");

    这里注意由于在C#中要记住用反斜杠将“”转义。

    由于消息对列所放置的地方经常改变,所以建议消息队列路径不要写死,建议放在配置文件中。

    五、消息的发送

    消息的发送可以分为简单消息和复杂消息,简单消息类型就是常用的数据类型,例如整型、字符串等数据;复杂消息的数据类型通常对应于系

    统中的复杂数据类型,例如结构,对象等等。

    Mq.Send("Hello!");
    在这里建议你可以事先定义一个对象类,然后发送这个对象类的实例对象,这样以后无论在增加什么发送信息,只需在对象类中增加
    相应的属性即可。

    六、消息的接收和阅读

    (1)同步接收消息

      接收消息的代码很简单:

     Mq.Receive();
            Mq.Receive(TimeSpan timeout); //设定超时时间
     Mq.ReceiveById(ID);
            Mq.Peek(); 
     
     通过Receive方法接收消息同时永久性地从队列中删除消息;
     通过Peek方法从队列中取出消息而不从队列中移除该消息。
     如果知道消息的标识符(ID),还可以通过ReceiveById方法和PeekById方法完成相应的操作。

    (2)异步接受消息
       
       利用委托机制:

     MessQueue.ReceiveCompleted +=new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
     


    (3)消息阅读

    在应用程序能够阅读的消息和消息队列中的消息格式不同,应用程序发送出去的消息经过序列化以后才发送给了消息队列
    而在接受端必须反序列化,利用下面的代码可以实现:

     public void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
      {
       System.Messaging.Message m = MessQueue.EndReceive(e.AsyncResult);
       m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
       Console.WriteLine("Message: " + (string)m.Body);
       MessQueue.BeginReceive() ;

      }

    反序列化还有另一种写法:
     m.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );

    七、由于消息队列的代码有些是固定不便的,所以把这些代码封装成一个类方便以后使用:

      1using System;
      2using System.Messaging;
      3using System.Threading;

      5
      6namespace LoveStatusService
      7{
      8    /// <summary>
      9    /// Summary description for Msmq.
     10    /// </summary>

     11    public class Msmq
     12    {
     13        public Msmq()
     14        {
     15            //
     16            // TODO: Add constructor logic here
     17            //
     18        }

     19
     20        
     21        private MessageQueue _messageQueue=null;
     22        //最大并发线程数 
     23        private static int MAX_WORKER_THREADS=Convert.ToInt32( System.Configuration.ConfigurationSettings.AppSettings["MAX_WORKER_THREADS"].ToString());
     24        //Msmq路径
     25        private static string MsmqPath=System.Configuration.ConfigurationSettings.AppSettings["LoveStatusMQPath"];
     26        //等待句柄
     27        private WaitHandle[] waitHandleArray = new WaitHandle[MAX_WORKER_THREADS];
     28        //任务类型
     29        //1. Send Email 2. Send Message  3. Send Email and Message
     30        private string TaskType=System.Configuration.ConfigurationSettings.AppSettings["TaskType"];
     31        public MessageQueue MessQueue
     32        {
     33            get
     34            {
     35            
     36                if (_messageQueue==null)
     37                {
     38                    if(MessageQueue.Exists(MsmqPath))
     39                    {
     40                        _messageQueue = new MessageQueue(MsmqPath);    
     41                    }

     42                    else
     43                    {
     44                        _messageQueue = MessageQueue.Create(MsmqPath);    
     45                    }
        
     46                }

     47                
     48
     49                return _messageQueue;
     50            }

     51        }

     52        
     53
     54    #region Private Method
     55
     56        private void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
     57        {
     58            MessageQueue mqq = (MessageQueue)sender;
     59            System.Messaging.Message m = mqq.EndReceive(e.AsyncResult);
     60            //m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
     61            m.Formatter =new System.Messaging.XmlMessageFormatter(new Type[] {typeof(UserObject)}) ;
     62            //log.Info("Receive UserID: " + (string)m.Body) ;
     63            UserObject obj=(UserObject)m.Body ;
     64            long curUserId=obj.curUserID ;
     65            long oppUserId=obj.oppUserID;
     66            string curUserName=obj.curUserName;
     67            string oppUserName=obj.oppUserName;
     68            string curEmail=obj.curEmail ;
     69            string oppEmail=obj.oppEmail;
     70            string subject =obj.subject ;
     71            string body=obj.body ;
     72            //AppLog.log.Info("curUserId:"+curUserId) ;
     73            //AppLog.log.Info("oppUserId:"+oppUserId) ;
     74            AppLog.log.Info("==type="+TaskType) ;
     75            switch(TaskType)
     76            {
     77                //Email
     78                case "1":
     79                    EmailForMQ.SendEmailForLoveStatus(curEmail,oppEmail,curUserName,oppUserName,subject) ;
     80                    AppLog.log.Info("==Send to=="+oppEmail) ;
     81                    break;
     82                //Message
     83                case "2":
     84                    MessageForMQ.SendMessage(curUserId,oppUserId,subject,body) ;
     85                    AppLog.log.Info("==Send Msg to=="+oppUserId) ;
     86                    break;
     87                //Email and Message        
     88                case "3":
     89                    EmailForMQ.SendEmailForLoveStatus(curEmail,oppEmail,curUserName,oppUserName,subject) ;
     90                    AppLog.log.Info("==Send to=="+oppEmail) ;
     91                    MessageForMQ.SendMessage(curUserId,oppUserId,subject,body) ;
     92                    AppLog.log.Info("==Send Msg to=="+oppUserId) ;
     93                    break;
     94                default:
     95                    break;
     96
     97            }

     98            mqq.BeginReceive() ;
     99
    100        }

    101
    102    #endregion

    103
    104    #region Public Method
    105
    106        //一个将对象发送到队列的方法,这里发送的是对象
    107        public void SendUserIDToMQ(object arr)
    108        {
    109            MessQueue.Send(arr) ;
    110            Console.WriteLine("Ok") ;
    111            Console.Read() ;
    112        }

    113
    114        //同步接受队列内容的方法
    115        public void ReceiveFromMQ()
    116        {
    117            Message ms=new Message() ;
    118            
    119            //ms=MessQueue.Peek(); 
    120            try
    121            {
    122                ms=MessQueue.Receive(new TimeSpan(0,0,5));
    123                if(ms!=null)
    124                {
    125                    ms.Formatter = new XmlMessageFormatter ( new Type [] typeof (string) } );
    126                    AppLog.log.Info((string)ms.Body)  ; 
    127                }

    128            }

    129            catch(Exception ex)
    130            {
    131                
    132            }

    133            
    134        
    135        }

    136
    137        //开始监听工作线程
    138        public  void startListen()
    139        {
    140            AppLog.log.Info("--Thread--"+MAX_WORKER_THREADS) ;
    141            MessQueue.ReceiveCompleted +=new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
    142            
    143            //异步方式,并发
    144            
    145            for(int i=0; i<MAX_WORKER_THREADS; i++)
    146            {
    147                // Begin asynchronous operations.
    148                waitHandleArray[i] = MessQueue.BeginReceive().AsyncWaitHandle;
    149            }

    150
    151            AppLog.log.Info("------Start Listen--------") ;
    152
    153            return;
    154
    155        }

    156
    157 
    158        //停止监听工作线程
    159        public void stopListen()
    160        {
    161
    162            for(int i=0;i<waitHandleArray.Length;i++)
    163            {
    164
    165                try
    166                {
    167                    waitHandleArray[i].Close();
    168                }

    169                catch
    170                {
    171                    AppLog.log.Info("---waitHandleArray[i].Close() Error!-----") ;
    172                }

    173
    174            }

    175
    176            try
    177            {
    178                // Specify to wait for all operations to return.
    179                WaitHandle.WaitAll(waitHandleArray,1000,false);
    180            }

    181            catch
    182            {
    183                AppLog.log.Info("---WaitHandle.WaitAll Error!-----") ;
    184            }

    185            AppLog.log.Info("------Stop Listen--------") ;
    186
    187        }

    188
    189    #endregion

    190    
    191    
    192
    193    
    194    }

    195}

    196

    UserObject的代码

      1using System;
      2
      3namespace Goody9807
      4{
      5    /// <summary>
      6    /// 用与在MQ上传输数据的对象
      7    /// </summary>

      8    public class UserObject
      9    {
     10        public UserObject()
     11        {
     12            //
     13            // TODO: Add constructor logic here
     14            //
     15        }

     16
     17        private long _curUserID;
     18        public long curUserID
     19        {
     20            get
     21            {
     22                return _curUserID;
     23            }

     24            set
     25            {
     26                _curUserID=value;
     27            }

     28        }

     29
     30        private  string _curUserName="";
     31        public string curUserName
     32        {
     33            get
     34            {
     35                return _curUserName;
     36            }

     37            set
     38            {
     39                _curUserName=value;
     40            }

     41        }

     42
     43        private string _curEmail="";
     44        public string curEmail
     45        {
     46            get
     47            {
     48                return _curEmail;
     49            }

     50            set
     51            {
     52                _curEmail=value;
     53            }

     54        }

     55
     56
     57        private long _oppUserID;
     58        public long oppUserID
     59        {
     60            get
     61            {
     62                return _oppUserID;
     63            }

     64            set
     65            {
     66                _oppUserID=value;
     67            }

     68        }

     69
     70        private  string _oppUserName="";
     71        public string oppUserName
     72        {
     73            get
     74            {
     75                return _oppUserName;
     76            }

     77            set
     78            {
     79                _oppUserName=value;
     80            }

     81        }

     82
     83        private string _oppEmail="";
     84        public string oppEmail
     85        {
     86            get
     87            {
     88                return _oppEmail;
     89            }

     90            set
     91            {
     92                _oppEmail=value;
     93            }

     94        }

     95
     96        private string _subject ="";
     97        public string subject
     98        {
     99            get
    100            {
    101                return _subject;
    102            }

    103            set
    104            {
    105                _subject=value;
    106            }

    107        }

    108
    109        private string _body="";
    110        public string body
    111        {
    112            get
    113            {
    114                return _body;
    115            }

    116            set
    117            {
    118                _body=value;
    119            }

    120        }

    121    }

    122}

    123

    另一个同事写的封装类


      1using System;
      2
      3using System.Threading;
      4
      5using System.Messaging;
      6
      7 
      8
      9namespace Wapdm.SmsApp
     10
     11{
     12
     13     /// <summary>
     14
     15     /// <para>
     16
     17     /// A Logger implementation that writes messages to a message queue.
     18
     19     /// The default event formatter used is an instance of XMLEventFormatter
     20
     21     /// </para>
     22
     23     /// </summary>

     24
     25     public sealed class MsgQueue 
     26
     27     {
     28
     29 
     30
     31         private const string BLANK_STRING                   = "";
     32
     33         private const string PERIOD                         = @".private$";  //".";
     34
     35         private const string ELLIPSIS                       = "";    
     36
     37    
     38
     39         private string serverAddress;
     40
     41         private string queueName;
     42
     43         private string queuePath;
     44
     45         
     46
     47         private bool IsContextEnabled;  
     48
     49    
     50
     51         private MessageQueue queue;
     52
     53    
     54
     55         private object queueMonitor                         = new object();
     56
     57    
     58
     59         private MsgQueue() {}
     60
     61 
     62
     63         public static MsgQueue mq = null;
     64
     65         public static WaitHandle[] waitHandleArray = new WaitHandle[Util.MAX_WORKER_THREADS];
     66
     67     
     68
     69         public MsgQueue(string _serverAddress, string _queueName, string _summaryPattern) 
     70
     71         {
     72
     73              if ((_serverAddress == null) || (_queueName == null) || (_summaryPattern == null)) 
     74
     75              {
     76
     77                   throw new ArgumentNullException();
     78
     79              }

     80
     81              ServerAddress = _serverAddress;
     82
     83              QueueName = _queueName;
     84
     85              IsContextEnabled = true;             
     86
     87         }

     88
     89    
     90
     91         public MsgQueue(string _serverAddress, string _queueName) 
     92
     93         {
     94
     95              if ((_serverAddress == null) || (_queueName == null)) 
     96
     97              {
     98
     99                   throw new ArgumentNullException();
    100
    101              }

    102
    103              ServerAddress = _serverAddress;
    104
    105              QueueName = _queueName;
    106
    107              IsContextEnabled = true;
    108
    109         }

    110
    111    
    112
    113         public MsgQueue(string _queueName) 
    114
    115         {
    116
    117              if (_queueName == null
    118
    119              {
    120
    121                   throw new ArgumentNullException();
    122
    123              }

    124
    125              serverAddress = PERIOD;
    126
    127              QueueName = _queueName;
    128
    129              IsContextEnabled = true;             
    130
    131              if ( IsContextEnabled == false )
    132
    133                   throw new ArgumentNullException();
    134
    135         }

    136
    137    
    138
    139         public string ServerAddress 
    140
    141         {
    142
    143              get 
    144
    145              {
    146
    147                   return serverAddress;
    148
    149              }

    150
    151              set 
    152
    153              {
    154
    155                   if (value == null
    156
    157                   {
    158
    159                       value = PERIOD;
    160
    161                   }

    162
    163                   value = value.Trim();
    164
    165                   if (value.Equals(BLANK_STRING)) 
    166
    167                   {
    168
    169                       throw new ArgumentException("Invalid value (must contain non-whitespace characters)");
    170
    171                  }

    172
    173                   lock (queueMonitor) 
    174
    175                   {
    176
    177                       serverAddress = value;
    178
    179                       queuePath = serverAddress + '\' + queueName;
    180
    181                       InitializeQueue();
    182
    183                   }

    184
    185              }

    186
    187         }

    188
    189 
    190
    191         public string QueueName 
    192
    193         {
    194
    195              get 
    196
    197              {
    198
    199                   return queueName;
    200
    201              }

    202
    203              set 
    204
    205              {
    206
    207                   if (value == null
    208
    209                   {
    210
    211                       throw new ArgumentNullException();
    212
    213                   }

    214
    215                   value = value.Trim();
    216
    217                   if (value.Equals(BLANK_STRING)) 
    218
    219                   {
    220
    221                       throw new ArgumentException("Invalid value (must contain non-whitespace characters)");
    222
    223                   }

    224
    225                   lock (queueMonitor) 
    226
    227                   {
    228
    229                       queueName = value;
    230
    231                       queuePath = serverAddress + '\' + queueName;
    232
    233                       InitializeQueue();
    234
    235                   }

    236
    237              }

    238
    239         }

    240
    241    
    242
    243         private void InitializeQueue() 
    244
    245         {
    246
    247              lock (queueMonitor) 
    248
    249              {             
    250
    251                   if (queue != null
    252
    253                   {
    254
    255                       try { queue.Close(); } 
    256
    257                       catch {}
    258
    259                       queue = null;
    260
    261                   }

    262
    263 
    264
    265                   try 
    266
    267                   {
    268
    269                       if(!MessageQueue.Exists(queuePath))
    270
    271                            MessageQueue.Create(queuePath);
    272
    273                   }
     
    274
    275                   catch {}
    276
    277                   try 
    278
    279                   {
    280
    281                       queue = new MessageQueue(queuePath);
    282
    283                       queue.SetPermissions("EveryOne",MessageQueueAccessRights.FullControl);
    284
    285                       queue.Formatter = new XmlMessageFormatter(new Type[] {typeof(MoMsg)});
    286
    287                   }
     
    288
    289                   catch (Exception e) 
    290
    291                   {
    292
    293                       try { queue.Close(); } 
    294
    295                       catch {}
    296
    297                       queue = null;
    298
    299                       throw new ApplicationException("Couldn't open queue at '" + queuePath + "': " + e.GetType().FullName + ": " + e.Message);
    300
    301                   }

    302
    303 
    304
    305              }

    306
    307         }

    308
    309    
    310
    311         private  void AcquireResources() 
    312
    313         {
    314
    315              InitializeQueue();
    316
    317         }

    318
    319    
    320
    321         public  void ReleaseResources() 
    322
    323         {
    324
    325              lock (queueMonitor) 
    326
    327              {
    328
    329                   if (queue != null
    330
    331                   {
    332
    333                       try 
    334
    335                       {
    336
    337                            queue.Close();
    338
    339                       }
     
    340
    341                       catch {}
    342
    343                       queue = null;
    344
    345                   }

    346
    347              }
        
    348
    349         }

    350
    351    
    352
    353         //阻塞方式
    354
    355         public MoMsg Read( ) 
    356
    357         {
    358
    359              MoMsg _event = null;             
    360
    361              lock (queueMonitor) 
    362
    363              {
    364
    365                   if (queue == null
    366
    367                   {
    368
    369                       InitializeQueue();
    370
    371                   }

    372
    373                   try 
    374
    375                   {
    376
    377                       Message message = queue.Receive( new TimeSpan(0,0,1) );//等待10秒
    378
    379                       _event = (MoMsg) (message.Body);
    380
    381                       return _event;
    382
    383                   }

    384
    385                   catch (Exception ) 
    386
    387                   {
    388
    389                       try { queue.Close(); } 
    390
    391                       catch {}
    392
    393                       queue = null;
    394
    395                   }
                
    396
    397              }

    398
    399              return null;
    400
    401         }

    402
    403 
    404
    405         public void Write(MoMsg _event) 
    406
    407         {
    408
    409              if (_event == null
    410
    411              {
    412
    413                   return;
    414
    415              }

    416
    417              lock (queueMonitor) 
    418
    419              {
    420
    421                   try 
    422
    423                   {
    424
    425                       if (queue == null
    426
    427                       {
    428
    429                            InitializeQueue();
    430
    431                       }

    432
    433                   
    434
    435                       Message message = new Message();
    436
    437                       message.Priority = _event.Priority;
    438
    439                       message.Recoverable = true;
    440
    441                       message.Body = _event; //eventFormatter.Format(_event);
    442
    443 
    444
    445                       queue.Send(message);
    446
    447                   }

    448
    449                   catch (Exception e)
    450
    451                   {
    452
    453                       try { queue.Close(); } 
    454
    455                       catch {}
    456
    457                       queue = null;
    458
    459                       Util.Log.log("Couldn't write Message (" + e.GetType().FullName + ": " + e.Message + ")");
    460
    461                   }
                
    462
    463              }

    464
    465         }

    466
    467 
    468
    469         public static bool statusTest()
    470
    471         {
    472
    473              bool reValue = false;
    474
    475              try
    476
    477              {
    478
    479                   MessageEnumerator re = mq.queue.GetMessageEnumerator();
    480
    481                   bool rev = re.MoveNext();
    482
    483                   reValue = true;
    484
    485              }

    486
    487              catch
    488
    489              {
    490
    491                   reValue = false;
    492
    493              }

    494
    495 
    496
    497              return reValue;
    498
    499         }

    500
    501 
    502
    503         public static void startListen()
    504
    505         {
    506
    507              mq = new MsgQueue(Util.MqName);
    508
    509 
    510
    511              mq.queue.ReceiveCompleted +=new ReceiveCompletedEventHandler(queue_ReceiveCompleted);
    512
    513              
    514
    515              //异步方式,并发
    516
    517              for(int i=0; i<Util.MAX_WORKER_THREADS; i++)
    518
    519              {
    520
    521                   // Begin asynchronous operations.
    522
    523                   waitHandleArray[i] = 
    524
    525                       mq.queue.BeginReceive().AsyncWaitHandle;
    526
    527              }

    528
    529 
    530
    531              return;
    532
    533         }

    534
    535 
    536
    537         public static void stopListen()
    538
    539         {
    540
    541 
    542
    543              for(int i=0;i<waitHandleArray.Length;i++)
    544
    545              {
    546
    547                   try
    548
    549                   {
    550
    551                       waitHandleArray[i].Close();
    552
    553                   }

    554
    555                   catch
    556
    557                   {
    558
    559                       //忽略错误
    560
    561                   }

    562
    563              }

    564
    565 
    566
    567              try
    568
    569              {
    570
    571                  // Specify to wait for all operations to return.
    572
    573                   WaitHandle.WaitAll(waitHandleArray,1000,false);
    574
    575              }

    576
    577              catch
    578
    579              {
    580
    581                   //忽略错误
    582
    583              }

    584
    585         }

    586
    587 
    588
    589         private static void queue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    590
    591         {
    592
    593              // Connect to the queue.
    594
    595              MessageQueue mqq = (MessageQueue)sender;
    596
    597 
    598
    599              // End the asynchronous Receive operation.
    600
    601              Message m = mqq.EndReceive(e.AsyncResult);
    602
    603 
    604
    605              Util.ProcessMo((MoMsg)(m.Body));
    606
    607 
    608
    609              if(Util.isRunning)
    610
    611              {
    612
    613                   // Restart the asynchronous Receive operation.
    614
    615                   mqq.BeginReceive();
    616
    617              }

    618
    619            
    620
    621              return;
    622
    623         }

    624
    625     }

    626
    627}

    628
    629来源:http://www.cnblogs.com/goody9807/archive/2007/05/18/546572.html
  • 相关阅读:
    sublime显示当前文件的编码格式
    关于jquery中html()、text()、val()的区别
    bit,Byte,B,KB,MB,GB
    python之序列操作
    编程常用密匙
    js数组操作
    ob函数的使用
    php使用zlib实现gzip压缩
    js兼容性汇总
    centos7下源码编译安装mysql5.7
  • 原文地址:https://www.cnblogs.com/gjhjoy/p/3531047.html
Copyright © 2011-2022 走看看