zoukankan      html  css  js  c++  java
  • C# IOThread

    在看微软的“ASP.NET - 将 ASP.NET 用作高性能文件下载器”示例时,发现里面用到了 IO 线程。以前打算自己实现,这里把代码贴出来,以作标记:

    ////////////////////////////////////////////////////
        // A delegate's BeginInvoke runs on a "worker thread" inside the CLR ThreadPool.
        // This allows you to run a delegate on a "completion port thread" inside the CLR ThreadPool
        // instead of a "worker thread" by calling IOBeginInvoke instead of BeginInvoke.
        // In a nutshell, this mechanism basically skips the actual system I/O and simply posts a
        // completion delegate to the completion port queue so the completion delegate will be 
        // executed by a "completion port thread" that is working the completion port queue.
        //
        // Example:
        // delegate.BeginInvoke => executes delegate on "worker thread".
        // delegate.IOBeginInvoke => executes delegate on "completion port thread".
        // 
        //
        // Extremely simplified explanation:
        //
        // CLR ThreadPool is made up of two pools of threads: "worker threads" and "completion port threads".
        //
        // Basically you can either queue a user work item which runs the delegate on a "worker thread",
        // or queue a native overlapped item which runs the completion delegate on a "completion port thread".
        // ThreadPool.QueueUserWorkItem (and delegate.BeginInvoke) => executes delegate on "worker thread".
        // ThreadPool.UnsafeQueueNativeOverlapped => executes completion delegate on "completion port thread".
        //
        //             (CLR ThreadPool)
        //               /        \
        // [worker threads]      [completion port threads]
        //
        //  o o  oo  o  oo                    _____________post to queue using ThreadPool.UnsafeQueueNativeOverlapped
        //    o o oo o  o                    |             (i.e. PostQueuedCompletionStatus)
        //   o  o o  o o                     v
        //  oo o  oo  o o                  |  |
        //    o  oo oo o                   |  | <----------completion port queue (one per process)
        //                                 |__|
        //       ^                         oo o
        //       |                        o o oo <---------completion port threads (work the completion port queue)
        //       |                 (cpu)(cpu)(cpu)(cpu)                            (i.e. GetQueuedCompletionStatus in loop)
        //       |                               
        //       |                               ^
        //       |                               |
        //       |                               |
        //       |                               |
        //       |                               |
        //       |    Each individual completion delegate is given to the completion port queue to execute,
        //       |    and the "completion port threads" working the completion port queue execute the delegate.
        //       |    (This has much less risk of thread explosion because each delegate just gets posted to the
        //       |     completion port queue, instead of being given to its own individual thread. Basically
        //       |     the queue grows, not the threads.)
        //       |
        //       |    The completion delegate is supplied in the overlapped structure that is posted to the
        //       |    completion port queue.
        //       |
        //       |    Posting to queue (PostQueuedCompletionStatus) => done by ThreadPool.UnsafeQueueNativeOverlapped.
        //       |    Working queue (GetQueuedCompletionStatus in loop) => done by "completion port thread" working queue.
        //       |
        //       |
        // Each individual delegate is given to its own individual "worker thread" to execute,
        // and the OS schedules the "worker thread" to run on a cpu.
        // (This has the risk of thread explosion because each new delegate executes on its own
        //  individual "worker thread". If the number of threads grows too large the entire OS can
        //  grind to a halt, with all the memory used and the CPUs spending all their time
        //  just trying to schedule the threads to run but not actually doing any work.)
        //
        ////////////////////////////////////////////////////
        /// <summary>
        /// Extension methods that run a <see cref="ProcessRequestDelegate"/> on a CLR ThreadPool
        /// "completion port thread" by posting a packed native overlapped structure with
        /// <see cref="ThreadPool.UnsafeQueueNativeOverlapped"/>, instead of using a "worker thread".
        /// </summary>
        public static class IOThread
        {
            /// <summary>Signature of the request-processing method that is executed on the completion port thread.</summary>
            public delegate void ProcessRequestDelegate(HttpContext context);

            /// <summary>
            /// Async result handed out by <see cref="IOBeginInvoke"/> and consumed by <see cref="IOEndInvoke"/>.
            /// </summary>
            public class IOAsyncResult : IAsyncResult
            {
                /// <summary>An <c>object[]</c> of { HttpContext, AsyncCallback, caller state } as stored by IOBeginInvoke.</summary>
                public object AsyncState { get; set; }

                /// <summary>Event that is signaled once the delegate has finished running.</summary>
                public WaitHandle AsyncWaitHandle { get; set; }

                /// <summary>The delegate that was invoked.</summary>
                public Delegate AsyncDelegate { get; set; }

                /// <summary>Never set by this class, so it stays false.</summary>
                public bool CompletedSynchronously { get; set; }

                /// <summary>Set to true after the delegate returned or threw.</summary>
                public bool IsCompleted { get; set; }

                /// <summary>The exception thrown by the delegate, or null when it completed normally.</summary>
                public Exception Exception { get; set; }
            }

            /// <summary>
            /// Queues <paramref name="value"/> for execution on a completion port thread.
            /// </summary>
            /// <param name="value">Delegate to execute.</param>
            /// <param name="context">HTTP context passed to the delegate.</param>
            /// <param name="callback">Optional callback invoked after the delegate has completed.</param>
            /// <param name="data">Caller state; stored as the third element of <see cref="IOAsyncResult.AsyncState"/>.</param>
            /// <returns>The async result to pass to <see cref="IOEndInvoke"/>.</returns>
            /// <exception cref="ArgumentNullException"><paramref name="value"/> is null.</exception>
            /// <exception cref="InvalidOperationException">The work item could not be queued.</exception>
            public static unsafe IAsyncResult IOBeginInvoke(this ProcessRequestDelegate value, HttpContext context, AsyncCallback callback, object data)
            {
                if (value == null)
                {
                    throw new ArgumentNullException("value");
                }

                ManualResetEvent evt = new ManualResetEvent(false);

                IOAsyncResult ar = new IOAsyncResult();
                ar.AsyncState = new object[] { context, callback, data };
                ar.AsyncDelegate = value;
                ar.AsyncWaitHandle = evt;

                Overlapped o = new Overlapped(0, 0, IntPtr.Zero, ar);

                NativeOverlapped* no = null;
                bool queued = false;
                try
                {
                    // The second argument of Pack is the I/O buffer to pin, not user state. The caller's
                    // state already travels inside ar.AsyncState, and pinning an arbitrary object would
                    // fail for non-blittable types, so no buffer is supplied.
                    no = o.Pack(new IOCompletionCallback(ProcessRequestCompletionCallback), null);
                    queued = ThreadPool.UnsafeQueueNativeOverlapped(no);
                }
                finally
                {
                    if (!queued)
                    {
                        // The completion callback will never run, so release what it would have released.
                        if (no != null)
                        {
                            Overlapped.Free(no);
                        }
                        evt.Close();
                    }
                }

                if (!queued)
                {
                    throw new InvalidOperationException("The request could not be queued to the I/O completion port.");
                }

                return ar;
            }

            /// <summary>
            /// Runs on a completion port thread: executes the delegate, records any exception,
            /// signals completion, invokes the optional callback and frees the native overlapped.
            /// </summary>
            private static unsafe void ProcessRequestCompletionCallback(uint errorCode, uint numBytes, NativeOverlapped* no)
            {
                try
                {
                    Overlapped o = Overlapped.Unpack(no);
                    IOAsyncResult ar = (IOAsyncResult)o.AsyncResult;

                    ProcessRequestDelegate d = (ProcessRequestDelegate)ar.AsyncDelegate;

                    object[] state = (object[])ar.AsyncState;
                    HttpContext context = (HttpContext)state[0];
                    AsyncCallback callback = (AsyncCallback)state[1];

                    try
                    {
                        d(context);
                    }
                    catch (Exception ex)
                    {
                        // Kept for IOEndInvoke, which rethrows it to the caller.
                        ar.Exception = ex;
                    }

                    // Publish completion before the callback runs so that an End call made from
                    // inside the callback does not block.
                    ar.IsCompleted = true;
                    ((ManualResetEvent)ar.AsyncWaitHandle).Set();

                    if (callback != null)
                    {
                        callback(ar);
                    }
                }
                finally
                {
                    Overlapped.Free(no);
                }
            }

            /// <summary>
            /// Blocks until the delegate started by <see cref="IOBeginInvoke"/> has completed, releases the
            /// wait handle and rethrows the exception thrown by the delegate, if any.
            /// Call it exactly once per <see cref="IOBeginInvoke"/>; the wait handle is closed afterwards.
            /// </summary>
            /// <param name="value">The delegate (not used; present so the method reads like EndInvoke).</param>
            /// <param name="result">The result returned by <see cref="IOBeginInvoke"/>.</param>
            /// <exception cref="ArgumentNullException"><paramref name="result"/> is null.</exception>
            public static void IOEndInvoke(this ProcessRequestDelegate value, IAsyncResult result)
            {
                if (result == null)
                {
                    throw new ArgumentNullException("result");
                }

                IOAsyncResult ar = (IOAsyncResult)result;
                try
                {
                    ar.AsyncWaitHandle.WaitOne(Timeout.Infinite);
                }
                finally
                {
                    // One event is allocated per request; close it here instead of leaving the
                    // kernel handle to the finalizer.
                    ar.AsyncWaitHandle.Close();
                }

                if (ar.Exception != null)
                {
                    // Rethrown as the original exception object so callers can catch its type;
                    // note that this resets the stack trace to this point.
                    throw ar.Exception;
                }
            }
        }

    使用示例:

      ////////////////////////////////////////////////////
        // Example URLs to call download handler and download a file:
        //
        //    https://localhost/DownloadPortal/Download?file=file.txt
        //    https://localhost/DownloadPortal/Download?file=file.txt&chunksize=1000000
        //    https://localhost/DownloadPortal/Download?file=customer1/file.txt
        //    https://localhost/DownloadPortal/Download?file=customer1/file.txt&chunksize=1000000
        //
        //    Important!!!  Must replace 'localhost' in URL with actual machine name or SSL certificate will fail.
        //
        //    You can either include the 'Range' HTTP request header in the HTTP web request OR
        //    supply the 'chunksize' parameter in the URL. If you include the 'Range' HTTP request header,
        //    the handler transmits back the portion of the file identified by the 'Range'. If you supply the
        //    'chunksize' parameter, it transmits back the entire file in separate pieces the size of
        //    'chunksize'. If you supply both, 'Range' is used. If you supply neither, it transmits back
        //    the entire file in one piece.
        //
        // Web.config on IIS 7.0:
        //    <system.webServer>
        //      <handlers>
        //        <add name="Download" verb="*" path="Download" type="DownloadHandlers.DownloadHandler" />
        //      </handlers>
        //    </system.webServer>
        //
        // Put DownloadHandler.dll and IOThreads.dll in the /bin directory of the virtual directory on IIS.
        //
        // Put files to be downloaded into virtual directory or sub directory under virtual directory.
        //
        // To debug:
        //    Debug -> Attach to Process -> w3wp.exe
        //
        // Note: Make sure the IIS virtual directory is using an AppPool that is using .NET Framework 4.0.
        //       Define a new AppPool using .NET Framework 4.0, if no other AppPool exists that is using .NET Framework 4.0.
        ////////////////////////////////////////////////////
    
        /// <summary>
        /// Asynchronous HTTP handler that sends a file of the web application to the client.
        /// Supports HEAD requests, a single byte range ('Range' request header) and sending the
        /// whole file in pieces of 'ChunkSize' bytes (query string parameter).
        /// </summary>
        public class DownloadHandler : IHttpAsyncHandler
        {
            /// <summary>
            /// Starts the download by queuing <see cref="ProcessRequest"/> through IOThread.IOBeginInvoke.
            /// </summary>
            public IAsyncResult BeginProcessRequest(HttpContext context, AsyncCallback cb, object extraData)
            {
                //Offloads to "completion port threads" inside CLR ThreadPool
                //instead of "worker threads" inside CLR ThreadPool, so that our
                //work doesn't use up the "worker threads" that are needed for the
                //IIS server's request processing. (see comment section at top of
                //IOThread class for more details)
                IOThread.ProcessRequestDelegate d = ProcessRequest;
                return d.IOBeginInvoke(context, cb, extraData);
            }

            /// <summary>
            /// Completes the request started by <see cref="BeginProcessRequest"/>; rethrows any
            /// exception that escaped <see cref="ProcessRequest"/>.
            /// </summary>
            public void EndProcessRequest(IAsyncResult result)
            {
                IOThread.IOAsyncResult ar = (IOThread.IOAsyncResult)result;
                IOThread.ProcessRequestDelegate d = (IOThread.ProcessRequestDelegate)ar.AsyncDelegate;
                d.IOEndInvoke(result);
            }

            /// <summary>
            /// Validates the query string, then writes the requested file (or byte range, or chunks)
            /// to the response. Failures are reported to the client as a plain-text error response;
            /// the response is always ended.
            /// </summary>
            /// <param name="context">The HTTP context of the download request.</param>
            public void ProcessRequest(HttpContext context)
            {
                System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;

                try
                {
                    string file = context.Request.QueryString["File"];
                    if (string.IsNullOrEmpty(file))
                    {
                        throw new ArgumentException("Must specify file in query string.  (Example: Download?File=your file)");
                    }

                    string fileName = Path.GetFileName(file);
                    if (string.IsNullOrEmpty(fileName))
                    {
                        // The requested value is deliberately not echoed back into the response.
                        throw new ArgumentException("File name is not valid.");
                    }

                    long chunkSize = ParseChunkSize(context.Request.QueryString["ChunkSize"]);

                    // NOTE(security): 'file' comes straight from the query string and is mapped as a
                    // virtual path, so any file reachable through MapPath can be requested (for example
                    // configuration files). Restrict downloads to a dedicated directory before exposing
                    // this handler to untrusted clients.
                    FileInfo fi = new FileInfo(context.Server.MapPath(file));
                    if (!fi.Exists)
                    {
                        // Answer with 404 instead of letting FileInfo.Length throw an exception whose
                        // message contains the physical path.
                        WriteError(context.Response, 404, "Download Error:  File not found.");
                        return;
                    }

                    long fileLength = fi.Length;
                    if (chunkSize > 0 && chunkSize > fileLength)
                    {
                        throw new ArgumentException("ChunkSize is greater than file length.");
                    }

                    ResetResponse(context.Response);

                    //request is just checking file length
                    if (context.Request.HttpMethod == "HEAD")
                    {
                        context.Response.AddHeader("content-length", fileLength.ToString(invariant));
                        context.Response.AddHeader("accept-ranges", "bytes");
                        context.Response.Flush();
                        return;
                    }

                    // A malformed or multi-part Range header is ignored and the whole file is sent.
                    string rangeHeader = context.Request.Headers["Range"];
                    long rangeStart = 0;
                    long rangeEnd = 0;
                    bool hasRange = !string.IsNullOrEmpty(rangeHeader)
                        && TryParseByteRange(rangeHeader, fileLength, out rangeStart, out rangeEnd);

                    if (hasRange && rangeStart >= fileLength)
                    {
                        // Well-formed range that selects nothing inside the file.
                        context.Response.StatusCode = 416;  //range not satisfiable
                        context.Response.AddHeader("content-range", "bytes */" + fileLength.ToString(invariant));
                        context.Response.Flush();
                        return;
                    }

                    //file save
                    context.Response.ContentEncoding = Encoding.UTF8;
                    // NOTE(review): non-ASCII file names may need encoding (e.g. HttpUtility.UrlEncode)
                    // for some clients - confirm against the browsers that must be supported.
                    context.Response.AddHeader("content-disposition", "attachment;filename=\"" + fileName + "\"");
                    context.Response.ContentType = "application/octet-stream";
                    context.Response.AddHeader("cache-control", "no-store, no-cache");
                    context.Response.ExpiresAbsolute = DateTime.Now.Subtract(new TimeSpan(1, 0, 0, 0));
                    context.Response.Expires = -1;
                    context.Response.AddHeader("Connection", "Keep-Alive");
                    context.Response.AddHeader("accept-ranges", "bytes");

                    if (hasRange)  //request is for byte range
                    {
                        // Both ends of a byte range are inclusive, hence the + 1.
                        long contentLength = rangeEnd - rangeStart + 1;
                        context.Response.AddHeader("content-length", contentLength.ToString(invariant));
                        string contentRange = string.Format(invariant, "bytes {0}-{1}/{2}", rangeStart, rangeEnd, fileLength);
                        context.Response.AddHeader("content-range", contentRange);

                        // "R" is the RFC 1123 pattern; it does not convert, so a UTC value is required.
                        context.Response.AddHeader("last-modified", fi.LastWriteTimeUtc.ToString("R", invariant));

                        // Derived from name, write time and size so it only changes when the file does.
                        byte[] bytes = Encoding.UTF8.GetBytes(
                            string.Format(invariant, "{0} : {1} : {2}", fi.Name, fi.LastWriteTimeUtc.Ticks, fileLength));
                        string eTag;
                        using (MD5 md5 = MD5.Create())
                        {
                            eTag = Convert.ToBase64String(md5.ComputeHash(bytes));
                        }
                        context.Response.AddHeader("ETag", string.Format(invariant, "\"{0}\"", eTag));
                        context.Response.StatusCode = 206;  //partial content

                        context.Response.TransmitFile(file, rangeStart, contentLength);
                        context.Response.Flush();
                    }
                    else  //request is not for byte range
                    {
                        context.Response.AddHeader("content-length", fileLength.ToString(invariant));

                        if (chunkSize <= 0)
                        {
                            context.Response.TransmitFile(file);
                            context.Response.Flush();
                        }
                        else
                        {
                            // Send and flush one piece at a time; the last piece may be shorter.
                            for (long index = 0; index < fileLength; index += chunkSize)
                            {
                                long count = Math.Min(chunkSize, fileLength - index);
                                context.Response.TransmitFile(file, index, count);
                                context.Response.Flush();
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    WriteError(context.Response, 500, "Download Error:  " + ex.GetBaseException().Message);
                }
                finally
                {
                    context.Response.End();
                }
            }

            /// <summary>This handler instance is not reused across requests.</summary>
            public bool IsReusable
            {
                get { return false; }
            }

            /// <summary>Discards any headers and content buffered so far.</summary>
            private static void ResetResponse(HttpResponse response)
            {
                response.ClearHeaders();
                response.ClearContent();
                response.Clear();
            }

            /// <summary>Replaces the buffered response with a plain-text error message and flushes it.</summary>
            private static void WriteError(HttpResponse response, int statusCode, string message)
            {
                ResetResponse(response);

                response.StatusCode = statusCode;
                // Plain text so that nothing in the message is interpreted as markup by the client.
                response.ContentType = "text/plain";
                response.Write(message);
                response.Flush();
            }

            /// <summary>
            /// Parses the 'ChunkSize' query string value.
            /// </summary>
            /// <returns>The parsed value, or 0 when the parameter is missing or blank.</returns>
            /// <exception cref="FormatException">The value is not an integer.</exception>
            private static long ParseChunkSize(string raw)
            {
                if (string.IsNullOrEmpty(raw) || raw.Trim().Length == 0)
                {
                    return 0;
                }

                long chunkSize;
                if (!long.TryParse(raw, System.Globalization.NumberStyles.Integer,
                        System.Globalization.CultureInfo.InvariantCulture, out chunkSize))
                {
                    throw new FormatException("ChunkSize is not a valid integer.");
                }

                return chunkSize;
            }

            /// <summary>
            /// Parses a single-range 'Range' header value: "bytes=first-last", "bytes=first-" or
            /// "bytes=-suffixLength".
            /// </summary>
            /// <param name="header">The raw header value (must not be null).</param>
            /// <param name="fileLength">Length of the file in bytes.</param>
            /// <param name="start">First byte position of the range (inclusive).</param>
            /// <param name="end">Last byte position of the range (inclusive), clamped to the end of the file.</param>
            /// <returns>
            /// True when the header is a well-formed single byte range; the caller must still check
            /// <paramref name="start"/> against <paramref name="fileLength"/> to detect a range that
            /// lies outside the file. False when the header is malformed or lists several ranges.
            /// </returns>
            private static bool TryParseByteRange(string header, long fileLength, out long start, out long end)
            {
                start = 0;
                end = 0;

                const string unitPrefix = "bytes=";
                string spec = header.Replace(" ", "");
                if (!spec.StartsWith(unitPrefix, StringComparison.OrdinalIgnoreCase))
                {
                    return false;
                }
                spec = spec.Substring(unitPrefix.Length);

                int dash = spec.IndexOf('-');
                if (dash < 0 || spec.IndexOf(',') >= 0)
                {
                    return false;
                }

                string first = spec.Substring(0, dash);
                string last = spec.Substring(dash + 1);
                System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;

                if (first.Length == 0)
                {
                    // Suffix form: the final 'suffixLength' bytes of the file.
                    long suffixLength;
                    if (!long.TryParse(last, System.Globalization.NumberStyles.None, invariant, out suffixLength))
                    {
                        return false;
                    }

                    // A suffix length of 0 leaves start == fileLength, which the caller reports as unsatisfiable.
                    start = Math.Max(0L, fileLength - suffixLength);
                    end = fileLength - 1;
                    return true;
                }

                // NumberStyles.None accepts digits only (no sign, no whitespace).
                if (!long.TryParse(first, System.Globalization.NumberStyles.None, invariant, out start))
                {
                    return false;
                }

                if (last.Length == 0)
                {
                    // Open-ended form: from 'first' to the end of the file.
                    end = fileLength - 1;
                    return true;
                }

                long lastPosition;
                if (!long.TryParse(last, System.Globalization.NumberStyles.None, invariant, out lastPosition)
                    || lastPosition < start)
                {
                    start = 0;
                    return false;
                }

                end = Math.Min(lastPosition, fileLength - 1);
                return true;
            }
        }
  • 相关阅读:
    文件监控(教学版)
    设备读写 之 直接方式(Direct I/O)
    过滤驱动加密文件(代码)
    Why ASMLIB and why not?
    SQL调优:Clustering Factor影响数据删除速度一例
    监控一个大事务的回滚
    crsctl status resource t init in 11.2.0.2 grid infrastructure
    Script:收集Oracle备份恢复信息
    Only ARCH Bgprocess may create archivelog?
    11g新特性:A useful View V$DIAG_INFO
  • 原文地址:https://www.cnblogs.com/majiang/p/7954868.html
Copyright © 2011-2022 走看看