zoukankan      html  css  js  c++  java
  • 坚持学习WF(14):自定义持久化服务

    [置顶]坚持学习WF文章索引

    我们除了使用WF提供的SqlWorkflowPersistenceService外,还可以自定义持久化服务。因为有的时候你可能不想使用Sql Server数据库,我们就可以通过自定义持久化服务来使用其他的数据库,文件等来进行持久化存储。

    一:1.1 我们先看一个MSDN中的例子,当从内存中卸载工作流时,工作流运行时可使用该服务将工作流实例状态保存到文件。该持久服务类代码如下FilePersistence.cs:
        public class FilePersistenceService : WorkflowPersistenceService
        {
            public readonly static TimeSpan MaxInterval = new TimeSpan(30, 0, 0, 0);

            private bool unloadOnIdle = false;
            private Dictionary<Guid,Timer> instanceTimers;

            public FilePersistenceService(bool unloadOnIdle)
            {
                this.unloadOnIdle = unloadOnIdle;
                this.instanceTimers = new Dictionary<Guid, Timer>();
            }

            protected override void SaveWorkflowInstanceState(Activity rootActivity, bool unlock)
            {
                // Save the workflow
                Guid contextGuid = (Guid)rootActivity.GetValue(Activity.ActivityContextGuidProperty);
                Console.WriteLine("Saving instance: {0}\n", contextGuid);
                SerializeToFile( WorkflowPersistenceService.GetDefaultSerializedForm(rootActivity), contextGuid);

                // See when the next timer (Delay activity) for this workflow will expire
                TimerEventSubscriptionCollection timers = (TimerEventSubscriptionCollection)rootActivity.GetValue(TimerEventSubscriptionCollection.TimerCollectionProperty);
                TimerEventSubscription subscription = timers.Peek();
                if (subscription != null)
                {
                    // Set a system timer to automatically reload this workflow when its next timer expires
                    TimerCallback callback = new TimerCallback(ReloadWorkflow);
                    TimeSpan timeDifference = subscription.ExpiresAt - DateTime.UtcNow;
                    // check to make sure timeDifference is in legal range
                    if (timeDifference > FilePersistenceService.MaxInterval)
                    {
                        timeDifference = FilePersistenceService.MaxInterval;
                    }
                    else if (timeDifference < TimeSpan.Zero)
                    {
                        timeDifference = TimeSpan.Zero;
                    }
                    this.instanceTimers.Add(contextGuid, new System.Threading.Timer(
                        callback,
                        subscription.WorkflowInstanceId,
                        timeDifference,
                        new TimeSpan(-1)));
                }
            }

            private void ReloadWorkflow(object id)
            {
                // Reload the workflow so that it will continue processing
                Timer toDispose;
                if (this.instanceTimers.TryGetValue((Guid)id, out toDispose))
                {
                    this.instanceTimers.Remove((Guid)id);
                    toDispose.Dispose();
                }
                this.Runtime.GetWorkflow((Guid)id);
            }

            // Load workflow instance state.
            protected override Activity LoadWorkflowInstanceState(Guid instanceId)
            {
                Console.WriteLine("Loading instance: {0}\n", instanceId);
                byte[] workflowBytes = DeserializeFromFile(instanceId);
                return WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, null);
            }

            // Unlock the workflow instance state.
            // Instance state locking is necessary when multiple runtimes share instance persistence store
            protected override void UnlockWorkflowInstanceState(Activity state)
            {
                //File locking is not supported in this sample
            }

            // Save the completed activity state.
            protected override void SaveCompletedContextActivity(Activity activity)
            {
                Guid contextGuid = (Guid)activity.GetValue(Activity.ActivityContextGuidProperty);
                Console.WriteLine("Saving completed activity context: {0}", contextGuid);
                SerializeToFile(
                    WorkflowPersistenceService.GetDefaultSerializedForm(activity), contextGuid);
            }

            // Load the completed activity state.
            protected override Activity LoadCompletedContextActivity(Guid activityId, Activity outerActivity)
            {
                Console.WriteLine("Loading completed activity context: {0}", activityId);
                byte[] workflowBytes = DeserializeFromFile(activityId);
                Activity deserializedActivities = WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, outerActivity);
                return deserializedActivities;

            }

            protected override bool UnloadOnIdle(Activity activity)
            {
                return unloadOnIdle;
            }
            // Serialize the activity instance state to file
            private void SerializeToFile(byte[] workflowBytes, Guid id)
            {
                String filename = id.ToString();
                FileStream fileStream = null;
                try
                {
                    if (File.Exists(filename))
                        File.Delete(filename);

                    fileStream = new FileStream(filename, FileMode.CreateNew, FileAccess.Write, FileShare.None);

                    // Get the serialized form
                    fileStream.Write(workflowBytes, 0, workflowBytes.Length);
                }
                finally
                {
                    if (fileStream != null)
                        fileStream.Close();
                }
            }
            // Deserialize the instance state from the file given the instance id
            private byte[] DeserializeFromFile(Guid id)
            {
                String filename = id.ToString();
                FileStream fileStream = null;
                try
                {
                    // File opened for shared reads but no writes by anyone
                    fileStream = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read);
                    fileStream.Seek(0, SeekOrigin.Begin);
                    byte[] workflowBytes = new byte[fileStream.Length];

                    // Get the serialized form
                    fileStream.Read(workflowBytes, 0, workflowBytes.Length);

                    return workflowBytes;
                }
                finally
                {
                    fileStream.Close();
                }
            }
        }

    1.2 看看我们的工作流设计,只需要放入一个DelayActivity即可并设置他的Timeout时间,如下图:

    CustomPersistenceService2

    1.3 宿主程序加载了我们自定义的持久化服务后,执行结果如下:

    CustomPersistenceService1

    二:上面的例子其实很简单,我们只是做了一些基本的操作,还有很多工作没有做。我们就来说说如何自定义持久化服务。自定义持久化服务大概有以下几步:

    1.定义自己的持久化类FilePersistenceService ,必须继承自WorkflowPersistenceService.
    2.实现WorkflowPersistenceService中所有的抽象方法,下面会具体介绍。
    3.把自定义的持久化服务装载到工作流引擎中(和装载WF提供的标准服务的方式一样)。

    下面我们来介绍下WorkflowPersistenceService类中相关的抽象方法:

    2.1 SaveWorkflowInstanceState:将工作流实例状态保存到数据存储区。

    protected internal abstract void SaveWorkflowInstanceState(Activity rootActivity,bool unlock)
    rootActivity:工作流实例的根 Activity。
    unlock:如果工作流实例不应锁定,则为 true;如果工作流实例应该锁定,则为 false。 关于锁方面的我们暂时不提。
     
    在这个方法中我们会把rootActivity 序列化到 Stream 中,可以看我们上面的例子中的代码实现的该方法中有这样一段
     
    SerializeToFile( WorkflowPersistenceService.GetDefaultSerializedForm(rootActivity), contextGuid);
    private void SerializeToFile(byte[] workflowBytes, Guid id)
     
    SerializeToFile方法的第一个参数是需要一个byte[]的数组,我们是使用WorkflowPersistenceService.GetDefaultSerializedForm
    (rootActivity)来实现的,那我们Reflector一下WorkflowPersistenceService这个类中的GetDefaultSerializedForm
    方法,代码如下:
     protected static byte[] GetDefaultSerializedForm(Activity activity)
        {
            DateTime now = DateTime.Now;
            using (MemoryStream stream = new MemoryStream(0x2800))
            {
                stream.Position = 0L;
                activity.Save(stream);
                using (MemoryStream stream2 = new MemoryStream((int) stream.Length))
                {
                    using (GZipStream stream3 = new GZipStream(stream2, CompressionMode.Compress,
    true)) { stream3.Write(stream.GetBuffer(), 0, (int) stream.Length); } ActivityExecutionContextInfo info = (ActivityExecutionContextInfo)
    activity.GetValue(Activity.ActivityExecutionContextInfoProperty); TimeSpan span = (TimeSpan) (DateTime.Now - now); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Serialized a
    {0} with id {1} to length {2}. Took {3}.
    ", new object[] { info, info.ContextGuid,
    stream2.Length, span }); byte[] array = stream2.GetBuffer(); Array.Resize<byte>(ref array, Convert.ToInt32(stream2.Length)); return array; } } }

    可以看出主要是调用了activity.Save(stream); ,将rootActivity 序列化到 Stream 中,如果我们自定义这个方法我们要调用 Activity的save方法将Activity序列化到stream中去,我们在实现LoadWorkflowInstanceState方法时会调用Activity的Load方法来读取,另外我们把工作流保存到持久化存储里我们一般都使用WorkflowInstanceId来做为唯一性标识

    当工作流实例完成或终止时,工作流运行时引擎最后一次调用 SaveWorkflowInstanceState。因此,如果 GetWorkflowStatus等于 Completed或 Terminated,则可以从数据存储区中安全地删除工作流实例及其所有关联的已完成作用域。此外,可以订阅 WorkflowCompleted或 WorkflowTerminated事件,确定何时可以安全地删除与工作流实例关联的记录。是否确实从数据存储区中删除记录取决于您的实现。

    如果无法将工作流实例状态保存到数据存储区,则应引发带有适当错误消息的 PersistenceException。

     2.2 LoadWorkflowInstanceState :SaveWorkflowInstanceState中保存的工作流实例的指定状态加载回内存

    protected internal abstract Activity LoadWorkflowInstanceState(Guid instanceId)
    instanceId:工作流实例的根活动的 Guid。

    必须还原活动的相同副本。为此,必须从数据存储区中工作流实例的表示形式中还原有效的 Stream;然后,必须将此 Stream 传递到重载的 Load 方法之一,用于反序列化工作流实例。如果持久性服务无法从其数据存储区加载工作流实例状态,则它应引发带有适当消息的 PersistenceException。

    在我们上面例子中实现的该方法中,

    protected override Activity LoadWorkflowInstanceState(Guid instanceId) 
    { 
        Console.WriteLine("Loading instance: {0}\n", instanceId); 
        byte[] workflowBytes = DeserializeFromFile(instanceId); 
        return WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, null); 
    }

    我们调用了WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, null);方法
    我们Reflector出WorkflowPersistenceService类的代码后可以看到,如下代码:

    protected static Activity RestoreFromDefaultSerializedForm(byte[] activityBytes, Activity outerActivity)
        {
            Activity activity;
            DateTime now = DateTime.Now;
            MemoryStream stream = new MemoryStream(activityBytes);
            stream.Position = 0L;
            using (GZipStream stream2 = new GZipStream(stream, CompressionMode.Decompress, true))
            {
                activity = Activity.Load(stream2, outerActivity);
            }
            TimeSpan span = (TimeSpan) (DateTime.Now - now);
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Deserialized a {0}
                                                        to length {1}. Took {2}.
    ", new object[] { activity, stream.Length, span });
            return activity;
        }

    这段代码核心的调用了Activity.Load方法将从 Stream 加载 Activity的实例。
    2.3 SaveCompletedContextActivity
    protected internal abstract void SaveCompletedContextActivity(Activity activity)
    activity:表示已完成范围的 Activity。
     

    将指定的已完成作用域保存到数据存储区。保存完成活动的AEC环境以便实现补偿,比如WhileActivity他每次循环的
    都会创建新的AEC环境,这个时候完成的活动的AEC就会被保存,但是前提是这个活动要支持补偿才可以,所有如果你的WhileActivity里包含SequenceActivity这样该方法是不会被调用的,如果你换成CompensatableSequenceActivity就可以了

    工作流运行时引擎保存已完成作用域活动的状态,以便实现补偿。必须调用重载的 Save 方法之一,将 activity 序列化到 Stream 中;然后可以选择在将 Stream 写入到数据存储区之前,对其执行其他处理。但是,在工作流运行时引擎调用 LoadCompletedContextActivity时,必须还原活动的相同副本。

    本例子的程序中不会涉及到这部分

    也同样使用了WorkflowPersistenceService的GetDefaultSerializedForm方法

    2.4 LoadCompletedContextActivity 

    和SaveCompletedContextActivity是对应的,加载SaveCompletedContextActivity中保存的已完成活动的AEC,就不多说了

    2.5 UnlockWorkflowInstanceState:解除对工作流实例状态的锁定。

    此方法是抽象的,因此它不包含对锁定和解锁的默认实现。

    实现自定义持久性服务时,如果要实现锁定方案,则需要重写此方法,并在 SaveWorkflowInstanceState方法中提供根据解锁参数的值进行锁定-解锁的机制。

    比如我们的工作流在取消的时候,这个方法被调用来解除工作流实例状态的锁定。

    2.6 UnloadOnIdle

    返回一个布尔值,确定在工作流空闲时是否将其卸载。

    这些都只是自定义持久化中最基本的,就先说这些吧。

    作者:生鱼片
             
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    Saltstack
    搭建中小规模集群之rsync数据同步备份
    Python开发【第七篇】:面向对象二
    批量管理
    inotify
    Python开发【第六篇】:面向对象
    网络文件系统NFS
    Linux基础介绍【第九篇】
    Linux基础介绍【第八篇】
    Linux基础介绍【第七篇】
  • 原文地址:https://www.cnblogs.com/carysun/p/WFCustomPersistenceService.html
Copyright © 2011-2022 走看看