执行简单的计算限制操作+限制执行上下文流动:
public static void UseThreadPool() { CallContext.LogicalSetData("Data", 2);//存在当前线程的资源 ThreadPool.QueueUserWorkItem( state => Console.WriteLine("Name1={0}", CallContext.LogicalGetData("Data"))); ExecutionContext.SuppressFlow();//取消了上下文的流动 无法异步操作无法获得当前资源 ThreadPool.QueueUserWorkItem(ComputeBoundOp, CallContext.LogicalGetData("Data"));//这样的写法无效,委托的参数已经在当前线程复制一份传进去了。 ThreadPool.QueueUserWorkItem( state => Console.WriteLine("Name2={0}",CallContext.LogicalGetData("Data"))); ExecutionContext.RestoreFlow();//恢复上下文流动 ThreadPool.QueueUserWorkItem( state => Console.WriteLine("Name3={0}", CallContext.LogicalGetData("Data"))); //ThreadPool.QueueUserWorkItem(ComputeBoundOp, CallContext.LogicalGetData("Data")); //for (int i=0;i<5;i++) //{ // ThreadPool.QueueUserWorkItem(ComputeBoundOp,CallContext.LogicalGetData("Data")); // Console.WriteLine("doing other work"); // Thread.Sleep(1000); //} } private static void ComputeBoundOp(object state) { Console.WriteLine("ComputeBoundOp:state={0}", state); Thread.Sleep(1000); }
协作式取消和超时:
/// <summary> /// 当前线程取消异步操作 /// </summary> private static void CancellationDemo() { CancellationTokenSource cts = new CancellationTokenSource(); ThreadPool.QueueUserWorkItem(o => Count(cts.Token, 1000)); Console.WriteLine("Press <Enter> to cancel the operation."); Console.ReadLine(); cts.Cancel(); } private static void Count(CancellationToken token,int countTO) { for(int count=0;count<countTO;count++) { if(token.IsCancellationRequested) { Console.WriteLine("Count is cancelled"); break; } Console.WriteLine(count); Thread.Sleep(200); } }
任务取消时回调
/// <summary> /// 任务取消后回调register方法。 /// </summary> private static void CancellationTS_Register() { var cts = new CancellationTokenSource(); //bool useSynchronizationContext 指明是否使用调用线程SynchronizationContext来调用委托 //否则调用Cancel的线程会顺序调用所有方法。 cts.Token.Register(() => Console.WriteLine("Canceled 1,ThreadId "+Thread.CurrentThread.ManagedThreadId),false); var ctsRegistration2 = cts.Token.Register(() => Console.WriteLine("Canceled 2,ThreadId "+Thread.CurrentThread.ManagedThreadId)); //销毁回调方法。就不会执行了。 ctsRegistration2.Dispose(); ThreadPool.QueueUserWorkItem(o => Count(cts.Token, 1000)); //如果选择Flase,那么会把所有方法执行完后再抛出异常 cts.Cancel(true); var cts1 = new CancellationTokenSource(); cts1.Token.Register(() => Console.WriteLine("cts1 canceled")); var cts2 = new CancellationTokenSource(); cts2.Token.Register(() => Console.WriteLine("cts2 canceled")); //他在cts1或cts2取消时取消 var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts1.Token, cts2.Token); linkedCts.Token.Register(() => Console.WriteLine("linkedCts canceled")); cts2.Cancel(); Console.WriteLine("cts1={0},cts2={1},linkedCts={2}", cts1.IsCancellationRequested, cts2.IsCancellationRequested, linkedCts.IsCancellationRequested); }
任务完成时自动启动新任务:
private static void TaskDemo() { var task = new Task<int>(n => Sum((int)n), 100); task.Start(); Thread.Sleep(4000); task.Wait(); Console.WriteLine("The Sum is :" + task.Result); CancellationTokenSource cts = new CancellationTokenSource(); Task<int> CancelT = Task.Run(() => Sum(cts.Token, 100), cts.Token); cts.Cancel();//任务还没开始,则任务无法再调度了,如果已经开始了,只能显示停止任务。所以需要修改sum方法传入一个Toke, try { Console.WriteLine("The sum is:" + CancelT.Result); } catch(Exception ex) { Console.WriteLine("CancelT任务终结了,并抛出异常:"+ex.ToString()); } Task<int> t = Task.Run(() => Sum(10000)); t.ContinueWith(ta => Console.WriteLine("The sum is" + ta.Result), TaskContinuationOptions.OnlyOnRanToCompletion); t.ContinueWith(ta => Console.WriteLine("Sum threw:"+ta.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted); t.ContinueWith(ta => Console.WriteLine("Sum was canceled"), TaskContinuationOptions.OnlyOnCanceled); } private static int Sum(int n) { int sum = 0; for(;n>0;n--) { checked { sum += n; } } return sum; } private static int Sum(CancellationToken ct,int n) { int sum = 0; for (; n > 0; n--) { ct.ThrowIfCancellationRequested(); checked { sum += n; } } return sum; }
任务可以启动子任务,任务可以具有父子关系
/// <summary> /// 任务启动子任务 /// </summary> public static void ParentTask() { Task<int[]> parent = new Task<int[]>(() => { var results = new int[3]; new Task(() => results[0] = Sum(10000), TaskCreationOptions.AttachedToParent).Start();//子任务结束,父任务才算结束 new Task(() => results[1] = Sum(20000), TaskCreationOptions.AttachedToParent).Start(); new Task(() => results[2] = Sum(30000), TaskCreationOptions.AttachedToParent).Start(); return results; }); var cwt = parent.ContinueWith( ParentTask => Array.ForEach(ParentTask.Result, Console.WriteLine)); parent.Start(); }
任务工厂使用
/// <summary> /// 任务工厂使用 /// </summary> public static void Parent() { Task parent = new Task(() => { //任务工厂可以统一进行配置,TaskFactory创建的所有Task对象都使用默认的 //TaskScheduler var cts = new CancellationTokenSource(); var tf = new TaskFactory<int>(cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); var childTasks = new[] { tf.StartNew(()=>Sum(cts.Token,1000)), tf.StartNew(()=>Sum(cts.Token,2000)), tf.StartNew(()=>Sum(cts.Token,3000)) }; //为每个字任务添加延续任务,只要一个任务出现异常,终止其他任务 for (int task = 0; task < childTasks.Length; task++) { childTasks[task].ContinueWith( t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted); } //当所有任务结束,取所有子任务(不包括异常和取消)结果最大值,最后延续子任务输出该值。 tf.ContinueWhenAll( childTasks, completedTasks => completedTasks.Where( t => !t.IsFaulted && !t.IsCanceled).Max(t => t.Result), CancellationToken.None)//CancellationToken.None表示该任务不能被取消 .ContinueWith(t => Console.WriteLine("The maximum is:" + t.Result), TaskContinuationOptions.ExecuteSynchronously); }); parent.Start(); }