zoukankan      html  css  js  c++  java
  • Publisher/Subscriber 订阅-发布模式

    Publisher/Subscriber 订阅-发布模式

    本博后续将陆续整理这些年做的一些预研demo,及一些前沿技术的研究,与大家共研技术,共同进步。

    关于发布订阅有很多种实现方式,下面主要介绍WCF中的发布订阅,主要参考书籍《Programming WCF Services》,闲话不多说进入正题。使用传统的双工回调(例子 http://www.cnblogs.com/artech/archive/2007/03/02/661969.html)实现发布订阅模式存在许多缺陷,主要问题是,它会引入发布者和订阅者之间的高度耦合。订阅者必须先知道发布者在哪里,然后才能订阅它们,任何订阅者不知道的服务都无法通知事件的订阅者,部署好的应用程序中添加新的订阅者(或者移除已经存在的订阅者)是十分困难的事情。大致相同的是发布者也只能给它知道的订阅者发送通知消息,同时发布者还需要管理订阅者列表,这些与业务服务无关,这些逻辑增加了发布者的复杂度,另外在安全方面也存在订阅者与发布者也存在耦合,而且在发布者进程宕机时,所有订阅都会丢失。

    要解决上面提及的问题最常见的解决方案就是发布-订阅模式(Publish-Subscribe 【OBSERVER】),如图D-1所示。

    这里将订阅者区分为临时订阅者与持久订阅者,持久订阅者可以保存到磁盘上,当事件触发时可以通知订阅者,也可以很方便的通过传递回调使用回调机制,对于持久订阅者,需要记录订阅者地址,当触发事件时,发布服务将会调用持久订阅者地址 ,然后传递事件,因持久订阅者保存了订阅者地址至数据库或磁盘,因此当发布服务宕机时提高了管理性。

    以上主要介绍理论,下面进入实践阶段,首先下载ServiceModelEx(Programming WCF Services 里面书籍作者提供的简化WCF编程的动态库),  https://github.com/CaseyBurns/ServiceModelEx,我们暂时不需要服务总线所以我们引入ServiceModelEx (.NET 4.0 no service bus) ,建好测试服务端(这里为了方便测试使用GUI 应用程序作为宿主),客户端。

    管理临时订阅

    例子D-1使用ServiceModelEx 提供的ISubscriptionService接口管理临时订阅者

    复制代码
       [ServiceContract]
       public interface ISubscriptionService
       {
          [OperationContract]
          void Subscribe(string eventOperation);
    
          [OperationContract]
          void Unsubscribe(string eventOperation);
       }
    复制代码

    作为通用接口它不关心回调契约,然后添加临时订阅者契约继承通用接口,并设置回调契约

        [ServiceContract(CallbackContract = typeof(IMyEvents))]
        public interface IMySubscriptionService : ISubscriptionService
        {
        }

    回调契约

    复制代码
        [ServiceContract]
        public interface IMyEvents
        {
            [OperationContract(IsOneWay = true)]
            void OnEvent1();
            [OperationContract(IsOneWay = true)]
            void OnEvent2(int number);
            [OperationContract(IsOneWay = true)]
            void OnEvent3(int number, string text);
        }
    复制代码

    实现临时订阅服务.

        [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
        public class MySubscriptionService : SubscriptionManager<IMyEvents>, IMySubscriptionService,IPersistentSubscriptionService
        {
           
        }

    这里有几点需要注意:服务类型必须是会话服务(InstanceContextMode = InstanceContextMode.PerCall),会话服务才能够使用回调,另外ServiceModelEx 中的类 SubscriptionManager<T> 已经实现了通用接口所定义的添加订阅者与取消订阅接口,所以这里不需要我们再写任何代码。IPersistentSubscriptionService 作为持久订阅者接口,SubscriptionManager<T> 也实现了该接口,接下来会讲到。

    配置文件配置发布订阅者服务

    复制代码
      <system.serviceModel>
        <serviceHostingEnvironment multipleSiteBindingsEnabled="true" />
        <bindings>
          <netTcpBinding>
            <binding name="NetTcpBinding_IService1" receiveTimeout="00:25:00"
              maxBufferSize="2147483647" maxReceivedMessageSize="2147483647" transactionFlow="true">
              <reliableSession inactivityTimeout="00:25:00" enabled="true" />
              <security mode="None" />
            </binding>
          </netTcpBinding>
        </bindings>
        <services>
          <service behaviorConfiguration="MyBehavior" name="Service.sub.MySubscriptionService">
            <host>
              <baseAddresses>
                <add baseAddress="net.tcp://localhost:8022/"/>
                <add baseAddress="http://localhost:8023/"/>
              </baseAddresses>
            </host>
            <endpoint name="Sub" address="Sub" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
              contract="Service.sub.IMySubscriptionService" />
            <endpoint name="PersistentSub" address="PersistentSub" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
                contract="ServiceModelEx.IPersistentSubscriptionService" />
          </service>
          <service behaviorConfiguration="MyBehavior" name="Service.pub.MyPublishService">
            <host>
              <baseAddresses>
                <add baseAddress="net.tcp://localhost:8022/MyPub/"/>
                <add baseAddress="http://localhost:8023/MyPub/"/>
              </baseAddresses>
            </host>
            <endpoint name="PubMyEvents" address="PubMyEvents" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
              contract="Service.sub.IMyEvents" />
          </service>
        </services>
        <behaviors>
          <serviceBehaviors>
            <behavior name="MyBehavior">
              <serviceMetadata httpGetEnabled="true"/>
              <serviceThrottling maxConcurrentCalls="1000" maxConcurrentSessions="10000" />
              <serviceDebug includeExceptionDetailInFaults="true" />
            </behavior>
          </serviceBehaviors>
        </behaviors>
      </system.serviceModel>
    复制代码

    其中Service.pub.MyPublishService 服务为发布者服务配置 接下来会讲到。

    这样临时订阅者就实现了,接下来看持久订阅者.持久订阅者的通用接口使用ServiceModelEx中定义的IPersistentSubscriptionService

    复制代码
       [ServiceContract]
       public interface IPersistentSubscriptionService
       {
          [OperationContract(Name = "SubscribePersistent")]
          [TransactionFlow(TransactionFlowOption.Allowed)]
          void Subscribe(string address,string eventsContract,string eventOperation);
    
          [OperationContract(Name = "UnSubscribePersistent")]
          [TransactionFlow(TransactionFlowOption.Allowed)]
          void Unsubscribe(string address,string eventsContract,string eventOperation);
    
          [OperationContract]
          [TransactionFlow(TransactionFlowOption.Allowed)]
          PersistentSubscription[] GetAllSubscribers();
    
          [OperationContract]
          [TransactionFlow(TransactionFlowOption.Allowed)]
          PersistentSubscription[] GetSubscribersToContract(string eventsContract);
    
          [OperationContract]
          [TransactionFlow(TransactionFlowOption.Allowed)]
          string[] GetSubscribersToContractEventType(string eventsContract,string eventOperation);
    
          [OperationContract]
          [TransactionFlow(TransactionFlowOption.Allowed)]
          PersistentSubscription[] GetAllSubscribersFromAddress(string address);
       }
    复制代码

    这里我添加了对[OperationContract(Name = "SubscribePersistent")] 将添加订阅方法进行重命名,以区别临时订阅接口的Subscribe方法.持久订阅不需要回调函数,接下来实现持久订阅同样简单,上面已经贴过代码,ServiceModelEx中的SubscriptionManager<T>同样已经实现了IPersistentSubscriptionService接口,这样临时订阅与持久订阅完成,接下来看发布服务。

    发布服务应该支持与订阅服务一样的事件契约,这是订阅服务与发布服务唯一的连接点,使用IMyEvents 作为例子,另外ServiceModelEx提供了用于简化发布服务的帮助类PublishService<T>

    复制代码
        [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
        public class MyPublishService : PublishService<IMyEvents>, IMyEvents
        {
            public void OnEvent1()
            {
                FireEvent();
            }
    
            public void OnEvent2(int number)
            {
                FireEvent(number);
            }
    
            public void OnEvent3(int number, string text)
            {
                FireEvent(number, text);
            }
        }
    复制代码

    其中FireEvent()被用作激发所有订阅者的事件,无论是临时还是持久订阅者,帮助类PublishService<T>已经做了实现,接下来配置发布服务

    复制代码
          <service behaviorConfiguration="MyBehavior" name="Service.pub.MyPublishService">
            <host>
              <baseAddresses>
                <add baseAddress="net.tcp://localhost:8022/MyPub/"/>
                <add baseAddress="http://localhost:8023/MyPub/"/>
              </baseAddresses>
            </host>
            <endpoint name="PubMyEvents" address="PubMyEvents" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
              contract="Service.sub.IMyEvents" />
          </service>
    复制代码

    这样发布服务完成,使用Gui应用程序作为宿主,可以使用ServiceModelEx 中ServiceHost<T> 作为发布的帮助类 。

    复制代码
        public partial class Form1 : Form
        {
            public Form1()
            {
                InitializeComponent();
            }
            ServiceHost<MyPublishService> hostPub = new ServiceHost<MyPublishService>();
            ServiceHost<MySubscriptionService> host = new ServiceHost<MySubscriptionService>(); 
            private void Form1_Load(object sender, EventArgs e)
            {
                try
                {
                    host.EnableMetadataExchange();
                    host.Open();
    
                    hostPub.EnableMetadataExchange();
                    hostPub.Open();
                }
                catch (Exception ex)
                {
                    throw;
                }
    
            }
    
          
            private void Form1_FormClosed(object sender, FormClosedEventArgs e)
            {
                try
                {
                    host.Close();
                }
                catch (Exception)
                {
                    try
                    {
                        host.Abort();
                    }
                    catch (Exception)
                    {
    
                    }
                }
                try
                {
                    hostPub.Close();
                }
                catch (Exception)
                {
                    try
                    {
                        hostPub.Abort();
                    }
                    catch (Exception)
                    {
    
                    }
                }
            }
        }
    复制代码

    其中 host.EnableMetadataExchange(); 能够帮助发布元数据,不需要再到配置中进行配置,服务配置好后接下来看客户端使用,

    客户端可以直接添加服务引用生成服务代理,但是一般本人喜欢使用SvcUtil工具生成代理,或者干脆直接使用通道进行服务调用,后者更为我所喜爱,因为这样代码阅读行更强,更简练。例子中偷了下懒,直接添加服务引用,然后用通道调用服务,这样剩了点复制配置或者接口的功夫,所以看到例子不要感到奇怪,全因懒造成的,废话不多说,接下来看临时订阅客户端调用

    复制代码
            DuplexChannelFactory<IMySubscriptionService, IMySubscriptionServiceCallback> channelFactory = null;
            IMySubscriptionService proxy = null;
            private void btnSub_Click(object sender, EventArgs e)
            {
                MyEventsCallback callBack = new MyEventsCallback();
                callBack.OnResultEvent += CallBack_OnResultEvent;
                InstanceContext<IMySubscriptionServiceCallback> instanceContext = new InstanceContext<IMySubscriptionServiceCallback>(callBack);
                channelFactory = new DuplexChannelFactory<IMySubscriptionService, IMySubscriptionServiceCallback>(instanceContext, "Sub");
                proxy = channelFactory.CreateChannel();
                proxy.Subscribe(null);
            }
    复制代码

    这里使用ServiceModelEx 中提供的DuplexChannelFactory<T,C>  类型安全的双向通道类创建代理,MyEventsCallback 实现回调接口,具体实现如下:

    复制代码
        internal class MyEventsCallback : IMySubscriptionServiceCallback
        {
            SynchronizationContext sc = SynchronizationContext.Current;
            public event EventHandler<EventsCallbackArgs> OnResultEvent;
            public void OnEvent1()
            {
                sc.Post(result =>
                {
                    EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent1", System.Environment.NewLine) };
                    e.Raise(this, ref OnResultEvent);
                }, null);
            }
    
            public void OnEvent2(int number)
            {
                sc.Post(result =>
                {
                    EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent2:", number, System.Environment.NewLine) };
                    e.Raise(this, ref OnResultEvent);
                }, null);
            }
    
            public void OnEvent3(int number, string text)
            {
                sc.Post(result =>
                {
                    EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent3:", number, "text:", text + System.Environment.NewLine) };
                    e.Raise(this, ref OnResultEvent);
                }, null);
            }
        }
    复制代码
    复制代码
        public static class EventArgExtensions
        {
            public static void Raise<TEventArgs>(this TEventArgs e, Object sender, ref EventHandler<TEventArgs> eventDelegate) where TEventArgs : EventArgs
            {
                EventHandler<TEventArgs> temp = Interlocked.CompareExchange(ref eventDelegate, null, null);
                if (temp != null) temp(sender, e);
            }
        }
    复制代码

    SynchronizationContext 上下文提供post方法调用gui线程更新ui,e.Raise 使用扩展以线程安全方式调用事件,客户端调用订阅者就完成了,别忘了关闭代理,接下来看客户端调用发布者

    客户端调用发布服务:

    复制代码
      public partial class PubMessageForm : Form
        {
            public PubMessageForm()
            {
                InitializeComponent();
            }
            ChannelFactory<IMyEvents> channelFactory = null;
            IMyEvents proxy = null;
            private void btnStartPub_Click(object sender, EventArgs e)
            {
                channelFactory = new ChannelFactory<IMyEvents>("PubMyEvents");
                proxy = channelFactory.CreateChannel();
            }
    
            private void PubMessageForm_FormClosed(object sender, FormClosedEventArgs e)
            {
                try
                {
                    using (proxy as IDisposable)
                    {
    
                    }
                    channelFactory.Close();
                }
                catch
                {
                    channelFactory.Abort();
                }
            }
    
            private void btnPub_Click(object sender, EventArgs e)
            {
                proxy.OnEvent1();
            }
    
            private void btnPub2_Click(object sender, EventArgs e)
            {
                proxy.OnEvent2(2);
            }
    
            private void btnPub3_Click(object sender, EventArgs e)
            {
                proxy.OnEvent3(3, txtPubMessage.Text);
            }
    
            private void PubMessageForm_Load(object sender, EventArgs e)
            {
    
            }
        }
    复制代码

    使用ChannelFactory<T> 通道调用发布服务

    这样WCF发布订阅服务就完成了,另外如果发布服务或订阅服务不需要同步绑定,可以考虑使用msmq ,这样发布-订阅模式兼具松耦合和无连接系统的优势。

    需要注意的是队列化发布-订阅服务不支持临时订阅,需要使用持久订阅,具体实现在此不多讲,另外还可以结合服务发现实现另外一种模式的发布订阅模式,具体可以参考书籍《Programming WCF Services》。

     Demo下载 http://files.cnblogs.com/files/skystar/Demo.7z

    书中自有黄金屋,书中自有颜如玉

  • 相关阅读:
    Web API入门二(实例)
    Web API 入门一
    模板编程
    Unity3D中的AI架构模型
    Linux系列
    LCS记录
    hadoop使用问题
    AOP之Castle DynamicProxy 动态代理
    python 之readability与BeautifulSoup
    django rest_framework--入门教程3
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/5654794.html
Copyright © 2011-2022 走看看