zoukankan      html  css  js  c++  java
  • WCF后续之旅(10): 通过WCF Extension实现以对象池的方式创建Service Instance

    我们知道WCF有3种典型的对service instance进行实例化的方式,他们分别与WCF的三种InstanceContextMode相匹配,他们分别是PerCall,PerSession和Single。PerCall为每次service invocation创建一个新的service instance; 而PerSession则让一个service instance处理来自通过各Session(一般是同一个proxy对象)的调用请求;而Single则是用同一个service instance来处理所有的调用请求。SOA的一个原则是创建无状态的service(stateless service),PerCall应该是我们经常使用的实例化方式,尽管PerSession是默认的InstanceContextMode。

    但是对于PerCall这种实例化方式来说,为每次service请求都创建新的service instance,有时候显得有点极端,频繁的对象创建会对系统的性能造成一定的影响。我们能够以池的机制(Pooling)进行对象的获取和创建呢:当service调用请求抵达service端,先试图从池中获取一个没有被使用的service instance,如何找到,直接获取该对象;否则创建新的对象。当service instance对象执行完毕,将对象释放到池中以供下次service 调用使用。

    一、实现原理

    我们今天就来试着实现这样的service instance提供机制。主要的实现原理是:让所有的service实现一个公共的interface(IPooledObject),定义了IsBusy的属性表明该对象当前是否正在被使用;为每个service type维护一个weak reference列表,每个weak reference对应一个确定的service instance,我们姑且将该weak reference列表成为该service type对应的对象池(object pool);为了处理service的调用需要提供一个确定的service instance的时候,遍历对象池,通过weak reference的Target属性找出一个可用的service instance(IsBusy=false)。如何顺利找到这样的service instance,则将其从对象池取出,将IsBusy属性设为true;如何没有找到,则通过反射创建一个新的service instance,将IsBusy设为true,同时利用weak reference将其包装,并将该weak reference加入到对象池中,最后返回该service instance用于处理service 调用。当service 调用结束,不是直接将其dispose掉,而是将其释放回对象池,供后续的service调用使用。

    由于我们通过weak reference来实现对象池,weak reference引用的service instance是可以被GC回收的,这样做的好处是充分利用的GC的垃圾回收功能,避免不需要的service instance常驻内容,带来不必要的内存压力。此外,正是因为weak reference引用的service instance是可以被GC回收,我们需要一个后台的任务定期地将已经被回收的weak reference清除掉。

    和本系列前两篇文章(WCF和Unity Appliation Block集成;WCF和Policy Injection Application Block集成)一样,涉及的是service instance的提供的问题,所以,我们也是通过自定义InstanceProvider来实现以对象池的机制创建service instance的目的。

    四、PooledInstnaceProvider的创建

    在创建我们自定义的InstanceProvider之前,我们先来介绍几个辅助的class:

    I、IPooledObject

       1: namespace Artech.WCFExtensions
       2: {
       3:     public interface IPooledObject
       4:     {
       5:         bool IsBusy
       6:         { get; set; }
       7:     }
       8: } 

    由于我们要判断service instance是否可用,我们让所有的service type实现IPooledObject interface。IPooledObject 仅仅定义一个bool类型的属性:IsBusy。通过该属性判断service instance是否正在被使用。

    II、WeakReferenceCollection和WeakReferenceDictionary

       1: namespace Artech.WCFExtensions
       2: {
       3:     public class WeakReferenceCollection:List<WeakReference>
       4:     {} 
       5:  
       6:     public class WeakReferenceDictionary : Dictionary<Type, WeakReferenceCollection>
       7:     {}
       8: } 

    WeakReferenceCollection仅仅是WeakReference的列表,WeakReferenceDictionary 则是key为Type,value为WeakReferenceCollection的dictionary。在提供service instance的时候,就是根据service type为key找到对应的WeakReferenceCollection。

    III、PooledInstanceLocator

       1: namespace Artech.WCFExtensions
       2: {
       3:     public static class PooledInstanceLocator
       4:     {
       5:         internal static WeakReferenceDictionary ServiceInstancePool
       6:         { get; set; } 
       7:  
       8:         static PooledInstanceLocator()
       9:         {
      10:             ServiceInstancePool = new WeakReferenceDictionary();
      11:         } 
      12:  
      13:        public static IPooledObject GetInstanceFromPool(Type serviceType)
      14:         {
      15:             if(!serviceType.GetInterfaces().Contains(typeof(IPooledObject)))
      16:             {
      17:                 throw new InvalidCastException("InstanceType must implement Artech.WCFExtensions.IPooledInstance");
      18:             } 
      19:  
      20:             if (!ServiceInstancePool.ContainsKey(serviceType))
      21:             {
      22:                 ServiceInstancePool[serviceType] = new WeakReferenceCollection();
      23:             } 
      24:  
      25:             WeakReferenceCollection instanceReferenceList = ServiceInstancePool[serviceType] ; 
      26:  
      27:             lock (serviceType)
      28:             {
      29:                 IPooledObject serviceInstance =null;
      30:                 foreach (WeakReference weakReference in instanceReferenceList)
      31:                 {
      32:                     serviceInstance = weakReference.Target as IPooledObject;
      33:                     if (serviceInstance != null && !serviceInstance.IsBusy)
      34:                     {
      35:                         serviceInstance.IsBusy = true;
      36:                         return serviceInstance;
      37:                     }
      38:                 } 
      39:  
      40:                 serviceInstance = Activator.CreateInstance(serviceType) as IPooledObject;
      41:                 serviceInstance.IsBusy = true;
      42:                 instanceReferenceList.Add(new WeakReference(serviceInstance));
      43:                 return serviceInstance;
      44:             }            
      45:         } 
      46:  
      47:        public static void Scavenge()
      48:        {
      49:            foreach (Type serviceType in ServiceInstancePool.Keys)
      50:            {
      51:                lock (serviceType)
      52:                {
      53:                    WeakReferenceCollection instanceReferenceList = ServiceInstancePool[serviceType];
      54:                    for (int i = instanceReferenceList.Count - 1; i > -1; i--)
      55:                    {
      56:                        if (instanceReferenceList[i].Target == null)
      57:                        {
      58:                            instanceReferenceList.RemoveAt(i); 
      59:                        }
      60:                    } 
      61:  
      62:                }
      63:            }
      64:        } 
      65:  
      66:         public static void ReleaseInstanceToPool(IPooledObject instance)
      67:         {
      68:             instance.IsBusy = false;
      69:         }       
      70:     }
      71: } 
      72:  

    PooledInstanceLocator实现了3基于对象池的功能:从对象池中获取对象(GetInstanceFromPool);将对象释放到池中(ReleaseInstanceToPool);清理被GC回收的weak reference(Scavenge)。代码很简单,我就不再一一介绍了。有一点需要注意的,由于PooledInstanceLocator工作在一个多线程环境下,保证线程的同步时最重要的。在本例中为了简便,我直接对service type对象进行枷锁,由于本例比较简单,不会引起什么问题。在实际的项目开发中,如何对Type对象进行加锁就需要三思了,因为type对象一个全局对象(可以参考的我的文章:What is type in managed heap),对其加锁很容易引起死锁。

    IV、自定义InstanceProvider:PooledInstanceProvider

    有了PooledInstanceLocator,我们的InstanceProvider就显得很简单了:

       1: namespace Artech.WCFExtensions
       2: {
       3:     class PooledInstanceProvider : IInstanceProvider
       4:     {
       5:  
       6:         public object GetInstance(InstanceContext instanceContext, Message message)
       7:         {
       8:             return PooledInstanceLocator.GetInstanceFromPool(instanceContext.Host.Description.ServiceType);
       9:         }
      10:  
      11:         public object GetInstance(InstanceContext instanceContext)
      12:         {
      13:             return this.GetInstance(instanceContext, null);
      14:         }
      15:  
      16:         public void ReleaseInstance(InstanceContext instanceContext, object instance)
      17:         {
      18:             PooledInstanceLocator.ReleaseInstanceToPool(instance as IPooledObject);
      19:         }
      20:     }
      21: }

    在GetInstance和ReleaseInstance,直接调用调用PooledInstanceLocator的GetInstanceFromPool和ReleaseInstanceToPool。

    V、为自定义InstanceProvider定义Behavior

    对于使用自定义InstanceProvider,我们一般可以通过Contract Behavior和Endpoint Behavior来实现,由于定义Behavior不是本篇文章的重点,在这里我仅仅通过Contract Behavior进行扩展这一种方式。

       1: namespace Artech.WCFExtensions
       2: {
       3:     public class PooledInstanceBehaviorAttribute : Attribute, IContractBehavior, IContractBehaviorAttribute
       4:     {
       5:         public void AddBindingParameters(ContractDescription contractDescription, ServiceEndpoint endpoint, BindingParameterCollection bindingParameters){ }
       6:         public void ApplyClientBehavior(ContractDescription contractDescription, ServiceEndpoint endpoint, ClientRuntime clientRuntime){ }
       7:         public void ApplyDispatchBehavior(ContractDescription contractDescription, ServiceEndpoint endpoint, DispatchRuntime dispatchRuntime)
       8:         {
       9:             dispatchRuntime.InstanceProvider = new PooledInstanceProvider();
      10:         }
      11:         public void Validate(ContractDescription contractDescription, ServiceEndpoint endpoint){ }
      12:         public Type TargetContract
      13:         {
      14:             get { return null; }
      15:         }
      16:     }
      17: } 

    再ApplyDispatchBehavior中将DispatchRuntime 的InstanceProvider 设置成我们定义的PooledInstanceProvider就可以了。

    三、将PooledInstanceProvider应用到WCF应用中

    现在我们就创建一个简单的WCF应用将看看我们自定义的InstanceProvider能给我们带来什么。我们照例创建如下图一样的4层结构:

    image

    I、Contract:Contracts.IService

       1: namespace Contracts
       2: {
       3:     [ServiceContract]
       4:     [PooledInstanceBehavior]
       5:     public interface IService : IPooledObject
       6:     {
       7:         [OperationContract(IsOneWay =true)]
       8:         void DoSomething();
       9:     }
      10: } 

    通过custom attribute的方式将PooledInstanceBehaviorAttribute应用到service conntract。Iservice继承我们定义的IPooledObject interface。

    II、Service:Services.Service

       1: namespace Services
       2: {
       3:     [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
       4:     public class Service : IService
       5:     {
       6:         static int count;
       7:         public Service()
       8:         {
       9:             Interlocked.Increment(ref count);
      10:             Console.WriteLine("{0}: Service instance is constructed!", count);
      11:         }
      12:         public void DoSomething(){ }
      13:         public bool IsBusy
      14:         { get; set; }
      15:     }
      16: } 

    我们应用PerCall InstanceContextMode。由于我们需要检测的是service instance的创建,所以我们通过下面的代码判断service instance创建的次数。

       1: public Service()
       2: {
       3:       Interlocked.Increment(ref count);
       4:       Console.WriteLine("{0}: Service instance is constructed!", count);
       5: } 

    III、Hosting

       1: namespace Hosting
       2: {
       3:     class Program
       4:     {
       5:         static Timer ScavengingTimer;
       6:  
       7:         static void Main(string[] args)
       8:         {
       9:             using (ServiceHost host = new ServiceHost(typeof(Service)))
      10:             {
      11:                 host.Opened += delegate
      12:                 {
      13:                     Console.WriteLine("Service has been started up!");
      14:                 }; 
      15:  
      16:                 host.Open(); 
      17:  
      18:                 ScavengingTimer = new Timer(delegate
      19:                     {
      20:                         PooledInstanceLocator.Scavenge();
      21:                     }, null, 0, 5000);                 
      22:  
      23:                 Console.Read(); 
      24:  
      25:             }
      26:         }
      27:     }
      28: } 
      29:  

    除了对service进行Host之外,Main()方法还通过一个Timer对象实现对对象池的清理工作(调用PooledInstanceLocator.Scavenge();),时间间隔是5s。
    下面是configuration:

       1: <?xml version="1.0" encoding="utf-8" ?>
       2: <configuration>    
       3:     <system.serviceModel>
       4:         <services>
       5:             <service name="Services.Service">
       6:                 <endpoint binding="basicHttpBinding" contract="Contracts.IService" />
       7:                 <host>
       8:                     <baseAddresses>
       9:                         <add baseAddress="http://127.0.0.1/service" />
      10:                     </baseAddresses>
      11:                 </host>
      12:             </service>
      13:         </services>
      14:     </system.serviceModel>
      15: </configuration> 

    IV、Client:Clients.

       1: namespace Clients
       2: {
       3:     class Program
       4:     {
       5:         static void Main(string[] args)
       6:         {
       7:             using (ChannelFactory<IService> channelFactory = new ChannelFactory<IService>("service"))
       8:             {
       9:                 for (int i = 1; i <= 10; i++)
      10:                 {
      11:                     Console.WriteLine("{0}: invocate service!", i);
      12:                     channelFactory.CreateChannel().DoSomething();
      13:                     Thread.Sleep(1000);
      14:                 }
      15:             } 
      16:  
      17:             Console.Read();
      18:         }
      19:     }
      20: } 

    在上面的代码中,我们通过for循环进行了10次service调用。每次间隔1s.我们看看运行的结果,这是client端的运行结果:

    image

    这是service端的结果:

    image

    可见service instance只创建了一次。因为方法执行太快,方法结束后service instance马上释放到对象池中,后续的调用一直使用的是同一个service instance。

    然后我们把IService 的PooledInstanceBehavior注释掉。

       1: namespace Contracts
       2: {
       3:     [ServiceContract]
       4:     //[PooledInstanceBehavior]
       5:     public interface IService : IPooledObject
       6:     {
       7:         [OperationContract(IsOneWay =true)]
       8:         void DoSomething();
       9:     }
      10: } 

    再次运行程序,service端将会得到下面的输出结果:

    image

    可见在没有运用PooledInstanceBehavior情况下,service instance的创建真正使“PerCall”。我们将PooledInstanceBehavior重新加上,然后通过在DoSomething方法中加上下面的代码延长该方法执行的时间:

       1: namespace Services
       2: {    
       3:     [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
       4:     public class Service:IService
       5:     { 
       6:         public void DoSomething()
       7:         {
       8:             Thread.Sleep(2000);
       9:         } 
      10:     }
      11: } 

    再次运行程序,service端的运行结果如下图所示

    image 
    由于我们将DoSomething方法的执行延长至2s,在这种情况下,由于client端的service掉用的间隔是1s,所有当第二次service调用抵达之后,第一次创建的service instance还没有被释放,所以需要重新创建新的service instance。当第三次service调用时,第一个service instance已经释放,以此类推,永远只需要两个service instance。这和上面的结果一致。

    上面的运行结果都是在GC没有进行垃圾回收的情况下的运行结果,如何GC参与了又会有怎样的行为表现呢?在Hosting中,我们通过另一个Timer定期地进行垃圾回收(间隔为500ms):

       1: namespace Hosting
       2: {
       3:     class Program
       4:     {
       5:         static Timer ScavengingTimer;
       6:         static Timer GCTimer;
       7:         static void Main(string[] args)
       8:         {
       9:             using (ServiceHost host = new ServiceHost(typeof(Service)))
      10:             {
      11:                 host.Opened += delegate
      12:                 {
      13:                     Console.WriteLine("Service has been started up!");
      14:                 }; 
      15:  
      16:                 host.Open(); 
      17:  
      18:                 ScavengingTimer = new Timer(delegate
      19:                     {
      20:                         PooledInstanceLocator.Scavenge();
      21:                     }, null, 0, 5000); 
      22:  
      23:                 GCTimer = new Timer(delegate
      24:                     {
      25:                         GC.Collect();
      26:                     }, null, 0, 500); 
      27:  
      28:                 Console.Read(); 
      29:  
      30:             }
      31:         }
      32:     }
      33: } 
      34:  

    然后我们将serivice的DoSomething()操作执行时间缩短(比client调用service的间隔短:500ms),使得操作执行完毕后,还没有新的请求抵达,这样GC会将其垃圾回收。

       1: namespace Services
       2: {    
       3:     [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
       4:     public class Service:IService
       5:     { 
       6:  
       7:         #region IService Members 
       8:  
       9:         public void DoSomething()
      10:         {
      11:             Thread.Sleep(500);
      12:         } 
      13:  
      14:         #endregion
      15:     }
      16: } 

    那么现在的输出结果将会是这样:

    image

    WCF后续之旅:

    WCF后续之旅(1): WCF是如何通过Binding进行通信的
    WCF后续之旅(2): 如何对Channel Layer进行扩展——创建自定义Channel
    WCF后续之旅(3): WCF Service Mode Layer 的中枢—Dispatcher
    WCF后续之旅(4):WCF Extension Point 概览
    WCF后续之旅(5): 通过WCF Extension实现Localization
    WCF后续之旅(6): 通过WCF Extension实现Context信息的传递
    WCF后续之旅(7):通过WCF Extension实现和Enterprise Library Unity Container的集成
    WCF后续之旅(8):通过WCF Extension 实现与MS Enterprise Library Policy Injection Application Block 的集成
    WCF后续之旅(9):通过WCF的双向通信实现Session管理[Part I]
    WCF后续之旅(9): 通过WCF双向通信实现Session管理[Part II]
    WCF后续之旅(10): 通过WCF Extension实现以对象池的方式创建Service Instance
    WCF后续之旅(11): 关于并发、回调的线程关联性(Thread Affinity)
    WCF后续之旅(12): 线程关联性(Thread Affinity)对WCF并发访问的影响
    WCF后续之旅(13): 创建一个简单的WCF SOAP Message拦截、转发工具[上篇]
    WCF后续之旅(13):创建一个简单的SOAP Message拦截、转发工具[下篇]
    WCF后续之旅(14):TCP端口共享
    WCF后续之旅(15): 逻辑地址和物理地址
    WCF后续之旅(16): 消息是如何分发到Endpoint的--消息筛选(Message Filter)
    WCF后续之旅(17):通过tcpTracer进行消息的路由

  • 相关阅读:
    nsight system
    unity 拿到管线权限的解决方案
    Erlang cowboy 入门参考
    [Erlang]Mnesia分布式应用
    erlang浅谈
    erlang 中带下划线变量的使用
    erlang 符号相关基本语法
    Erlang与ActionScript3采用JSON格式进行Socket通讯
    Erlang游戏开发-协议
    Erlang之IO编程
  • 原文地址:https://www.cnblogs.com/artech/p/1260594.html
Copyright © 2011-2022 走看看