zoukankan      html  css  js  c++  java
  • 缓存方案 通过SqlDependency实现Cache和Database的同步

    对于一个真正的企业级的应用来说,Caching肯定是一个不得不考虑的因素,合理、有效地利用Caching对于增强应用的Performance(减少对基于Persistent storage的IO操作)、Scalability(将数据进行缓存,减轻了对Database等资源的压力)和Availability(将数据进行缓存,可以应对一定时间内的网络问题、Web Service不可访问问题、Database的崩溃问题等等)。Enterprise Library的Caching Application Block为我们提供了一个易用的、可扩展的实现Caching的框架。借助于Caching Application Block,Administrator和Developer很容易实现基于Caching的管理和编程。由于Caching的本质在于将相对稳定的数据常驻内存,以避免对Persistent storage的IO操作的IO操作,所以有两个棘手的问题:Load Balance问题;Persistent storage和内存中数据同步的问题。本篇文章提供了一个解决方案通过SqlDependency实现SQL Server中的数据和Cache同步的问题。

    一、Cache Item的过期策略

    在默认的情况下,通过CAB(以下对Caching Application Block的简称,注意不是Composite UI Application Block )的CacheManager加入的cache item是永不过期的;但是CacheManager允许你在添加cache item的时候通过一个ICacheItemExpiration对象应用不同的过期策略。CAB定了一个以下一些class实现了ICacheItemExpiration,以提供不同的过期方式:

    • AbsoluteTime:为cache item设置一个cache item的绝对过期时间。
    • ExtendedFormatTime:通过一个表达式实现这样的过期策略:每分钟过期(*****:5个*分别代表minute、hour、date、month、year);每个小时的第5分钟过期(5****);每个月的2号零点零分过期(0 0 2 * *)。
    • FileDependency:将cache item和一个file进行绑定,通过检测file的最后更新时间确定file自cache item被添加以来是否进行过更新。如果file已经更新则cache item过期。
    • NeverExpired:永不过期。
    • SlidingTime:一个滑动的时间,cache item的每次获取都将生命周期延长到设定的时间端,当cache item最后一次获取的时间算起,超出设定的时间,则cache item过期。

    对于过期的cache item,会及时地被清理。所以要实现我们开篇提出的要求:实现Sql Server中的数据和Cache中的数据实现同步,我们可以通过创建基于Sql Server数据变化的cache item的过期策略。换句话说,和FileDependency,当Persistent storage(Database)的数据变化本检测到之后,对于得cache自动过期。但是,对于文件的修改和删除,我们和容易通过文件的最后更新日期或者是否存在来确定。对于Database中Table数据的变化的探测就不是那么简单了。不过SQL Server提供了一个SqlDependency的组建帮助我们很容易地实现了这样的功能。

    二、创建基于SqlDependency的ICacheItemExpiration

    SqlDependency是建立在SQL Server 2005的Service Broker之上。SqlDependency向SQL Server订阅一个Query Notification。当SQL Server检测到基于该Query的数据发生变化,向SqlDependency发送一个Notification,并触发SqlDependency的Changed事件,我们就可以通过改事件判断对应的cache item是否应该过期。

    我们现在就来创建这样的一个ICacheItemExpiration。我们先看看ICacheItemExpiration的的定义:

       1: public interface ICacheItemExpiration
       2: {
       3:     // Methods
       4:     bool HasExpired();
       5:     void Initialize(CacheItem owningCacheItem);
       6:     void Notify();
       7: } 

    而判断过期的依据就是根据HasExpired方法,我们自定义的CacheItemExpiration就是实现了该方法,根据SqlDependency判断cache item是否过期。下面是SqlDependencyExpiration的定义(注:SqlDependencyExpiration的实现通过Enterprise Library DAAB实现DA操作):

       1: namespace Artech.SqlDependencyCaching
       2: {
       3:     public class SqlDependencyExpiration : ICacheItemExpiration
       4:     {
       5:         private static readonly CommandType DefaultComamndType = CommandType.StoredProcedure; 
       6:  
       7:         public event EventHandler Expired; 
       8:  
       9:         public bool HasChanged
      10:         { get; set; } 
      11:  
      12:         public string ConnectionName
      13:         { get; set; } 
      14:  
      15:         public SqlDependencyExpiration(string commandText, IDictionary<string, object> parameters) :
      16:             this(commandText, DefaultComamndType, string.Empty, parameters)
      17:         { } 
      18:  
      19:         public SqlDependencyExpiration(string commandText, string connectionStringName, IDictionary<string, object> parameters) :
      20:             this(commandText, DefaultComamndType, connectionStringName, parameters)
      21:         { } 
      22:  
      23:         public SqlDependencyExpiration(string commandText, CommandType commandType, IDictionary<string, object> parameters) :
      24:             this(commandText, commandType, string.Empty, parameters)
      25:         { } 
      26:  
      27:         public SqlDependencyExpiration(string commandText, CommandType commandType, string connectionStringName, IDictionary<string, object> parameters)
      28:         {
      29:             if (string.IsNullOrEmpty(connectionStringName))
      30:             {
      31:                 this.ConnectionName = DatabaseSettings.GetDatabaseSettings(ConfigurationSourceFactory.Create()).DefaultDatabase;
      32:             }
      33:             else
      34:             {
      35:                 this.ConnectionName = connectionStringName;
      36:             } 
      37:  
      38:             SqlDependency.Start(ConfigurationManager.ConnectionStrings[this.ConnectionName].ConnectionString);
      39:             using (SqlConnection sqlConnection = DatabaseFactory.CreateDatabase(this.ConnectionName).CreateConnection() as SqlConnection)
      40:             {
      41:                 SqlCommand command = new SqlCommand(commandText, sqlConnection);
      42:                 command.CommandType = commandType;
      43:                 if (parameters != null)
      44:                 {
      45:                     this.AddParameters(command, parameters);
      46:                 }
      47:              SqlDependency dependency = new SqlDependency(command);
      48:                 dependency.OnChange += delegate
      49:                 {
      50:                     this.HasChanged = true;
      51:                     if (this.Expired != null)
      52:                     {
      53:                         this.Expired(this, new EventArgs());
      54:                     }
      55:                 };
      56:                 if (sqlConnection.State != ConnectionState.Open)
      57:                 {
      58:                     sqlConnection.Open();
      59:                 }
      60:                 command.ExecuteNonQuery();
      61:             }
      62:         } 
      63:  
      64:         private void AddParameters(SqlCommand command, IDictionary<string, object> parameters)
      65:         {
      66:             command.Parameters.Clear();
      67:             foreach (var parameter in parameters)
      68:             {
      69:                 string parameterName = parameter.Key;
      70:                 if (!parameter.Key.StartsWith("@"))
      71:                 {
      72:                     parameterName = "@" + parameterName;
      73:                 } 
      74:  
      75:                 command.Parameters.Add(new SqlParameter(parameterName, parameter.Value));
      76:             }
      77:         } 
      78:  
      79:         #region ICacheItemExpiration Members 
      80:  
      81:         public bool HasExpired()
      82:         {
      83:             bool indicator = this.HasChanged;
      84:             this.HasChanged = false;
      85:             return indicator;
      86:         } 
      87:  
      88:         public void Initialize(CacheItem owningCacheItem)
      89:         {         } 
      90:  
      91:         public void Notify()
      92:         {         } 
      93:  
      94:         #endregion
      95:     }
      96: } 
      97:  

    我们来简单分析一下实现过程,先看看Property定义:

       1: private static readonly CommandType DefaultComamndType = CommandType.StoredProcedure; 
       2:  
       3: public event EventHandler Expired; 
       4:  
       5: public bool HasChanged
       6: { get; set; } 
       7:  
       8: public string ConnectionName
       9: { get; set; } 
      10:  

    通过DefaultComamndType 定义了默认的CommandType,在这了我默认使用Stored Procedure;Expired event将在cache item过期时触发;HasChanged代表Database的数据是否被更新,将作为cache过期的依据;ConnectionName代表的是Connection string的名称。

    为了使用上的方便,我定义了4个重载的构造函数,最后的实现定义在public SqlDependencyExpiration(string commandText, CommandType commandType, string connectionStringName, IDictionary<string, object> parameters)。parameters代表commandText的参数列表,key为参数名称,value为参数的值。首先获得真正的connection string name(如果参数connectionStringName为空,就使用DAAB默认的connection string)

       1: if (string.IsNullOrEmpty(connectionStringName))
       2: {
       3:      this.ConnectionName = DatabaseSettings.GetDatabaseSettings(ConfigurationSourceFactory.Create()).DefaultDatabase;
       4: }
       5: else
       6: {
       7:      this.ConnectionName = connectionStringName;
       8: } 

    然后通过调用SqlDependency.Start()方法,并传入connection string作为参数。该方法将创建一个Listener用于监听connection string代表的database instance发送过来的query notifucation。

    SqlDependency.Start(ConfigurationManager.ConnectionStrings[this.ConnectionName].ConnectionString);

    然后创建SqlConnection,并根据CommandText和CommandType参数创建SqlCommand对象,并将参数加入到command的参数列表中。最后将这个SqlCommand对象作为参数创建SqlDependency 对象,并注册该对象的OnChange 事件(对HasChanged 赋值;并触发Expired事件)。这样当我们执行该Cmmand之后,当基于commandtext的select sql语句获取的数据在database中发生变化(添加、更新和删除),SqlDependency 的OnChange 将会触发

       1: SqlDependency dependency = new SqlDependency(command);
       2: dependency.OnChange += delegate
       3: {
       4:       this.HasChanged = true;
       5:        if (this.Expired != null)
       6:        {
       7:               this.Expired(this, new EventArgs());
       8:  
       9:        }
      10:          
      11: };
      12:  

    这样在HasExpired方法中,就可以根据HasChanged 属性判断cache item是否应该过期了。

       1: public bool HasExpired()
       2: {
       3:      bool indicator = this.HasChanged;
       4:      this.HasChanged = false;
       5:      return indicator;
       6: } 

    三、如何应用SqlDependencyExpiration

    我们现在创建一个简单的Windows Application来模拟使用我们创建的SqlDependencyExpiration。我们模拟一个简单的场景:假设我们有一个功能需要向系统所有的user发送通知,而且不同的user,通知是不一样的,由于通知的更新的频率不是很高,我们需要讲某个User的通知进行缓存。

    这是我们的表结构:Messages

    image

    我们通过下面的SP来获取基于某个User 的Message:

       1: ALTER PROCEDURE [dbo].[Message_Select_By_User]
       2: (@UserID    VarChar(50))
       3: AS
       4: BEGIN    
       5:     Select ID, UserID, [Message] From dbo.Messages Where UserID = @UserID
       6: END

    注:如何写成Select * From dbo.Messages Where UserID = @UserID, SqlDependency 将不能正常运行;同时Table的schema(dbo)也是必须的。

    我们设计如下的界面来模拟:通过Add按钮,可以为选择的User创建新的Message,而下面的List将显示基于某个User(Foo)的Message List。该列表的获取方式基于Lazy Loading的方式,如果在Cache中,则直接从Cache中获取,否则从Db中获取,并将获取的数据加入cache。

    image

    我们先定义了3个常量,分别表示:缓存message针对的User,获取Message list的stored procedure名称和Cache item的key。

       1: private const string UserName = "Foo";
       2: private const string MessageCachingProcedure = "Message_Select_By_User";
       3: private const string CacheKey = "__MessageOfFoo"; 

    我们通过一个Property来创建或获取我们的上面定义的SqlDependencyExpiration 对象

       1: private SqlDependencyExpiration CacheItemExpiration
       2: {
       3:     get
       4:     {
       5:         IDictionary<string, object> parameters = new Dictionary<string, object>();
       6:         parameters.Add("UserID", UserName);
       7:         SqlDependencyExpiration expiration= new SqlDependencyExpiration(MessageCachingProcedure, parameters);
       8:         expiration.Expired += delegate
       9:         {
      10:             MessageBox.Show("Cache has expired!");
      11:         }; 
      12:  
      13:         return expiration;
      14:     }
      15: } 
      16:  

    通过GetMessageByUser从数据库中获取基于某个User的Message List(使用了DAAB):

       1: private List<string> GetMessageByUser(string userName)
       2: {
       3:     List<string> messageList = new List<string>();
       4:     Database db = DatabaseFactory.CreateDatabase();
       5:     DbCommand command = db.GetStoredProcCommand(MessageCachingProcedure);
       6:     db.AddInParameter(command, "UserID", DbType.String, userName);
       7:     IDataReader reader = db.ExecuteReader(command);
       8:     while (reader.Read())
       9:     {
      10:         messageList.Add(reader.GetString(2));
      11:     } 
      12:  
      13:     return messageList;
      14: } 

    通过GetMessages获取User(Foo)的Message List:首先通过CacheManager检测message list是否存在于Cache,如何不存在,调用上面的GetMessageByUser方法从database中获取Foo的message list。并将其加入Cache中,需要注意的是这里使用到了我们的SqlDependencyExpiration 对象。

       1: private List<string> GetMessages()
       2: {
       3:     ICacheManager manager = CacheFactory.GetCacheManager();
       4:     if (manager.GetData(CacheKey) == null)
       5:     { 
       6:         manager.Add(CacheKey, GetMessageByUser(UserName), CacheItemPriority.Normal, null, this.CacheItemExpiration);
       7:     } 
       8:  
       9:     return manager.GetData(CacheKey) as List<string>;
      10: } 

    由于在我们的例子中需要对DB进行数据操作,来检测数据的变换是否应用Cache的过期,我们需要想数据库中添加Message。我们通过下面的方式现在message的添加。

       1: private void CreateMessageEntry(string userName, string message)
       2: {
       3:     Database db = DatabaseFactory.CreateDatabase();
       4:     string insertSql = "INSERT INTO [dbo].[Messages]([UserID],[Message])VALUES(@userID, @message)";
       5:     DbCommand command = db.GetSqlStringCommand(insertSql);
       6:     db.AddInParameter(command, "userID", DbType.String, userName);
       7:     db.AddInParameter(command, "message", DbType.String, message);
       8:     db.ExecuteNonQuery(command);
       9: } 

    我们的Add按钮的实现如下:基于我们选择的Username和输入的message的内容向DB中添加Message,然后调用GetMessages()方法获取基于用户Foo的Message列表。之所以要在两者之间将线程休眠1s,是为了上SqlDependency有足够的时间结果从Database传过来的Query Notification,并触发OnChanged事件并执行相应的Event Handler,这样调用GetMessages时检测Cache才能检测到cache item已经过期了。

       1: private void buttonAdd_Click(object sender, EventArgs e)
       2: {
       3:     this.CreateMessageEntry(this.comboBoxUserName.SelectedValue.ToString(), this.textBoxMessage.Text.Trim());
       4:     Thread.Sleep(1000);
       5:     this.listBoxMessage.DataSource = this.GetMessages();
       6: } 

    由于我们缓存了用户Foo的Message list,所以当我们为Foo创建Message的时候,下面的ListBox的列表能够及时更新,这表明我们的cache item已经过期了。而我们为其他的用户(Bar,Baz)创建Message的时候,cache item将不会过期,这一点我们可以通过弹出的MessageBox探测掉(expiration.Expired += delegate           MessageBox.Show("Cache has expired!");}; ),只有前者才会弹出下面的MessageBox:

    image

    注:由于SqlDependency建立在Service Broker之上的,所以我们必须将service Broker开关打开(默认使关闭的)。否则我们将出现下面的错误:

    image

    打开service Broker可以通过如下的T-SQL:

       1: ALTER DATABASE MyDb SET ENABLE_BROKER ;
  • 相关阅读:
    Leetcode Binary Tree Preorder Traversal
    Leetcode Minimum Depth of Binary Tree
    Leetcode 148. Sort List
    Leetcode 61. Rotate List
    Leetcode 86. Partition List
    Leetcode 21. Merge Two Sorted Lists
    Leetcode 143. Reorder List
    J2EE项目应用开发过程中的易错点
    JNDI初认识
    奔腾的代码
  • 原文地址:https://www.cnblogs.com/lhxsoft/p/6671969.html
Copyright © 2011-2022 走看看