zoukankan      html  css  js  c++  java
  • 坚持学习WF(19):工作流线程

    [置顶]坚持学习WF文章索引

    WF中提供了很多内置的服务,其中工作流计划服务是用来管理工作流实例线程的。默认情况下WF会自动使用DefaultWorkflowSchedulerService服务。使用该服务时在工作流运行时引擎上以异步方式运行工作流实例的线程。 等待运行的工作流存储在 DefaultWorkflowSchedulerService 的内部队列中.当 DefaultWorkflowSchedulerService 要启动工作流时,从 .NET Framework 线程池中获取一个线程并使用此线程运行工作流。 MaxSimultaneousWorkflows 属性确定调度程序服务同一时间允许的并发线程数。 例如,如果限值为 4,则 DefaultWorkflowSchedulerService 将从 .NET Framework 线程池中最多获取 4 个线程来执行工作流。 如果已经运行 4 个工作流,则其他工作项(工作流)将放入队列中,最终在线程可用时执行。

    除了使用WF默认提供的,我们还可以手动去加载ManualWorkflowSchedulerService服务。它提供了一个线程服务,该服务允许创建工作流实例的宿主应用程序指定用于运行工作流实例的 Thread。 使用此线程服务,宿主应用程序可在单个 Thread 上运行一个工作流实例(即处于同步模式)。 此模式将阻止宿主应用程序的执行,直到工作流实例进入空闲状态。 随后,只能通过使用此服务的 RunWorkflow 方法执行工作流实例。

    下面我们通过MSDN中的一个例子来说明工作流中的线程以及如何使用DefaultWorkflowSchedulerService和ManualWorkflowSchedulerService服务。

    一:在该实例中首先开发一个自定义活动WaitForMessageActivity.cs,代码如下:

    using System;
    using System.ComponentModel;
    using System.Threading;
    using System.Workflow.Activities;
    using System.Workflow.ComponentModel;
    using System.Workflow.ComponentModel.Design;
    using System.Workflow.Runtime;
    namespace Microsoft.Samples.Workflow.WorkflowThreading
    {
        [ToolboxItem(typeof(ActivityToolboxItem))]
        public partial class WaitForMessageActivity: Activity
       
    {
            WorkflowQueue workflowQueue;

            public WaitForMessageActivity()
            {
                InitializeComponent();
            }

            protected override void Initialize(IServiceProvider provider)
            {
                ThreadMonitor.WriteToConsole(Thread.CurrentThread, "WaitForMessageActivity",
                                             "WaitForMessageActivity执行构造函数");
                WorkflowQueuingService queuingService = (WorkflowQueuingService)provider.
                                             GetService(typeof(WorkflowQueuingService));
                this.workflowQueue = queuingService.CreateWorkflowQueue(
                                             "WaitForMessageActivityQueue", false);
                this.workflowQueue.QueueItemAvailable += this.HandleExternalEvent;
            }

            private void HandleExternalEvent(Object sender, QueueEventArgs args)
            {
                ThreadMonitor.WriteToConsole(Thread.CurrentThread, "WaitForMessageActivity",
                                             "WaitForMessageActivity外部事件处理程序");
                object data = this.workflowQueue.Dequeue();
                ActivityExecutionContext context = sender as ActivityExecutionContext;
                context.CloseActivity();
            }

            protected override ActivityExecutionStatus Execute(ActivityExecutionContext context)
            {
                ThreadMonitor.WriteToConsole(Thread.CurrentThread, "WaitForMessageActivity",
                                             "WaitForMessageActivity等待外部事件处理程序");
                return ActivityExecutionStatus.Executing;
            }
        }
    }

    二:工作流设计如下图:
     
    WFThread1 
     
    DelayActivity对工作流的影响我们在前面的文章中有过说明,我们就不管这个了。工作流的代码如下:
     
    using System;
    using System.ComponentModel;
    using System.Workflow.Activities;
    using System.Threading;
    namespace Microsoft.Samples.Workflow.WorkflowThreading
    {
        public sealed partial class ThreadingWorkflow : SequentialWorkflowActivity
        {
            private string branchFlag;
    
            public ThreadingWorkflow()
            {
                InitializeComponent();
                ThreadMonitor.WriteToConsole(Thread.CurrentThread, "ThreadingWorkflow", 
    "ThreadingWorkflow: 执行构造函数"); } public string BranchFlag { get { return this.branchFlag;} set { this.branchFlag = value;} } private void OnCodeActivity1ExecuteCode(object sender, EventArgs e) { ThreadMonitor.WriteToConsole(Thread.CurrentThread, "ThreadingWorkflow",
    "CodeActivity1的事件处理程序"); } private void OnCodeActivity2ExecuteCode(object sender, EventArgs e) { ThreadMonitor.WriteToConsole(Thread.CurrentThread, "ThreadingWorkflow",
    "CodeActivity2的事件处理程序"); } private void OnCodeActivity3ExecuteCode(object sender, EventArgs e) { ThreadMonitor.WriteToConsole(Thread.CurrentThread, "ThreadingWorkflow",
    "CodeActivity3的事件处理程序"); } private void IfElseBranchActivityCodeCondition(object sender, ConditionalEventArgs e) { e.Result = !this.BranchFlag.Equals("Delay", StringComparison.OrdinalIgnoreCase); } } }
    三:宿主程序代码如下:
     
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Workflow.Runtime;
    using System.Workflow.Runtime.Hosting;
    using System.Drawing;
    namespace Microsoft.Samples.Workflow.WorkflowThreading
    {
        class Program
        {
            static AutoResetEvent waitHandle = new AutoResetEvent(false);
            static AutoResetEvent readyHandle = new AutoResetEvent(false);
            static WorkflowInstance workflowInstance;
            static WorkflowRuntime workflowRuntime;
    
            static void Main(string[] args)
            {
                if (args.Length < 2)
                {
                    Console.WriteLine("使用该形式命令行WorkflowThreading.exe [Single | Multi] 
    [Delay | WaitForMessage]"
    ); return; } if (!args[0].Equals("Single", StringComparison.OrdinalIgnoreCase) &&
    !args[0].Equals("Multi", StringComparison.OrdinalIgnoreCase)) { Console.WriteLine("指定Single 或者Multi 作为第一个命令行参数"); return; } if (!args[1].Equals("Delay", StringComparison.OrdinalIgnoreCase) &&
    !args[1].Equals("WaitForMessage", StringComparison.OrdinalIgnoreCase)) { Console.WriteLine("指定Delay 或者WaitForMessage 作为第二个命令行参数"); return; } ThreadMonitor.Enlist(Thread.CurrentThread, "主机"); Console.ForegroundColor = ConsoleColor.White; using (workflowRuntime = new WorkflowRuntime()) { ManualWorkflowSchedulerService scheduler = null; if (args[0].ToString().Equals("Single", StringComparison.OrdinalIgnoreCase)) { scheduler = new ManualWorkflowSchedulerService(); workflowRuntime.AddService(scheduler); } workflowRuntime.StartRuntime(); workflowRuntime.WorkflowCompleted += OnWorkflowCompleted; workflowRuntime.WorkflowTerminated += OnWorkflowTerminated; workflowRuntime.WorkflowIdled += OnWorkflowIdled; workflowRuntime.WorkflowCreated += OnWorkflowCreated; Type type = typeof(ThreadingWorkflow); Dictionary<string, object> workflowParameters = new Dictionary<string, object>(); workflowParameters.Add("BranchFlag", args[1]); Console.WriteLine("\n--- 开始工作流之前---\n"); workflowInstance = workflowRuntime.CreateWorkflow(type, workflowParameters); workflowInstance.Start(); Console.WriteLine("\n--- 开始工作流之后---\n"); if (scheduler != null) scheduler.RunWorkflow(workflowInstance.InstanceId); readyHandle.WaitOne(); if (args[1].Equals("WaitForMessage", StringComparison.OrdinalIgnoreCase)) { workflowInstance.EnqueueItem("WaitForMessageActivityQueue", "Hello",
    null, null); } if (scheduler != null) scheduler.RunWorkflow(workflowInstance.InstanceId); waitHandle.WaitOne(); workflowRuntime.StopRuntime(); } } static void OnWorkflowCreated(object sender, WorkflowEventArgs e) { ThreadMonitor.WriteToConsole(Thread.CurrentThread, "Host",
    "Host: Processed WorkflowCreated Event"); } static void OnWorkflowIdled(object sender, WorkflowEventArgs e) { if (workflowRuntime.GetService<ManualWorkflowSchedulerService>() != null) SetReloadWorkflowTimer(); else readyHandle.Set(); ThreadMonitor.WriteToConsole(Thread.CurrentThread, "Host",
    "Host: Processed WorkflowIdle Event"); Console.WriteLine("\n--- 工作流空闲---\n"); } static void SetReloadWorkflowTimer() { DateTime reloadTime = workflowInstance.GetWorkflowNextTimerExpiration(); if (reloadTime == DateTime.MaxValue) { readyHandle.Set(); } else { TimeSpan timeDifference = reloadTime - DateTime.UtcNow + new TimeSpan(0, 0, 0, 0, 1); Timer timer = new System.Threading.Timer( new TimerCallback(ReloadWorkflow), null,timeDifference < TimeSpan.Zero ? TimeSpan.Zero : timeDifference, new TimeSpan(-1)); } } static void ReloadWorkflow(object state) { if (workflowInstance.GetWorkflowNextTimerExpiration() > DateTime.UtcNow) SetReloadWorkflowTimer(); else readyHandle.Set(); } static void OnWorkflowCompleted(object sender, WorkflowCompletedEventArgs e) { ThreadMonitor.WriteToConsole(Thread.CurrentThread, "Host",
    "Host: Processed WorkflowCompleted Event"); waitHandle.Set(); Console.WriteLine("\n--- 工作流完成---\n"); } static void OnWorkflowTerminated(object sender, WorkflowTerminatedEventArgs e) { Console.WriteLine(e.Exception.Message); waitHandle.Set(); } } }
     
    在宿主程序我们根据命令行参数的不同来决定是否加载ManualWorkflowSchedulerService ,如果加载了该服务我们
    调用该类的RunWorkflow方法来运行工作流实例。
     
    四:ThreadMonitor类的用途是用不同的颜色为每个线程的输出着色。代码如下:
     
    using System;
    using System.Threading;
    using System.Collections.Generic;
    using System.Text;
    namespace Microsoft.Samples.Workflow.WorkflowThreading
    {
        public static class ThreadMonitor
        {
            private static int threadCount;
            private static Dictionary<string, ConsoleColor> threadList = new 
    Dictionary<string, ConsoleColor>(); public static void Enlist(Thread thread, string instanceName) { if ((thread.Name == null) || (thread.Name.Length == 0)) { thread.Name = string.Format("{0} 线程[{1}]", instanceName, threadCount++); if (!threadList.ContainsKey(thread.Name)) threadList.Add(thread.Name, GetConsoleColor()); } } public static void WriteToConsole(Thread thread, string instanceName, string message) { Enlist(thread, instanceName); WriteToConsole(thread, message); } public static void WriteToConsole(Thread thread, string message) { if (threadList.ContainsKey(thread.Name)) Console.ForegroundColor = threadList[thread.Name]; Console.WriteLine("{0} --> {1}", thread.Name, message); } private static ConsoleColor GetConsoleColor() { if ((int)Console.ForegroundColor < 9) return ConsoleColor.White; else return (ConsoleColor)(int)Console.ForegroundColor - 1; } } }
    五:下图是执行的结果对比,我们可以清晰的看到线程的变化情况。
     
    WFThread2 

    作者:生鱼片
             
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    Spring IOC容器的初始化-(二)BeanDefinition的载入和解析
    Spring IOC容器的初始化—(一)Resource定位
    SpringMVC的启动过程
    Spring IOC容器在Web容器中是怎样启动的
    Cannot retrieve metalink for repository: epel/x86_64. Please verify its path and try again 问题分析
    网络丢包分析
    yum安装jdk如何配置JAVA_HOME
    Linux网卡绑定
    Linux Buffer/Cache 的区别
    理解load averages
  • 原文地址:https://www.cnblogs.com/carysun/p/WFThread.html
Copyright © 2011-2022 走看看