zoukankan      html  css  js  c++  java
  • 消息队列(MSMQ)实现多服务器应用程序之间消息实时交互

    我所介绍的例子是利用微软的消息队列(msmq)实现多个服务器之间消息实时传递。

    应用程序:基于dotnet平台采用WinForm+Webservice开发的应用程序。

    每个地区都有自己的数据库和Webservice服务器。

    Webservice服务器有很多台,这样多个服务器上用户互相交流就成了问题。思前想后采用了msmq,设计思想如下:

    例子:A服务器用户user1发送消息给B服务器上的user2

    1、首先是数据库结构是一样的使用sql2005同步用户信息表。
    2、在每台Webservice服务器上安装msmq。
    3、当user1发送消息给user2时,判断user2所在服务器,如果user2和发送者不在同一个服务器就使用msmq传递消息给B服务器。
    4、每个Webservice服务器都有一个消息接收服务程序,用来侦测本服务器消息队列里的消息,反序列化消息内容,写入数据库。
    5、user2直接读取数据库就可以了。
    6、实际上就是通过msmq在多个数据库服务器之间消息传递。
    下面是消息接收服务程序的源代码:


    using System;
    using System.Collections;
    using System.ComponentModel;
    using System.Data;
    using System.Diagnostics;
    using System.ServiceProcess;
    using System.Threading;
    using System.Messaging;
    using wzh.Db;//数据库操作类

    ///<summary>
    ///名称:消息服务程序
    ///用途:实时侦测远程机器发送到本机的消息队列的消息,写入本机数据库。
    ///作者:wzh
    ///编写日期:2008-9-28
    ///修改日期:wzh于2008-10-10修改了部分BUG
    ///</summary>
    namespace wzh_MsgService
    {
        
    public class MsgService : System.ServiceProcess.ServiceBase
        {
            
    /// <summary> 
            
    /// 必需的设计器变量。
            
    /// </summary>
            private System.ComponentModel.Container components = null;
            
    private bool servicePaused;
            
    private string mPath = null;// = @".\Private$\MsgQueue
            public MsgService()
            {
                
    // 该调用是 Windows.Forms 组件设计器所必需的。
                InitializeComponent();

                
    // TODO: 在 InitComponent 调用后添加任何初始化
            }

            
    // 进程的主入口点
            static void Main()
            {
                System.ServiceProcess.ServiceBase[] ServicesToRun;
        
                
    // 同一进程中可以运行多个用户服务。若要将
                
    //另一个服务添加到此进程,请更改下行
                
    // 以创建另一个服务对象。例如,
                
    //
                
    //   ServicesToRun = New System.ServiceProcess.ServiceBase[] {new Service1(), new MySecondUserService()};
                
    //
                ServicesToRun = new System.ServiceProcess.ServiceBase[] { new MsgService() };

                System.ServiceProcess.ServiceBase.Run(ServicesToRun);
            }

            
    /// <summary>
            
    /// 调试用在项目属性里,将输出类型(Output Type)改成Console Application,将Startup Object改成Namespace.ServiceName
            
    /// </summary>
            
    /// <param name="args"></param>
    //        public static void Main(string[] args)
    //        {
    //            MsgService s = new MsgService();
    //            s.OnStart(args);
    //            Console.ReadLine();
    //            s.OnStop();
    //        }
            /// <summary> 
            
    /// 设计器支持所需的方法 - 不要使用代码编辑器 
            
    /// 修改此方法的内容。
            
    /// </summary>
            private void InitializeComponent()
            {
                components 
    = new System.ComponentModel.Container();
                
    this.ServiceName = "QZCPNET_MsgService";
                
    this.CanPauseAndContinue = true;
                
    this.CanStop = true;
            }

            
    /// <summary>
            
    /// 清理所有正在使用的资源。
            
    /// </summary>
            protected override void Dispose( bool disposing )
            {
                
    if( disposing )
                {
                    
    if (components != null
                    {
                        components.Dispose();
                    }
                }
                
    base.Dispose( disposing );
            }

            
    /// <summary>
            
    /// 设置具体的操作,以便服务可以执行它的工作。
            
    /// </summary>
            protected override void OnStart(string[] args)
            {
                
    // TODO: 在此处添加代码以启动服务。
                
                servicePaused
    =false;
                mPath 
    = GetXmlData("//msgmsmqpath");
                Thread newThread
    =new Thread(new ThreadStart(RecevieMessage));

                newThread.IsBackground
    =true;
                newThread.Start();

            }
            
    /// <summary>
            
    /// 接收本地服务器上的MSMQ里的消息
            
    /// </summary>
            private void RecevieMessage()
            {
                
    if(!servicePaused)
                {
                    
    if(! MessageQueue.Exists(mPath))
                    {
                        MessageQueue.Create(mPath);
                        MessageQueue mqTemp 
    = new MessageQueue(mPath);
                        mqTemp.SetPermissions(
    "Everyone", MessageQueueAccessRights.FullControl);
                    }
                    MessageQueue MyQueue 
    = new MessageQueue(mPath);
                    MyQueue .Formatter 
    =new XmlMessageFormatter (new Type []{typeof (DataSet)});
                    MyQueue.ReceiveCompleted 
    += new ReceiveCompletedEventHandler(MyReceiveCompleted);
                    MyQueue.BeginReceive();
                }
            }
            
    private  void MyReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
            {
                MessageQueue mq 
    = (MessageQueue)source;
                
    try
                {
                    
                    Message ms 
    = mq.EndReceive(asyncResult.AsyncResult);
           
                    DataSet ds 
    = (DataSet)ms.Body;
                    ProcessMsg(ds);
                    
                }
                
    catch(Exception ex)
                {
                    
    if(ex.Source != "System.Messaging")
                        WriteSysLog(ex.ToString());
                }
                
    finally
                {
                    GC.Collect();
                    mq.BeginReceive();
                }
                
    return
            }
            
            
    /// <summary>
            
    /// 消息处理
            
    /// </summary>
            
    /// <param name="ds"></param>
            private void ProcessMsg(DataSet ds)
            {            
                
    //插入新消息
                try
                {
                    clsDBExec clsDb
    =new clsDBExec(GetXmlData("//conn") ,"消息管理-SaveMsg"); 
                    
    string strSql="";
                    DataRow  dr
    =ds.Tables[0].Rows[0];  
                    strSql
    ="INSERT INTO SM_MsgessSend ( MsgSId, MsgTitle, MsgSender,MsgSenderJgCode, MsgSenderJg, MsgAderrs,MsgSAderrs ,MsgContent, "+
                        
    " MsgAccessName, MsgAccessDes, MsgHz)"+
                        
    " VALUES ("+dr["MsgSId"].ToString()+",'"+ dr["MsgTitle"].ToString()+"','"+ dr["MsgSender"].ToString() +"','"+ dr["MsgSenderJgCode"].ToString() + "','" + dr["MsgSenderJg"].ToString() +"','"+ dr["MsgAderrs"].ToString() +"','"+
                        dr[
    "MsgSAderrs"].ToString() +"','"+dr["MsgContent"].ToString().Replace("'","''")  +"','"+ dr["MsgAccessName"].ToString().Replace("'","''"+"','"+  dr["MsgAccessDes"].ToString() +"','"+dr["MsgHz"].ToString() + "');";
                    strSql
    +="SELECT @@IDENTITY ";
                    
    string strV=clsDb.getOneVal(strSql);
                
                    
    //发入发件箱
                    strSql="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder,MsgIsRead ) values('"+ dr["MsgSender"].ToString()  +"',"+ strV +",'发件箱',2)";
                    clsDb.execNonQuery(strSql); 
                    
    //插入到用户
                    string strAderrs=dr["MsgSAderrs"].ToString();
                    
    string [] arrStr=strAderrs.Split(','); 
                    strSql
    ="";
                    
    foreach(string strUser in arrStr)
                    {
                        
    if (strUser.ToUpper()=="ALL" )
                        {
                            strSql
    ="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder ) SELECT UserName,"+ strV +",'收件箱' FROM SM_UserId";
                            
    break;
                        }

                        
    if (strUser.ToUpper()=="ALLONLINE" )
                        {
                            strSql
    ="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder ) SELECT DISTINCT UserId,"+ strV +",'收件箱' FROM SM_OnlineUser";
                            
    break;
                        }

                        
    if ((strUser.ToUpper()!="ALL" ) && strUser.ToUpper()!="ALLONLINE")
                        {
                            strSql
    +="INSERT INTO SM_MsgessUser(UserId, MsgId ,MsgFolder) SELECT UserName,"+ strV +",'收件箱' FROM SM_UserId where UserName='"+ strUser  +"';";
                        }

                    }
                    clsDb.execNonQuery(strSql);  
                }
                
    catch(Exception ex)
                {
                    WriteSysLog(ex.Message);
                }
            }
            
    /// <summary>
            
    /// 停止此服务。
            
    /// </summary>
            protected override void OnStop()
            {
                
    // TODO: 在此处添加代码以执行停止服务所需的关闭操作。
                servicePaused=true;
            }
            
    /// <summary>
            
    /// 获取XML文件内容
            
    /// </summary>
            
    /// <param name="nodepath"></param>
            
    /// <returns></returns>
            private string GetXmlData(string nodepath)
            {
                System.Xml.XmlDocument dom
    =new System.Xml.XmlDocument();
                dom.Load(
    @"e:\net\service\Serverconfig.xml");
                
    return dom.SelectSingleNode(nodepath).InnerText;
                
            }
            
    /// <summary>
            
    /// 写日志
            
    /// </summary>
            
    /// <param name="strMsg"></param>
            private void WriteSysLog(string strMsg)
            {
                EventLog objLog
    =new EventLog("Application") ;
                objLog.Source 
    ="信息网络管理系统--消息服务";
                objLog.WriteEntry(strMsg,EventLogEntryType.Error );
            }
        }
    }
  • 相关阅读:
    225. 用队列实现栈
    415. 字符串相加
    rabbitmq的基本使用
    3. 无重复字符的最长子串
    面试题59
    面试题30. 包含min函数的栈
    面试题09. 用两个栈实现队列
    287. 寻找重复数
    1137. 第 N 个泰波那契数
    70. 爬楼梯
  • 原文地址:https://www.cnblogs.com/lds85930/p/1488670.html
Copyright © 2011-2022 走看看