zoukankan      html  css  js  c++  java
  • 使用WCF双工通讯实现发布订阅

    发布——订阅

    通俗一点的解释,就是推送。由服务端发布消息,所有订阅了这条消息的客户端,都会收到服务端广播的这条消息。

    WCF的双工机制的运行方式如下

    客户端发起请求->服务端收到请求并对客户端发起回调->客户端回复回调->服务端回复客户端请求

    发布订阅其实是双工的变种,它的原理大概是这样的

    客户端发起请求->服务端调用回调->服务端调用回调.....

    基本原理就这些,我们来用代码说话吧,现在瞅瞅,代码比说话都亲切....

    首先我们定义一个基础的WCF服务契约

    IPushService
    /// <summary>
        /// 推送服务契约
        /// 
        /// Tips:
        /// 契约提供两个服务,一个是订阅,一个是退订。
        /// 服务端会向订阅的客户端发布消息
        /// </summary>
        [ServiceContract(CallbackContract = typeof(IPushCallback))]
        public interface IPushService
        {
            /// <summary>
            /// 订阅服务
            /// </summary>
            [OperationContract(IsOneWay = true)]
            void Regist();
    
            /// <summary>
            /// 退订服务
            /// </summary>
            [OperationContract(IsOneWay = true)]
            void UnRegist();
        }

    在这里定义了两个行为,订阅和退订。

    同时还有一个回调契约如下

    IPushCallback
     /// <summary>
        /// 回调接口 IOC思想的体现
        /// </summary>
        public interface IPushCallback
        {
            [OperationContract(IsOneWay = true)]
            void NotifyMessage(string message);
        }

    在这里,我们对每一个行为都标记为OneWay,表示客户端在调用完服务之后不需要等待服务的回复,同理回调时服务端只管广播,同样不需要理会客户端

    广播的过程,就是服务端调用每一个连接的回调通道进行操作的过程,因此这里需要注意2点。

    1、有一个集合,用于维护回调通道。

    2、客户端与服务端建立的通道不能在调用之后就被回收,而是需要保持通道的正常状态

    在这里,我们建立一个ChannelManager类型,其中维护了一个回调通道的列表,并且提供了服务端操作的几个行为。这个类型以单例模式实现,保证唯一的通道列表。

    ChannelManager
      /// <summary>
        /// 通道管理
        /// </summary>
        public class ChannelManager
        {
            #region Fields
    
            /// <summary>
            /// 回调通道列表
            /// </summary>
            private List<IPushCallback> callbackChannelList = new List<IPushCallback>();
    
            /// <summary>
            /// 用于互斥锁的对象
            /// </summary>
            public static readonly object SyncObj = new object();
    
            #endregion
    
            #region Single
    
            private static readonly Lazy<ChannelManager> instance = new Lazy<ChannelManager>(() => new ChannelManager());
    
            public static ChannelManager Instance
            {
                get { return instance.Value; }
            }
    
            protected ChannelManager() { }
    
            #endregion
    
            #region Methods
            /// <summary>
            /// 将回调通道加入到通道列表中进行管理
            /// </summary>
            /// <param name="callbackChannel"></param>
            public void Add(IPushCallback callbackChannel)
            {
                if (callbackChannelList.Contains(callbackChannel))
                {
                    Console.WriteLine("已存在重复通道");
                }
                else
                {
                    lock (SyncObj)
                    {
                        callbackChannelList.Add(callbackChannel);
                        Console.WriteLine("添加了新的通道");
                    }
                }
            }
    
            /// <summary>
            /// 从通道列表中移除对一个通道的管理
            /// </summary>
            /// <param name="callbackChannel"></param>
            public void Remove(IPushCallback callbackChannel)
            {
                if (!callbackChannelList.Contains(callbackChannel))
                {
                    Console.WriteLine("不存在待移除通道");
                }
                else
                {
                    lock (SyncObj)
                    {
                        callbackChannelList.Remove(callbackChannel);
                        Console.WriteLine("移除了一个通道");
                    }
                }
            }
    
            /// <summary>
            /// 广播消息
            /// </summary>
            /// <param name="message"></param>
            public void NotifyMessage(string message)
            {
                if (callbackChannelList.Count > 0)
                {
                    //避免对callbackChannelList的更改对广播造成的影响
                    IPushCallback[] callbackChannels = callbackChannelList.ToArray();
    
                    foreach (var channel in callbackChannels)
                    {
                        try
                        {
                            //广播消息
                            channel.NotifyMessage(message);
    
                        }
                        catch
                        {
                            //对异常的通道进行处理
                            callbackChannelList.Remove(channel);
                        }
                    }
                }
            }
            #endregion
        }

    下面就是我们服务的实现了,服务的实现很简单,仅仅是捕获到客户端的回调通道,对集合进行操作。这里需要注意的是ServiceBehavior标记的InstanceContextMode属性的设置。我们需要为每一个单独的通道创建新的实例,但是在调用玩服务后,不对通道立刻进行回收,因此我们需要设置为InstanceContextModel.Single。

    PushService
     /// <summary>
        /// 服务的实现
        /// Tips:
        /// 实现发布订阅,要注意:每个信道在调用后不要回收,否则会在回调时报错
        /// </summary>
        [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
        public class PushService : IPushService
        {
            public void Regist()
            {
                IPushCallback callbackChannel = OperationContext.Current.GetCallbackChannel<IPushCallback>();
                //添加到管理列表中
                ChannelManager.Instance.Add(callbackChannel);
    
            }
    
            public void UnRegist()
            {
                IPushCallback callbackChannel = OperationContext.Current.GetCallbackChannel<IPushCallback>();
                //从管理列表中移除
                ChannelManager.Instance.Remove(callbackChannel);
            }
        }

    这样我们的服务基本就完成了,置于客户端的调用,需要创建一个双工通道对象,并且将实现了回调契约的类型,传给InstanceContext属性。

    这篇文章很简单,没有涉及到对产生异常的通道的处理,也没有考虑到Silverlight调用时遇到的跨域问题,同时不支持HttpGet。

    文章的目的只有一个,就是尽可能简洁的体现发布订阅服务的原理。

    有兴趣的读者,可以对服务进行扩展,对服务在广播时可能产生的各种异常进行处理。

    由于文章的例子是基于TCP/IP的通迅方式,使用的是netTcpBinding的全双工模式,因此使用HttpGet的时候需要进行一些额外的设置。

    并且由于是自托管服务,寄宿在控制台应用程序中,因此跨域文件的提供方式也不太相同。

    下面提供完整的代码:

    SimplePush.rar

    另外提供一个Silverlight调用服务的例子,涉及HttpGet及跨域文件的提供

    Chat.rar

    这两个例子都是基于Vs2010 .Net4.0开发,Silverlight版本为5.0

  • 相关阅读:
    【Linux】解压分卷压缩的zip文件
    kafka数据清理
    在 Kubernetes 上安装 Gitlab CI Runner Gitlab CI 基本概念以及 Runner 的安装
    APM监控--(三)zipkin部署手册
    K8S使用NodePort类型Service
    kubernetes基本概念 pod, service
    rsyslog配置解析
    日志收集之rsyslog to kafka
    linux auditd审计的简单使用和理解
    Nginx的try_files指令使用实例
  • 原文地址:https://www.cnblogs.com/ShadowLoki/p/2663931.html
Copyright © 2011-2022 走看看