zoukankan      html  css  js  c++  java
  • .Net Cancellable Task APM异步超时机制扩展

    概述

    .NET基于委托的APM(Asynchronous Programming Model)模式通过BeginInvoke, EndInvoke, AsyncCallback,IAsyncResult的组合使用,让程序员可以方便的进行异步调用、异步回调和同步等待等操作。但.NET平台还没有为线程的中止(abort)提供安全可靠的机制,也许正是基于这个原因APM并没有包含异步调用的超时机制,而是把这个可能引起争议的工作交给使用者自己来把握。

    作为APM模型的补充,本文通过CancellableTask类提供了一个异步调用超时机制。CancellableTask类的设计有两个主要的考虑:

    1.保持APM风格,使用者依然可以使用熟悉的BeginInvoke, EndInvoke, IAsyncResult, AsyncCallback等;

    2.提供基于Thread.Abort的默认超时处理,同时支持用户自定义cancel回调。

    使用

    CancellableTask的构造函数包含workCallbak和cancelCallback(可选)两参数,分别对应work回调和cancel回调。CancellableTask的BeginInvoke保持了APM的风格,可以看作是增加了timeout参数(单位:ms)的扩展版;而EndInvoke,AsyncCallback以及IAsyncResult的使用都和APM保持一致。Work委托产生的异常会在EndInvoke时抛出,同时若线程被超时中止,EndInvoke则会抛出ThreadAbortException异常。

    下面是一段CancellableTask的使用示例:

    class Program
    {
        
    static void Main(string[] args)
        {
            
    //默认超时直接abort线程
            {
                Console.WriteLine(
    "[case 1]");

                CancellableTask cancellableTask 
    = new CancellableTask(Work);

                State arg 
    = new State { Loop = 20, Stop = false };
                IAsyncResult asyncResult 
    = cancellableTask.BeginInvoke(
                    arg, 
                    (ar 
    => Console.WriteLine("Async Callback")), 
                    
    null
                    
    10 * 1000);
                asyncResult.AsyncWaitHandle.WaitOne();
                
    try
                {
                    
    object r = cancellableTask.EndInvoke(asyncResult);
                    Console.WriteLine(
    "return " + r);
                }
                
    catch (ThreadAbortException)
                {
                    Console.WriteLine(
    "Thread Aborted");
                }
                
    catch (Exception exp)
                {
                    Console.WriteLine(exp.ToString());
                }
            }

            
    //自定义Cancel回调
            {
                Console.WriteLine(Environment.NewLine 
    + "[case 2]");

                CancellableTask cancellableTask 
    = new CancellableTask(Work, Cancel);

                State arg 
    = new State { Loop = 20, Stop = false };
                IAsyncResult asyncResult 
    = cancellableTask.BeginInvoke(
                    arg,
                    (ar 
    =>
                        {
                            
    try
                            {
                                
    object r = cancellableTask.EndInvoke(ar);
                                Console.WriteLine(
    "return " + r);
                            }
                            
    catch (ThreadAbortException)
                            {
                                Console.WriteLine(
    "Thread Aborted");
                            }
                            
    catch (Exception exp)
                            {
                                Console.WriteLine(exp.ToString());
                            }
                        }
                    ),
                    arg,
                    
    10 * 1000);
            }
            Console.ReadLine();
        }

        
    static object Work(object arg)
        {
            State state 
    = arg as State;
            
    int i;
            
    for (i = 0; i < state.Loop; i++)
            {
                
    if (state.Stop) break;

                Console.WriteLine(i);
                Thread.Sleep(
    1000);
            }

            
    return i;
        }

        
    static void Cancel(object state)
        {
            State st 
    = state as State;
            st.Stop 
    = true;
        }
    }

    internal class State
    {
        
    public int Loop { getset; }
        
    public bool Stop { getset; }
    }

    实现

    CancellableTask通过wrapper对workCallback进行包装。Wrapper内部首先创建等待事件e,并通过ThreadPool.RegisterWaitForSingleObject注册事件和WaitOrTimeout回调,然后调用workCallback。若workCallback提前返回,调用e.Set(),ThreadPool会调用WaitOrTimeout回调,isTimeout参数为false,不进行处理;否则,当workCallback超时未返回,ThreadPool会调用WaitOrTimeout回调,isTimeout参数为true。WaitOrTimeout回调在isTimeout情况下,首先判断是否有自定义cancel回调,如果有则采用自定义回调;否则,默认情况下调用Thread.Abort终止work线程。下面是CancellableTask的实现细节:

    public class CancellableTask
    {
        
    public delegate object WorkCallback(object arg);
        
    public delegate void CancelCallback(object state);

        
    protected class TimeoutState
        {
            
    internal Thread thread;
            
    internal object state;

            
    public TimeoutState(Thread thread, object state)
            {
                
    this.thread = thread;
                
    this.state = state;
            }
        }

        
    protected WorkCallback workCallback;
        
    protected CancelCallback cancelCallback;
        
    protected WorkCallback wrapper;

        
    public CancellableTask(WorkCallback workCallback)
        {
            
    this.workCallback = workCallback;
        }

        
    public CancellableTask(WorkCallback workCallback, CancelCallback cancelCallback)
        {
            
    this.workCallback = workCallback;
            
    this.cancelCallback = cancelCallback;
        }

        
    public IAsyncResult BeginInvoke(object arg, AsyncCallback asyncCallback, object state, int timeout)
        {
            wrapper 
    = delegate(object argv)
            {
                AutoResetEvent e 
    = new AutoResetEvent(false);

                
    try
                {
                    TimeoutState waitOrTimeoutState 
    = new TimeoutState(Thread.CurrentThread, state);

                    ThreadPool.RegisterWaitForSingleObject(e, WaitOrTimeout, waitOrTimeoutState, timeout, 
    true);

                    
    return workCallback(argv);
                }
                
    finally
                {
                    e.Set();
                }
            };

            IAsyncResult asyncResult 
    = wrapper.BeginInvoke(arg, asyncCallback, state);

            
    return asyncResult;
        }

        
    public object EndInvoke(IAsyncResult result)
        {
            
    return wrapper.EndInvoke(result);
        }

        
    protected void WaitOrTimeout(object state, bool isTimeout)
        {
            
    try
            {
                
    if (isTimeout)
                {
                    TimeoutState waitOrTimeoutState 
    = state as TimeoutState;

                    
    if (null != cancelCallback)
                    {
                        cancelCallback(waitOrTimeoutState.state);
                    }
                    
    else
                    {
                        waitOrTimeoutState.thread.Abort();
                    }
                }
            }
            
    catch { }
        }
    }

    总结

    本文为.NET APM模型提供了异步超时机制扩展,一方面保持了APM编程风格,另一方面支持用户自定义cancel回调。需要注意的是,默认的cancel方式Thread.Abort的安全性问题,使用时应注意资源释放等。

    作者

    http://www.cnblogs.com/weidagang2046/,欢迎就线程问题交流探讨。

  • 相关阅读:
    RedisDump安装报错
    安装mysql解压版时遇到的错误
    Docker 私有仓库 Harbor搭建与使用
    最好的6个Go语言Web框架
    安裝 drone CLI和使用drone
    使用 Kubernetes Helm 安装 Drone
    从ELK到EFK演进
    搭建helm私服ChartMuseum
    Helm3的使用
    Helm3部署安装
  • 原文地址:https://www.cnblogs.com/weidagang2046/p/1394050.html
Copyright © 2011-2022 走看看