zoukankan      html  css  js  c++  java
  • C#(99):一、.NET 1.0 异步编程模型(APM)

    一、概念

    .NET 1.0提出了APM(Asynchronous Programming Model)即异步编程模式。

    .NET的类库有以BeginXXX和EndXXX类似的方法,就是使用异步编程模型。

    NET Framework很多类也实现了该模式,同时我们也可以自定义类来实现该模式,即在自定义的类中实现返回类型为IAsyncResult接口的BeginXXX方法和EndXXX方法,另外委托类型也定义了BeginInvoke和EndInvoke方法。

    异步编程模型的本质

    利用委托和线程池帮助我们实现异步编程模型模式。

    该模式利用一个线程池线程去执行一个操作,在FileStream类BeginRead方法中就是执行一个读取文件操作,该线程池线程会立即将控制权返回给调用线程,此时线程池线程在后台进行这个异步操作;

    异步操作完成之后,通过回调函数来获取异步操作返回的结果,此时就是利用委托的机制。

    1、BeginXxx方法——开始执行异步操作介绍

    当需要读取文件中的内容时,我们通常会采用FileStream的同步方法Read来读取,该同步方法的定义为:

    // 从文件流中读取字节块并将该数据写入给定的字节数组中
    public override int Read(byte[] array, int offset, int count )

    该同步方法会堵塞执行的线程。

    可以通过BeginRead方法来实现异步编程,使读取操作不再堵塞UI线程。BeginRead方法代表异步执行Read操作,并返回实现IAsyncResult接口的对象,该对象存储着异步操作的信息。

    // 开始异步读操作
    // 前面的3个参数和同步方法代表的意思一样,这里就不说了,可以看到这里多出了2个参数
    // userCallback代表当异步IO操作完成时,你希望由一个线程池线程执行的方法,该方法必须匹配AsyncCallback委托
    // stateObject代表你希望转发给回调方法的一个对象的引用,在回调方法中,可以查询IAsyncResult接口的AsyncState属性来访问该对象
    public override IAsyncResult BeginRead(byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject)

    从上面的代码中可以看出异步方法和同步方法的区别,如果你在使用该异步方法时,不希望异步操作完成后调用任何代码,你可以把userCallback参数设置为null

    2、EndXxx方法——结束异步操作介绍

    前面介绍完了BeginXxx方法,我们看到所有BeginXxx方法返回的都是实现了IAsyncResult接口的一个对象,并不是对应的同步方法那样直接得到结果。

    此时我们需要调用对应的EndXxx方法来结束异步操作,并向该方法传递IAsyncResult对象,EndXxx方法的返回类型就是和同步方法一样的。例如,FileStreamEndRead方法返回一个Int32来代表从文件流中实际读取的字节数。

    // 摘要:  等待挂起的异步读取完成。
    // asyncResult:对要完成的挂起异步请求的引用。
    // 返回结果: 从流中读取的字节数.
    public virtual int EndRead(IAsyncResult asyncResult);

    对于访问异步操作的结果,APM的首选方式是:
    使用 AsyncCallback委托来指定操作完成时要调用的方法,在操作完成后调用的方法中调用EndXxx操作来获得异步操作的结果。

    二、APM示例:

    代码:

    private static void Main(string[] args)
    {
        string downUrl = "http://download.microsoft.com/download/5/B/9/5B924336-AA5D-4903-95A0-56C6336E32C9/TAP.docx";
        DownLoadFileSync(downUrl);  //同步下载文件,在下载操作完成之后我们才可以看到"Start DownLoad File......." 消息显示
        DownloadFileAsync(downUrl); //异步下载文件,在下载操作完成之前我们就可以看到"Start DownLoad File......." 消息显示
        Console.WriteLine("开始下载文件.........");
        Console.ReadLine();
    }
    //同步下载文件
    private static void DownLoadFileSync(string url) { RequestState req = new RequestState(); // 创建一个 RequestState 实例 try { HttpWebRequest myHttpWebRequest = (HttpWebRequest)WebRequest.Create(url); // 初始化一个 HttpWebRequest 对象 req.request = myHttpWebRequest; // 指派 HttpWebRequest实例到requestState的request字段. req.response = (HttpWebResponse)myHttpWebRequest.GetResponse(); req.streamResponse = req.response.GetResponseStream(); int readSize = req.streamResponse.Read(req.BufferRead, 0, req.BufferRead.Length); while (readSize > 0) { req.filestream.Write(req.BufferRead, 0, readSize); readSize = req.streamResponse.Read(req.BufferRead, 0, req.BufferRead.Length); } Console.WriteLine(" 此文件的长度是: {0}", req.filestream.Length); Console.WriteLine("下载完成,下载路径是: {0}", req.savepath); } catch (Exception e) { Console.WriteLine("错误信息:{0}", e.Message); } finally { req.response.Close(); req.filestream.Close(); } }

    //异步下载文件
    private static void DownloadFileAsync(string url)
    {
        try
        {
            HttpWebRequest myHttpWebRequest = (HttpWebRequest)WebRequest.Create(url);
            RequestState req = new RequestState();
            req.request = myHttpWebRequest;
            myHttpWebRequest.BeginGetResponse(new AsyncCallback(ResponseCallback), req);
        }
        catch (Exception e)
        {
            Console.WriteLine("错误信息:{0}", e.Message);
        }
    }
    
    //每个异步操作完成时,将调用下面的方法
    private static void ResponseCallback(IAsyncResult callbackresult)
    {
        RequestState req = (RequestState)callbackresult.AsyncState; // 获取 RequestState 对象
        HttpWebRequest myHttpRequest = req.request;
        req.response = (HttpWebResponse)myHttpRequest.EndGetResponse(callbackresult); // 结束一个对英特网资源的的异步请求
        Stream responseStream = req.response.GetResponseStream(); //从服务器获取响应流
        req.streamResponse = responseStream;
        IAsyncResult asynchronousRead = responseStream.BeginRead(req.BufferRead, 0, req.BufferRead.Length, ReadCallBack, req);//异步读取流到字节数组
    }
    
    // 写字节数组到 FileStream
    private static void ReadCallBack(IAsyncResult asyncResult)
    {
        try
        {
            RequestState req = (RequestState)asyncResult.AsyncState; // 获取 RequestState 对象
            Stream responserStream = req.streamResponse;   //从RequestState对象中获取 Response Stream
    
            int readSize = responserStream.EndRead(asyncResult);
            if (readSize > 0)
            {
                req.filestream.Write(req.BufferRead, 0, readSize);
                responserStream.BeginRead(req.BufferRead, 0, req.BufferRead.Length, ReadCallBack, req);//循环调用ReadCallBack方法。
            }
            else
            {
                Console.WriteLine("
    此文件的长度是: {0}", req.filestream.Length);
                Console.WriteLine("下载完成,下载路径是: {0}", req.savepath);
                req.response.Close();
                req.filestream.Close();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("错误信息:{0}", e.Message);
        }
    }
    
    //存储请求的状态类
    public class RequestState
    {
        public int BufferSize = 1024;
        public string savepath = Environment.GetFolderPath(Environment.SpecialFolder.Desktop) + "\TAP.docx";
        public byte[] BufferRead;
        public HttpWebRequest request;
        public HttpWebResponse response;
        public Stream streamResponse;
        public FileStream filestream;
    
        public RequestState()
        {
            BufferRead = new byte[BufferSize];
            request = null;
            streamResponse = null;
            if (File.Exists(savepath))
            {
                File.Delete(savepath);
            }
            filestream = new FileStream(savepath, FileMode.OpenOrCreate);
        }
    }

    运行结果:

    使用同步方法下载文件的运行结果为:(从运行结果也可以看出,调用的是同步方法时,此时会堵塞主线程,直到文件的下载操作被完成之后主线程才继续执行后面的代码)

    image

    使用APM异步编程:运行结果为(从运行结果也可以看出,在主线程中调用 DownloadFileAsync(downUrl)方法时,DownloadFileAsync(downUrl)方法中的req.BeginGetResponse调用被没有阻塞调用线程(即主线程),而是立即返回到主线程,是主线程后面的代码可以立即执行)

    image

    三、委托实例的异步调用(BeginInvoke、EndInvoke方法

    在前面的介绍中已经提到委托类型也会定义了BeginInvoke方法和EndInvoke方法,所以委托类型也实现了异步编程模型,所以可以使用委托的BeginInvokeEndInvoke方法来回调同步方法从而实现异步编程。

    因为调用委托的BeginInvoke方法来执行一个同步方法时,此时会使用线程池线程回调这个同步方法并立即返回到调用线程中,由于耗时操作在另外一个线程上运行,所以执行BeginInvoke方法的主线程就不会被堵塞。

    下面实现在线程池线程中如何更新GUI线程中窗体。

    // 定义用来实现异步编程的委托
    private delegate string AsyncMethodCaller(string fileurl);
    
    private void btnDownLoad_Click(object sender, EventArgs e)
    {
        AsyncMethodCaller methodCaller = new AsyncMethodCaller(DownLoadFileSync);//DownLoadFileSync为同步下载文件的方法
        methodCaller.BeginInvoke(this.txbUrl.Text.Trim(), GetResult, null);
    }
    
    // 异步操作完成时执行的方法
    private void GetResult(IAsyncResult result)
    {
        AsyncMethodCaller caller = (AsyncMethodCaller)((AsyncResult)result).AsyncDelegate;// 或者(AsyncMethodCaller)result.AsyncState;
        string returnstring = caller.EndInvoke(result); // 调用EndInvoke去等待异步调用完成并且获得返回值,如果异步调用尚未完成,则 EndInvoke 会一直阻止调用线程,直到异步调用完成
    
        // 然后Invoke方法来使更新GUI操作方法由GUI 线程去执行
        this.Invoke(new MethodInvoker(() =>
        {
            rtbState.Text = returnstring;
            btnDownLoad.Enabled = true;
        }));
    }

    运行的结果为:

    04194654_4dc331037959450e9cf9a14a6d18da42[1]

    上例子中使用了控件的Invoke方式进行异步回调访问控件的方法,其背后是通过获得GUI线程的同步上文对象SynchronizationContext,然后同步调用同步上下文对象的post方法把要调用的方法交给GUI线程去处理。

    五、task实例

    假如现在有这样的一个需求,我们需要从3个txt文件中读取字符,然后进行倒序,前提是不能阻塞主线程。

    如果不用task的话我可能会用工作线程去监视一个bool变量来判断文件是否全部读取完毕,然后再进行倒序,我也说了,相对task来说还是比较麻烦的,这里我就用task来实现。

    private static void Main()
    {
        string[] array = { "C://1.txt", "C://2.txt", "C://3.txt" };
        List<Task<string>> taskList = new List<Task<string>>(3);
        foreach (var item in array)
        {
            taskList.Add(ReadAsyc(item));
        }
        Task.Factory.ContinueWhenAll(taskList.ToArray(), i =>
        {
            string result = string.Empty;
            //获取各个task返回的结果
            foreach (var item in i)
            {
                result += item.Result;
            }
            //倒序
            String content = new String(result.OrderByDescending(j => j).ToArray());
            Console.WriteLine("倒序结果:" + content);
        });
        Console.WriteLine("我是主线程,我不会被阻塞");
        Console.ReadKey();
    }
    
    //异步读取
    private static Task<string> ReadAsyc(string path)
    {
        FileInfo info = new FileInfo(path);
        byte[] b = new byte[info.Length];
        FileStream fs = new FileStream(path, FileMode.Open);
        Task<int> task = Task<int>.Factory.FromAsync(fs.BeginRead, fs.EndRead, b, 0, b.Length, null, TaskCreationOptions.None);
        //返回当前task的执行结果
        return task.ContinueWith(i =>
        {
            return i.Result > 0 ? Encoding.Default.GetString(b) : string.Empty;
        }, TaskContinuationOptions.ExecuteSynchronously);
    }
  • 相关阅读:
    设计模式学习笔记——Bridge 桥接模式
    设计模式学习笔记——Adapter 适配器模式
    protoc protobuff安装
    docker-compose启动consul
    docker etcd 环境搭建
    nifi的去重方案设计(二)-外部存储mysql全局去重
    实现一套ES全文检索语法-到Lucene语法的转换工具,以实现在es外部兼容处理文本分词
    nifi的去重方案设计(一)-单队列内去重.md
    k8s 证书过期处理
    部分项目从kafka迁移至pulsar,近期使用中碰到了一些问题,勉强把大的坑踩完了,topic永驻,性能相关
  • 原文地址:https://www.cnblogs.com/springsnow/p/13139019.html
Copyright © 2011-2022 走看看