zoukankan      html  css  js  c++  java
  • APM之异步IO1

    概念

    异步执行计算限制的操作,可以使用线、线程池、Task在多个内核上调度任务,使多个线程并发的工作,从而高效的使用系统资源,同时提升应用程序的吞吐能力。

    异步I/O操作,允许将任务交由硬件设备处理,期间完全不占用线程和CPU资源,这样系统资源可以高效的使用。I/O操作的结果是由线程池来处理的。

     

    Windows中使用I/O完成端口的形式完成异步I/O<Windows核心编程>有详细描述。

    执行异步操作是构建高性能、可伸缩应用程序的关键,它允许用非常少的线程执行许多操作,和线程池配合,异步操作允许利用机器的所有CPU

    为此,CLR团队提供了一种模式:异步编程模型(Asynchronous Programming Modelm APM)

    Sytem.IO.Stream的类型都提供了BeginReadBeginWrite的类别的异步操作

     

    通过使用Wintellect Power Threading库可以显著减少异步开发的复杂性,如下是个例子:

        private static void ImplementedViaAsyncEnumerator()

        {

            // Start 1 server per CPU

            for (Int32 n = 0; n < Environment.ProcessorCount; n++)

            {

                var ae = new AsyncEnumerator();

                ae.BeginExecute(PipeServerAsyncEnumerator(ae), ae.EndExecute);

            }

     

            // Now make a 100 client requests against the server

            for (Int32 n = 0; n < 100; n++)

            {

                var ae = new AsyncEnumerator();

                ae.BeginExecute(PipeClientAsyncEnumerator(ae, "localhost""Request #" + n), ae.EndExecute);

            }

        }

     

        // This field records the timestamp of the most recent client's request

        private static DateTime s_lastClientRequestTimestamp = DateTime.MinValue;

     

        // The SyncGate enforces thread-safe access to the s_lastClientRequestTimestamp field

        private static readonly SyncGate s_gate = new SyncGate();

     

        private static IEnumerator<Int32> PipeServerAsyncEnumerator(AsyncEnumerator ae)

        {

            // Each server object performs asynchronous operations on this pipe

            using (var pipe = new NamedPipeServerStream(

               "Echo"PipeDirection.InOut, -1, PipeTransmissionMode.Message,

               PipeOptions.Asynchronous | PipeOptions.WriteThrough))

            {

     

                // Asynchronously accept a client connection

                pipe.BeginWaitForConnection(ae.End(), null);

                /*ae.End 这个方法返回一个委托,它引用了AsyncEnumerator内部的方法

                操作完成时,线程池通知AsyncEnumerator对象继续执行yield return 1语句后面的迭代器方法

                 */

                yield return 1; //异步操作的地方,使用这个允许线程返回它原来的地方,便于它做更多的工作

     

                // A client connected, let's accept another client

                // 客户连接后再建立一个新的对象提供服务

                var aeNewClient = new AsyncEnumerator();

                aeNewClient.BeginExecute(PipeServerAsyncEnumerator(aeNewClient), aeNewClient.EndExecute);

     

                // Accept the client connection

                //DequeueAsyncResult返回当异步操作完成时,由线程池线程传给AsyncEnumerator对象的IAsyncResult对象

                pipe.EndWaitForConnection(ae.DequeueAsyncResult());

     

                // Asynchronously read a request from the client

                Byte[] data = new Byte[1000];

                pipe.BeginRead(data, 0, data.Length, ae.End(), null);

                yield return 1;

     

                // The client sent us a request, process it. 

                Int32 bytesRead = pipe.EndRead(ae.DequeueAsyncResult());

     

                // Get the timestamp of this client's request

                DateTime now = DateTime.Now;

     

                // We want to save the timestamp of the most-recent client request. Since multiple

                // clients are running concurrently, this has to be done in a thread-safe way

                s_gate.BeginRegion(SyncGateMode.Exclusive, ae.End()); // Request exclusive access

                yield return 1;   // The iterator resumes when exclusive access is granted

     

                if (s_lastClientRequestTimestamp < now)

                    s_lastClientRequestTimestamp = now;

     

                s_gate.EndRegion(ae.DequeueAsyncResult());   // Relinquish exclusive access

     

                // My sample server just changes all the characters to uppercase

                // But, you can replace this code with any compute-bound operation

                data = Encoding.UTF8.GetBytes(

                   Encoding.UTF8.GetString(data, 0, bytesRead).ToUpper().ToCharArray());

     

                // Asynchronously send the response back to the client

                pipe.BeginWrite(data, 0, data.Length, ae.End(), null);

                yield return 1;

                // The response was sent to the client, close our side of the connection

                pipe.EndWrite(ae.DequeueAsyncResult());

            } // Close the pipe NamedPipeServerStream对象销毁

        }

     

        private static IEnumerator<Int32> PipeClientAsyncEnumerator(AsyncEnumerator ae, String serverName, String message)

        {

            // Each client object performs asynchronous operations on this pipe

            using (var pipe = new NamedPipeClientStream(serverName, "Echo",

                  PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough))

            {

                pipe.Connect(); // Must Connect before setting ReadMode

                pipe.ReadMode = PipeTransmissionMode.Message;

     

                // Asynchronously send data to the server

                Byte[] output = Encoding.UTF8.GetBytes(message);

                pipe.BeginWrite(output, 0, output.Length, ae.End(), null);

                yield return 1;

     

                // The data was sent to the server

                pipe.EndWrite(ae.DequeueAsyncResult());

     

                // Asynchronously read the server's response

                Byte[] data = new Byte[1000];

                pipe.BeginRead(data, 0, data.Length, ae.End(), data);

                yield return 1;

     

                // The server responded, display the response and close out connection

                Int32 bytesRead = pipe.EndRead(ae.DequeueAsyncResult());

     

                 Console.WriteLine("Server response: " + Encoding.UTF8.GetString(data, 0, bytesRead));
            }  // Close();      
    }

    IAsyncResult转为Task

    异步的通用接口IAsyncResult,使用Task的形式可以加入其它的控制(返回值处理等),如:

     

    普通的异步处理方法

            // Instead of this:
            WebRequest webRequest = WebRequest.Create("http://Wintellect.com/");
            webRequest.BeginGetResponse(result =>
            {
                WebResponse webResponse = null;
                try
                {
                    webResponse = webRequest.EndGetResponse(result);
                    Console.WriteLine("Content length: " + webResponse.ContentLength);
                }
                catch (WebException we)
                {
                    Console.WriteLine("Failed: " + we.GetBaseException().Message);
                }
                finally
                {
                    if (webResponse != null) webResponse.Close();
                }
            }, null);
     
            Console.ReadLine();  // for testing purposes
     
    使用Task的形式进行请求
            // Make a Task from an async operation that FromAsync starts
            /*WebRequest*/
            webRequest = WebRequest.Create("http://Wintellect.com/");
            var t1 = Task.Factory.FromAsync<WebResponse>(webRequest.BeginGetResponse, webRequest.EndGetResponse, nullTaskCreationOptions.None);
            var t2 = t1.ContinueWith(task =>
            {
                WebResponse webResponse = null;
                try
                {
                    webResponse = task.Result;
                    Console.WriteLine("Content length: " + webResponse.ContentLength);
                }
                catch (AggregateException ae)
                {
                    if (ae.GetBaseException() is WebException)
                        Console.WriteLine("Failed: " + ae.GetBaseException().Message);
                    else throw;
                }
                finally { if (webResponse != null) webResponse.Close(); }
            });
     
            try
            {
                t2.Wait();  // for testing purposes only
            }
            catch (AggregateException) { }
     

    应用程序及其线程处理模型

    每个应用程序都可能引入自己的线程处理模型

    控制台、Windows服务、Asp.netWeb Service:没有引入任何种类的线程处理模型,任何线程都可以在任何时候做它爱做的任何事情

    GUI程序(Windows Form WPF Silverlight):引入了一个线程处理模型,在这个模型中,创建窗口的线程是唯一能对那个窗口进行更新的线程

     

    Private static AsyncCallback SyncContextCallback(AsyncCallback callback)

        {

    // Capture the calling thread's SynchronizationContext-derived object

    SynchronizationContextsc = SynchronizationContext.Current;

     

    // If there is no SC, just return what was passed in

    if (sc == null) return callback;

     

    // Return a delegate that, when invoked, posts to the captured SC a method that

    // calls the original AsyncCallback passing it the IAsyncResult argument

    returnasyncResult =>sc.Post(result => callback((IAsyncResult)result), asyncResult);

        }

     

    如下的形式在UI程序中使用以上的处理就不用再处理手动的线程切换问题了

                var webRequest = WebRequest.Create("http://Wintellect.com/");
                webRequest.BeginGetResponse(SyncContextCallback(ProcessWebResponse), webRequest);
                base.OnMouseClick(e);
     
            private void ProcessWebResponse(IAsyncResult result)
            {
                // If we get here, this must be the GUI thread, it's OK to update the UI
                var webRequest = (WebRequest)result.AsyncState;
                using (var webResponse = webRequest.EndGetResponse(result))
                {
                    Text = "Content length: " + webResponse.ContentLength;
                }
            }

     

    详细参考:

    Clr Via C#

    http://transbot.blog.163.com

    http://ys-f.ys168.com/?CLR_via_CSharp_3rd_Edition_Code_by_Jeffrey_Richter.zip_55bism1e0e7bkisjthit2bso0cm5bs4bs1b5bktnql0c0bu22f05f12z

  • 相关阅读:
    安装VC6.0遇到的问题
    开发、测试环境
    OPENGL绘制文字
    C++实现文件关联
    MFC多国语言——配置文件
    MFC 资源记录
    如何解决——汉化英文界面出现乱码
    项目配置——添加第三方资源
    队列&生产者消费者模型
    抢票小程序
  • 原文地址:https://www.cnblogs.com/2018/p/2040333.html
Copyright © 2011-2022 走看看