zoukankan      html  css  js  c++  java
  • 基于.NET Socket Tcp的发布-订阅框架

    基于.NET Socket Tcp的发布-订阅框架

    一、分布式消息总线

         在很多MIS项目之中都有这样的需求,需要一个及时、高效的的通知机制,即比如当使用者A完成了任务X,就需要立即告知使用者B任务X已经完成,在通常的情况下,开发人中都是在使用者B所使用的程序之中写数据库轮循代码,这样就会产品一个很严重的两个问题,第一个问题是延迟,轮循机制要定时执行,必须会引起延迟,第二个问题是数据库压力过大,当进行高频度的轮循会生产大量的数据库查询,并且如果有大量的使用者进行轮循,那数据库的压力就更大了。

         那么在这个时间,就需要一套能支持发布-订阅模式的分布式消息总线,那这个问题就可以很好的解决了,比如采用一些成熟的消息总线进行实现,比如MSMQ或者采用比如开源的NServiceBus的发布订阅机制就可以实现处理这种需求,其系统结构就会变成如下所示:

    2010-11-29-11-30-48

        本分布式消息总线,目前广泛的被应用于分布式缓存的更新通知,当在N百台客户短在使用缓存的过程之中,某个操作修改了缓存的数据,必须会导致其他终端缓存的失效,那么使用基于Socket的分布式消息总线之后,我们可以做了修改了即可实时通知,做到缓存数据保持最新,再比如医疗应用之中的危急值管理,当发现检验、检查危急值之后,需要及时通知病区启动声光报警系统等,提醒医护工作人员及相关领导做出相应的措施,再比如应用于异构系统整合,当检验系统做出检验报告,通过消息总线进行发布,HIS系统则即时会收到检验报告数据而实现系统的整合。

    二、基于Socket的实现

         目前能够实现发布订阅模式的开源产品非常之多,为什么还要制造轮子呢,其主要原因有以下几点

         1)像NServiceBus这种东西基于MSMQ,在大量的发布者-订阅者的情况下性能不佳。

         2)此类东西太过于庞大、不易使用和配置。

         3)学习成本过高。

         那为什么要使用Socket技术进行实现呢,其主要原因是有以下几点:

         1)使用高效的Socket通信技术,高效、支持更多的客户端。

         2)使用简单,不需要定义太多额外的东西,只需要定义主题和消息即可使用。

         目前本发布订阅框架是基于AgileEAS.NET SOA中间件平台Socket框架实现的,有关于些Socket框架的技术细节请参考AgileEAS.NET SOA 中间件平台.Net Socket通信框架-介绍AgileEAS.NET SOA 中间件平台.Net Socket通信框架-简单例子-实现简单的服务端客户端消息应答AgileEAS.NET SOA 中间件平台.Net Socket通信框架-完整应用例子-在线聊天室系统-下载配置AgileEAS.NET SOA 中间件平台.Net Socket通信框架-完整应用例子-在线聊天室系统-代码解析文章进行了解和学习。

         目前本发布订阅框架并直接集成于AgileEAS.NET SOA Socket通信框架之中并且随其一并发布,下面简单介绍一下其API:

    在本框架之中定义了一个消息总线接口IMessageBus:

       1: using System;
       2: using System.Collections.Generic;
       3: using System.Linq;
       4: using System.Text;
       5: using System.Collections;
       6:  
       7: namespace EAS.Messages
       8: {
       9:     /// <summary>
      10:     /// 消息总线接口定义。
      11:     /// </summary>
      12:     public interface IMessageBus : IDisposable
      13:     {
      14:         /// <summary>
      15:         /// 注册发布者。
      16:         /// </summary>
      17:         /// <param name="publisher">发布者。</param>
      18:         void AddPublisher(string publisher);
      19:  
      20:         /// <summary>
      21:         /// 注册发布者。
      22:         /// </summary>
      23:         /// <param name="publisher">发布者。</param>
      24:         /// <param name="topic">主题。</param>
      25:         void AddPublisher(string publisher, string topic);
      26:  
      27:         /// <summary>
      28:         /// 发布一条消息到总线。
      29:         /// </summary>
      30:         /// <param name="topic">主题。</param>
      31:         /// <param name="message">发布的消息。</param>
      32:         void Publish(string topic, object message);
      33:  
      34:         /// <summary>
      35:         /// 订阅消息。
      36:         /// </summary>
      37:         /// <param name="subscriber">订阅者。</param>
      38:         /// <param name="topic">主题。</param>
      39:         /// <param name="notifyHandler">订阅通知。</param>
      40:         void Subscribe(object subscriber, string topic, MessageNotifyHandler notifyHandler);
      41:  
      42:         /// <summary>
      43:         /// 订阅消息。
      44:         /// </summary>
      45:         /// <param name="subscriber">订阅者。</param>
      46:         /// <param name="friendName">订阅者名称,用于处理离线订阅。</param>
      47:         /// <param name="topic">主题。</param>
      48:         /// <param name="notifyHandler">订阅通知。</param>
      49:         void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);
      50:  
      51:         /// <summary>
      52:         /// 订阅消息。
      53:         /// </summary>
      54:         /// <param name="subscriber">订阅者。</param>
      55:         /// <param name="friendName">订阅者名称,用于处理离线订阅。</param>
      56:         /// <param name="topic">主题。</param>
      57:         /// <param name="notifyHandler">订阅通知。</param>
      58:         /// <param name="changedHandler">发布者状态变化委托。</param>
      59:         void Subscribe(object subscriber, string friendName, string topic, MessageNotifyHandler notifyHandler,PublisherSstatusChangedHandler changedHandler);
      60:  
      61:         /// <summary>
      62:         /// 退订消息。
      63:         /// </summary>
      64:         /// <param name="subscriber">订阅者。</param>
      65:         void Unsubscribe(object subscriber);
      66:  
      67:         /// <summary>
      68:         /// 退订消息。
      69:         /// </summary>
      70:         /// <param name="subscriber">订阅者。</param>
      71:         /// <param name="topic">主题。</param>
      72:         void Unsubscribe(object subscriber, string topic);
      73:  
      74:         /// <summary>
      75:         /// 退订消息。
      76:         /// </summary>
      77:         /// <param name="subscriber">订阅者。</param>
      78:         /// <param name="friendName">订阅者名称,用于处理离线订阅。</param>
      79:         /// <param name="topic">主题。</param>
      80:         void Unsubscribe(object subscriber, string friendName, string topic);
      81:     }
      82: }

        IMessageBus就基于Socket发布订阅消息总线的灵魂接口,也是基唯一的发布者调用者功能入口,也就是说不管你是发布者还是订阅者都需要调用这个接口,如果你是发布者请调用IMessageBus接口的Publish方法向消息总线发布消息,如果是你订阅者请通过IMessageBus的订阅方法进行订阅,当你订阅了某个主题之后,有发布者发布该主题的消息,你即可以收到消息并调用订阅回调函数进行处理。

    三、实现一个简单的例子

         现在我们开始一个简单的应用消息总线的例子,本例子代码解决方案由下图4个项目组成:

    (`I6]J`YK1((6M3M8Q2}(5X

         其中:Demo.Messages项目定义发布者、订阅者所使用的消息对象和消息主题。

                    Demo.Publisher项目为发布者代码。

                    Demo.Subscriber项目为订阅者代码。

                    Demo.Server项目为服务端代码。

         在Demo.Messages项目之中,我们定义了一个消息Message:

       1: using System;
       2: using System.Collections.Generic;
       3: using System.Linq;
       4: using System.Text;
       5: using System.Xml.Serialization;
       6:  
       7: namespace Demo.Messages
       8: {
       9:     [Serializable]
      10:     public class Message
      11:     {
      12:         [XmlElement]
      13:         public Guid ID
      14:         {
      15:             get;
      16:             set;
      17:         }
      18:     }
      19: }

         消息Message很简单,只有一个属性ID,同时 还需要定义一个消息主题:

       1: using System;
       2: using System.Collections.Generic;
       3: using System.Linq;
       4: using System.Text;
       5:  
       6: namespace Demo.Messages
       7: {
       8:     public class Topics
       9:     {
      10:         public static readonly string DEMO_TOPIC = "演示消息";
      11:     }
      12: }

         我们定义了一个消息主题为“演示消息”。

         在Demo.Publisher项目之中,没有太多额外的代码,只有在Program.cs写了以下简单的调用代码:

       1: using System;
       2: using System.Collections.Generic;
       3: using System.Linq;
       4: using System.Text;
       5: using EAS.Messages;
       6:  
       7: namespace Demo.Publisher
       8: {
       9:     class Program
      10:     {
      11:         static void Main(string[] args)
      12:         {
      13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();
      14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
      15:             System.Console.WriteLine("Publisher");
      16:  
      17:             while (System.Console.ReadLine()!="exit")
      18:             {
      19:                 var m = new Messages.Message { ID = Guid.NewGuid() };
      20:                 bus.Publish(Demo.Messages.Topics.DEMO_TOPIC, m);
      21:                 System.Console.WriteLine(string.Format("Publish:{0}", m.ID));
      22:             }
      23:         }
      24:     }
      25: }

         从IOC容器获取一个消息总线IMessageBus对象,并调用Publish函数发布消息”bus.Publish(Demo.Messages.Topics.DEMO_TOPIC, m);“。

         当然了,使用了IOC容器,就离来开配置文件了,其App.config文件内容如下:

       1: <?xml version="1.0" encoding="utf-8"?>
       2: <configuration>
       3:   <configSections>
       4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
       5:   </configSections>
       6:   <eas>
       7:     <objects>
       8:       <!--消息总线-->
       9:       <object name="MessageBus" assembly="EAS.MicroKernel" type="EAS.Sockets.Bus.SocketBus" LifestyleType="Singleton">
      10:         <property name="Url" type="string" value="socket.tcp://127.0.0.1:6606/"/>
      11:       </object>
      12:     </objects>
      13:   </eas>
      14: </configuration>

         在Demo.Subscriber项目之中,使用与Demo.Publisher一模一样的配置文件,其Program.cs代码如下:

       1: using System;
       2: using System.Collections.Generic;
       3: using System.Linq;
       4: using System.Text;
       5: using EAS.Messages;
       6:  
       7: namespace Demo.Subscriber
       8: {
       9:     class Program
      10:     {
      11:         static void Main(string[] args)
      12:         {
      13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();
      14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
      15:             System.Console.WriteLine("Subscriber");
      16:  
      17:             bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);
      18:             System.Console.ReadLine();
      19:         }
      20:  
      21:         static void MessageNotify(object m)
      22:         {
      23:             Demo.Messages.Message message = m as Demo.Messages.Message;
      24:             System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID));
      25:         }
      26:     }
      27: }

         其中代码bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);:完成把消息订阅到MessageNotify通知函数之中。

         在Demo.Server项目之中,启动服务器并且开始接收订阅和发布:

       1: using System;
       2: using System.Collections.Generic;
       3: using System.Linq;
       4: using System.Text;
       5: using EAS.Sockets;
       6:  
       7: namespace Demo.Server
       8: {
       9:     class Program
      10:     {
      11:         static void Main(string[] args)
      12:         {
      13:             SocketServer socketServer = new SocketServer(128);
      14:             socketServer.Port = 6606;
      15:             socketServer.Logger = new EAS.Loggers.ConsoleLogger(); 
      16:             socketServer.Initialize();
      17:             System.Console.WriteLine("Server Starting...");
      18:             socketServer.StartServer();
      19:             System.Console.WriteLine("Server Startup!");
      20:             System.Console.ReadLine();
      21:         }
      22:     }
      23: }

         到此为止,所有代码均已完成,是不是很简单,接下来,我们跑起来验证一下效果。

    四、验证效果

         我们在编译输入目录Publish下先启动Demo.Server.exe,再启动两个Demo.Subscriber.exe,再启动一个Demo.Publisher.exe,在Demo.Publisher.exe控制台按回车键:

    image

    OK,搞定。

    五、源代码下载

         本程序的源代码已上传到服务器,请通过http://42.121.30.77/downloads/eas/Demo.Pub_Sub.rar进行下载,如果在开发过程之中想要了解更多有关Socket通信框架以及更多AgileEAS.NET SOA中间件平台的技术资源,请通过AgileEAS.NET SOA 网站:http://www.smarteas.net最新下载栏目进行下载。    

    六、问题反馈

         麻烦大家在通过视频进行学习的时候能及时把问题反馈给楼主,或者有什么需要改进的一些建议都请向楼主直接反馈,以下是联系方式:

    AgileEAS.NET SOA 网站:http://www.smarteas.net

    官方博客:http://eastjade.cnblogs.com

    楼主QQ:47920381,AgileEAS.NET

    QQ群:113723486(AgileEAS SOA 平台)/上限1000人

    199463175(AgileEAS SOA 交流)/上限1000人

    120661978(AgileEAS.NET 平台交流)/上限1000人

    邮件:james@agilelab.cn,mail.james@qq.com,

    电话:18629261335。

  • 相关阅读:
    服务端集成支付宝踩过的坑RSA、RSA2
    javascript预览图片——IT轮子系列(九)
    winform 写App.config配置文件——IT轮子系列(八)
    登录-添加页面
    登录-Login页面
    登录-控制器
    登录-控制器验证
    登录-dal
    登录-数据库模型
    登录-存储过程
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3913691.html
Copyright © 2011-2022 走看看