zoukankan      html  css  js  c++  java
  • C#实现IOCP(完成端口)的详细代码示例

    C#实现IOCP(完成端口)的详细代码如下:

    using System;
    using System.Threading;  // Included for the Thread.Sleep call
    using Continuum.Threading;
    namespace Sample
    {
        //============================================
        /// <summary> Sample class for the threading class </summary>
        public class UtilThreadingSample
        {
            //******************************************* 
            /// <summary> Test Method </summary>
            static void Main()
            {
                // Create the MSSQL IOCP Thread Pool
                IOCPThreadPool pThreadPool = new IOCPThreadPool(0, 5, 10, new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));
                pThreadPool.PostEvent(10);
                Thread.Sleep(100);
                pThreadPool.Dispose();
            }
            //*****************************************
            /// <summary> Function to be called by the IOCP thread pool.  Called when
            ///           a command is posted for processing by the SocketManager </summary>
            /// <param name="iValue"> The value provided by the thread posting the event </param>
            static public void IOCPThreadFunction(Int32 iValue)
            {
                try
                {
                    Console.WriteLine("Value: {0}", iValue);
                }
                catch (Exception pException)
                {
                    Console.WriteLine(pException.Message);
                }
            }
        }
    } 
    
    using System;
    using System.Threading;
    using System.Runtime.InteropServices; 
    
    namespace Continuum.Threading
    { 
    
        // Structures
        //==========================================
        /// <summary> This is the WIN32 OVERLAPPED structure </summary>
        [StructLayout(LayoutKind.Sequential, CharSet = CharSet.Auto)]
        public unsafe struct OVERLAPPED
        {
            UInt32* ulpInternal;
            UInt32* ulpInternalHigh;
            Int32 lOffset;
            Int32 lOffsetHigh;
            UInt32 hEvent;
        } 
    
        // Classes
        //============================================
        /// <summary> This class provides the ability to create a thread pool to manage work.  The
        ///           class abstracts the Win32 IOCompletionPort API so it requires the use of
        ///           unmanaged code.  Unfortunately the .NET framework does not provide this functionality </summary>
        public sealed class IOCPThreadPool
        { 
    
            // Win32 Function Prototypes
            /// <summary> Win32Func: Create an IO Completion Port Thread Pool </summary>
            [DllImport("Kernel32", CharSet = CharSet.Auto)]
            private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads); 
    
            /// <summary> Win32Func: Closes an IO Completion Port Thread Pool </summary>
            [DllImport("Kernel32", CharSet = CharSet.Auto)]
            private unsafe static extern Boolean CloseHandle(UInt32 hObject); 
    
            /// <summary> Win32Func: Posts a context based event into an IO Completion Port Thread Pool </summary>
            [DllImport("Kernel32", CharSet = CharSet.Auto)]
            private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, OVERLAPPED* pOverlapped); 
    
            /// <summary> Win32Func: Waits on a context based event from an IO Completion Port Thread Pool.
            ///           All threads in the pool wait in this Win32 Function </summary>
            [DllImport("Kernel32", CharSet = CharSet.Auto)]
            private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, OVERLAPPED** ppOverlapped, UInt32 uiMilliseconds); 
    
            // Constants
            /// <summary> SimTypeConst: This represents the Win32 Invalid Handle Value Macro </summary>
            private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff; 
    
            /// <summary> SimTypeConst: This represents the Win32 INFINITE Macro </summary>
            private const UInt32 INIFINITE = 0xffffffff; 
    
            /// <summary> SimTypeConst: This tells the IOCP Function to shutdown </summary>
            private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff; 
    
    
    
            // Delegate Function Types
            /// <summary> DelType: This is the type of user function to be supplied for the thread pool </summary>
            public delegate void USER_FUNCTION(Int32 iValue); 
    
    
            // Private Properties
            private UInt32 m_hHandle;
            /// <summary> SimType: Contains the IO Completion Port Thread Pool handle for this instance </summary>
            private UInt32 GetHandle { get { return m_hHandle; } set { m_hHandle = value; } } 
    
            private Int32 m_uiMaxConcurrency;
            /// <summary> SimType: The maximum number of threads that may be running at the same time </summary>
            private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } } 
    
            private Int32 m_iMinThreadsInPool;
            /// <summary> SimType: The minimal number of threads the thread pool maintains </summary>
            private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } } 
    
            private Int32 m_iMaxThreadsInPool;
            /// <summary> SimType: The maximum number of threads the thread pool maintains </summary>
            private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } } 
    
            private Object m_pCriticalSection;
            /// <summary> RefType: A serialization object to protect the class state </summary>
            private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } } 
    
            private USER_FUNCTION m_pfnUserFunction;
            /// <summary> DelType: A reference to a user specified function to be call by the thread pool </summary>
            private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } } 
    
            private Boolean m_bDisposeFlag;
            /// <summary> SimType: Flag to indicate if the class is disposing </summary>
            private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } } 
    
            // Public Properties
            private Int32 m_iCurThreadsInPool;
            /// <summary> SimType: The current number of threads in the thread pool </summary>
            public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } }
            /// <summary> SimType: Increment current number of threads in the thread pool </summary>
            private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }
            /// <summary> SimType: Decrement current number of threads in the thread pool </summary>
            private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }
            private Int32 m_iActThreadsInPool;
            /// <summary> SimType: The current number of active threads in the thread pool </summary>
            public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } }
            /// <summary> SimType: Increment current number of active threads in the thread pool </summary>
            private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); }
            /// <summary> SimType: Decrement current number of active threads in the thread pool </summary>
            private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); }
            private Int32 m_iCurWorkInPool;
            /// <summary> SimType: The current number of Work posted in the thread pool </summary>
            public Int32 GetCurWorkInPool { get { return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } }
            /// <summary> SimType: Increment current number of Work posted in the thread pool </summary>
            private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); }
            /// <summary> SimType: Decrement current number of Work posted in the thread pool </summary>
            private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); } 
    
      
    
            // Constructor, Finalize, and Dispose 
            //***********************************************
            /// <summary> Constructor </summary>
            /// <param name = "iMaxConcurrency"> SimType: Max number of running threads allowed </param>
            /// <param name = "iMinThreadsInPool"> SimType: Min number of threads in the pool </param>
            /// <param name = "iMaxThreadsInPool"> SimType: Max number of threads in the pool </param>
            /// <param name = "pfnUserFunction"> DelType: Reference to a function to call to perform work </param>
            /// <exception cref = "Exception"> Unhandled Exception </exception>
            public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)
            {
                try
                {
                    // Set initial class state
                    GetMaxConcurrency = iMaxConcurrency;
                    GetMinThreadsInPool = iMinThreadsInPool;
                    GetMaxThreadsInPool = iMaxThreadsInPool;
                    GetUserFunction = pfnUserFunction;
                    // Init the thread counters
                    GetCurThreadsInPool = 0;
                    GetActThreadsInPool = 0;
                    GetCurWorkInPool = 0;
                    // Initialize the Monitor Object
                    GetCriticalSection = new Object();
                    // Set the disposing flag to false
                    IsDisposed = false;
                    unsafe
                    {
                        // Create an IO Completion Port for Thread Pool use
                        GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, null, (UInt32)GetMaxConcurrency);
                    }
                    // Test to make sure the IO Completion Port was created
                    if (GetHandle == 0)
                        throw new Exception("Unable To Create IO Completion Port");
                    // Allocate and start the Minimum number of threads specified
                    Int32 iStartingCount = GetCurThreadsInPool;
                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                    for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread)
                    {
                        // Create a thread and start it
                        Thread thThread = new Thread(tsThread);
                        thThread.Name = "IOCP " + thThread.GetHashCode();
                        thThread.Start();
                        // Increment the thread pool count
                        IncCurThreadsInPool();
                    }
                }
                catch
                {
                    throw new Exception("Unhandled Exception");
                }
            } 
    
            //***********************************************
            /// <summary> Finalize called by the GC </summary>
            ~IOCPThreadPool()
            {
                if (!IsDisposed)
                    Dispose();
            } 
    
            //**********************************************
            /// <summary> Called when the object will be shutdown.  This
            ///           function will wait for all of the work to be completed
            ///           inside the queue before completing </summary>
            public void Dispose()
            {
                try
                {
                    // Flag that we are disposing this object
                    IsDisposed = true;
                    // Get the current number of threads in the pool
                    Int32 iCurThreadsInPool = GetCurThreadsInPool;
                    // Shutdown all thread in the pool
                    for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread)
                    {
                        unsafe
                        {
                            bool bret = PostQueuedCompletionStatus(GetHandle, 4, (UInt32*)SHUTDOWN_IOCPTHREAD, null);
                        }
                    }
                    // Wait here until all the threads are gone
                    while (GetCurThreadsInPool != 0) Thread.Sleep(100);
                    unsafe
                    {
                        // Close the IOCP Handle
                        CloseHandle(GetHandle);
                    }
                }
                catch
                {
                }
            } 
    
            // Private Methods
            //*******************************************
            /// <summary> IOCP Worker Function that calls the specified user function </summary>
            private void IOCPFunction()
            {
                UInt32 uiNumberOfBytes;
                Int32 iValue;
                try
                {
                    while (true)
                    {
                        unsafe
                        {
                            OVERLAPPED* pOv;
                            // Wait for an event
                            GetQueuedCompletionStatus(GetHandle, &uiNumberOfBytes, (UInt32*)&iValue, &pOv, INIFINITE);
                        }
                        // Decrement the number of events in queue
                        DecCurWorkInPool();
                        // Was this thread told to shutdown
                        if (iValue == SHUTDOWN_IOCPTHREAD)
                            break;
                        // Increment the number of active threads
                        IncActThreadsInPool();
                        try
                        {
                            // Call the user function
                            GetUserFunction(iValue);
                        }
                        catch
                        {
                        }
                        // Get a lock
                        Monitor.Enter(GetCriticalSection);
                        try
                        {
                            // If we have less than max threads currently in the pool
                            if (GetCurThreadsInPool < GetMaxThreadsInPool)
                            {
                                // Should we add a new thread to the pool
                                if (GetActThreadsInPool == GetCurThreadsInPool)
                                {
                                    if (IsDisposed == false)
                                    {
                                        // Create a thread and start it
                                        ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                        Thread thThread = new Thread(tsThread);
                                        thThread.Name = "IOCP " + thThread.GetHashCode();
                                        thThread.Start();
                                        // Increment the thread pool count
                                        IncCurThreadsInPool();
                                    }
                                }
                            }
                        }
                        catch
                        {
                        }
                        // Relase the lock
                        Monitor.Exit(GetCriticalSection);
                        // Increment the number of active threads
                        DecActThreadsInPool();
                    }
                }
                catch
                {
                }
                // Decrement the thread pool count
                DecCurThreadsInPool();
            } 
    
            // Public Methods
            //******************************************
            /// <summary> IOCP Worker Function that calls the specified user function </summary>
            /// <param name="iValue"> SimType: A value to be passed with the event </param>
            /// <exception cref = "Exception"> Unhandled Exception </exception>
            public void PostEvent(Int32 iValue)
            {
                try
                {
                    // Only add work if we are not disposing
                    if (IsDisposed == false)
                    {
                        unsafe
                        {
                            // Post an event into the IOCP Thread Pool
                            PostQueuedCompletionStatus(GetHandle, 4, (UInt32*)iValue, null);
                        }
                        // Increment the number of item of work
                        IncCurWorkInPool();
                        // Get a lock
                        Monitor.Enter(GetCriticalSection);
                        try
                        {
                            // If we have less than max threads currently in the pool
                            if (GetCurThreadsInPool < GetMaxThreadsInPool)
                            {
                                // Should we add a new thread to the pool
                                if (GetActThreadsInPool == GetCurThreadsInPool)
                                {
                                    if (IsDisposed == false)
                                    {
                                        // Create a thread and start it
                                        ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                        Thread thThread = new Thread(tsThread);
                                        thThread.Name = "IOCP " + thThread.GetHashCode();
                                        thThread.Start();
                                        // Increment the thread pool count
                                        IncCurThreadsInPool();
                                    }
                                }
                            }
                        }
                        catch
                        {
                        }
                        // Release the lock
                        Monitor.Exit(GetCriticalSection);
                    }
                }
                catch (Exception e)
                {
                    throw e;
                }
                catch
                {
                    throw new Exception("Unhandled Exception");
                }
            }
            //*****************************************
            /// <summary> IOCP Worker Function that calls the specified user function </summary>
            /// <exception cref = "Exception"> Unhandled Exception </exception>
            public void PostEvent()
            {
                try
                {
                    // Only add work if we are not disposing
                    if (IsDisposed == false)
                    {
                        unsafe
                        {
                            // Post an event into the IOCP Thread Pool
                            PostQueuedCompletionStatus(GetHandle, 0, null, null);
                        }
                        // Increment the number of item of work
                        IncCurWorkInPool();
                        // Get a lock
                        Monitor.Enter(GetCriticalSection);
                        try
                        {
                            // If we have less than max threads currently in the pool
                            if (GetCurThreadsInPool < GetMaxThreadsInPool)
                            {
                                // Should we add a new thread to the pool
                                if (GetActThreadsInPool == GetCurThreadsInPool)
                                {
                                    if (IsDisposed == false)
                                    {
                                        // Create a thread and start it
                                        ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                        Thread thThread = new Thread(tsThread);
                                        thThread.Name = "IOCP " + thThread.GetHashCode();
                                        thThread.Start();
                                        // Increment the thread pool count
                                        IncCurThreadsInPool();
                                    }
                                }
                            }
                        }
                        catch
                        {
                        }
                        // Release the lock
                        Monitor.Exit(GetCriticalSection);
                    }
                }
                catch (Exception e)
                {
                    throw e;
                }
                catch
                {
                    throw new Exception("Unhandled Exception");
                }
            }
        }
    } 

    利用winsock原生的api来做一个同步的socket服务器的例子,大致上只是贴了一些代码,相信大家这么冰雪聪明,已经研究的差不多了。因为winsock的api使用在msdn或者google上都能很方便的查到,所以我没太多罗嗦代码的原理。但是c#进行平台调用方面是有一些经验的,单靠google和msdn及社区的力量有时候不容易得到答案。这次给大家演示一下利用IOCP的在线程间传递数据的例子,顺便打算讲一些细节和注意的地方。

    概述:这里主要使用IOCP的三个 API,CreateIoCompletionPort,PostQueuedCompletionStatus,GetQueuedCompletionStatus,第一个是用来创建一个完成端口对象,第二个是向一个端口发送数据,第三个是接受数据,基本上用着三个函数,就可以写一个使用IOCP的简单示例。

    其中完成端口一个内核对象,所以创建的时候会耗费性能,CPU得切换到内核模式,而且一旦创建了内核对象,我们都要记着要不用的时候显式的释放它的句柄,释放非托管资源的最佳实践肯定是使用 Dispose模式,这个博客园有人讲过N次了。而一般要获取一个内核对象的引用,最好用SafeHandle来引用它,这个类可以帮你管理引用计数,而且用它引用内核对象,代码更健壮,如果用指针引用内核对象,在创建成功内核对象并复制给指针这个时间段,如果抛了 ThreadAbortException,这个内核对象就泄漏了,而用SafeHandle去应用内核对象就不会在赋值的时候发生 ThreadAbortException。另外SafeHandle类继承自CriticalFinalizerObject类,并实现了 IDispose接口,CLR对CriticalFinalizerObject及其子类有特殊照顾,比如说在编译的时候优先编译,在调用非 CriticalFinalizerObject类的Finalize方法后再调用CriticalFinalizerObject类的Finalize 类的Finalize方法等。在win32里,一般一个句柄是-1或者0的时候表示这个句柄是无效的,所以.net有一个SafeHandle的派生类 SafeHandleZeroOrMinusOneIsInvalid ,但是这个类是一个抽象类,你要引用自己使用的内核对象或者非托管对象,要从这个类派生一个类并重写Relseas方法。另外在.net框架里它有两个实现几乎一模一样的子类,一个是SafeFileHandle一个是SafeWaitHandle,前者表示文件句柄,后者表示等待句柄,我们这里为了方便就直接用SafeFileHandle来引用完成端口对象了。

    CreateIoCompletionPort函数的原型如下:

    [​D​l​l​I​m​p​o​r​t​("k​e​r​n​e​l​3​2​.​d​l​l",​ ​C​h​a​r​S​e​t​ = ​C​h​a​r​S​e​t​.​A​u​t​o​,​ ​S​e​t​L​a​s​t​E​r​r​o​r​ = t​r​u​e)​]​
    
    p​u​b​l​i​c s​t​a​t​i​c e​x​t​e​r​n ​S​a​f​e​F​i​l​e​H​a​n​d​l​e​ ​C​r​e​a​t​e​I​o​C​o​m​p​l​e​t​i​o​n​P​o​r​t​(​I​n​t​P​t​r​ ​F​i​l​e​H​a​n​d​l​e​,​ ​I​n​t​P​t​r​ ​E​x​i​s​t​i​n​g​C​o​m​p​l​e​t​i​o​n​P​o​r​t​,​ ​I​n​t​P​t​r​ ​C​o​m​p​l​e​t​i​o​n​K​e​y​,​ u​i​n​t ​N​u​m​b​e​r​O​f​C​o​n​c​u​r​r​e​n​t​T​h​r​e​a​d​s​)​;

     

    FileHandle参数表示要绑定在完成端口上的句柄,比如说一个已经accept的socket句柄。

    ExistingCompletionPort 参数表示一个已有的完成端口句柄,第一次创建完成端口的时候显然随便传个值就行,所以这个参数直接定义成IntPtr类型了。当你创建了工作线程来为 I/O请求服务的时候,才会把句柄和完成端口关联在一起,而之前第一次创建完成端口的时候这个参数传一个zero指针就O了,而FileHandle参数传一个-1的指针就行了。

    CompletionKey是完成键的意思,它可以是任意想传递给工作线程的数据,学名叫做单句柄数据,就是说跟随FileHandle参数走的一些状态数据,一般在socket的iocp 程序里是把socket传进去,以便在工作线程里拿到这个socket句柄,在收到异步操作完成的通知及处理后继续进行下一个异步操作的投递,如发送和接受数据等。

    NumberOfConcurrentThreads 参数表示在一个完成端口上同时允许执行的最大线程数量。如果传0,就是说你有几个CPU,就是允许最大有几个线程,这也是最理想情况,因为一个CPU一个线程可以防止线程上下文切换。关于这个值要和创建工作线程的数量的关系,大家要理解清楚,不一定CPU有多少个,你的工作线程就创建多少个。因为你的工作线程有时候会阻塞或者等待,而如果你正好创建了CPU个数个工作线程,有一个等待的话,因为你分配了同时最多有CPU个数多个最大IOCP线程,这时候就不能效率最大化了。所以一般工作线程创建的要比CPU个数多一些,除非你保证你的工作线程不会阻塞。

    PostQueuedCompletionStatus函数原型如下:

     
    [​D​l​l​I​m​p​o​r​t​("K​e​r​n​e​l​3​2",​ ​C​h​a​r​S​e​t​ = ​C​h​a​r​S​e​t​.​A​u​t​o​)​]​
    
     ​ ​ ​ p​r​i​v​a​t​e s​t​a​t​i​c e​x​t​e​r​n b​o​o​l ​P​o​s​t​Q​u​e​u​e​d​C​o​m​p​l​e​t​i​o​n​S​t​a​t​u​s​(​S​a​f​e​F​i​l​e​H​a​n​d​l​e​ ​C​o​m​p​l​e​t​i​o​n​P​o​r​t​,​ u​i​n​t ​d​w​N​u​m​b​e​r​O​f​B​y​t​e​s​T​r​a​n​s​f​e​r​r​e​d​,​ ​I​n​t​P​t​r​ ​d​w​C​o​m​p​l​e​t​i​o​n​K​e​y​,​ ​I​n​t​P​t​r​ ​l​p​O​v​e​r​l​a​p​p​e​d​)​;

    该方法用于给完成端口投递自定义信息,一般情况下如果把某个句柄和完成端口绑定后,当有数据收发操作完成时会自动同时工作线程,工作线程里的 GetQueuedCompletionStatus就不会阻塞,而继续往下走,来进行接收到IO操作完成通知的流程。而有时候我们需要手工向工作者线程投递一些消息,比如说我们主线程知道所有的socket句柄都关闭了,工作线程可以退出了,我们就可以给工作线程发一个自定义数据,工作线程收到后判断是否是退出指令,然后退出。

    CompletionPort参数表示向哪个完成端口对象投递信息,在这个完成端口上等待消息的工作线程就会收到消息了。
    dwNumberOfBytesTransferred表示你投递的数据有多大,我们一般投递的是一个对象的指针,在32位系统里,int指针就是4个字节了,直接写4就O了,要不就用sizeof你传的数据,如sizeof(IntPtr)。

    dwCompletionKey同CreateIoCompletionPort的解释,是单句柄数据,本示例用不到,不细说,直接用 IntPtr.Zero填充了事。

    lpOverlapped参数,本意是一个win32的overlapped结构的指针,本示例中不用,所以不详细讲。它叫单IO数据,是相对单据并拘束CompletionKey来讲的,前者是一个句柄的每次IO操作的上下文,比如单词IO操作的数据、操作类型等,后者是整个句柄的上下文。但这里我们表示你要投递的数据,可以是任何类型的数据(谁让它是个指针呢,所以传啥都行),值得注意的一点就是,这个数据传递到工作线程的时候,中间这个数据走的是非托管代码。所以不能直接传一个引用进去,这里要使用到GCHandle类。先大致介绍一下这个类吧。它有个静态方法Alloc来给把一个对象在GC 句柄表里注册,GC句柄表示CLR为没个应用程序域提供的一个表,它允许你来监视和管理对象的生命周期,你可以往里加一个对象的引用,也可以从里面移除一个对象,往里加对象的时候,还可以指定一个标记来表示我们希望如何监视和控制这个对象。而加入一个条目的操作就是GCHandle的Alloc对象,它有两个参数,第一个参数是对象,第二参数是 GCHandleType类型的枚举,第二个参数表示我们如何来监视和控制这个对象的生命周期。当这个参数是GCHandleType.Normal时,表示我们告诉垃圾收集器,及时托管代码里没有该对象的根,也不要回收该对象,但垃圾收集器可以移动它,一般我们向非托管代码传递一个对象,而又从非托管代码传递回来的时候用这个类型非常好,它不会让垃圾收集器在非托管代码返回托管代码的时候回收掉该对象,还不怎么影响GC的性能,因为GC还可以移动它。 dwCompletionKey就是我们在托管-非托管-托管之间传递的一个很典型的场景。所以这里用它,另外还有 GCHandleType.Pinned,它和GCHandleType.Normal不同的一点就是GC除了在没有根的时候不能回收这个对象外,还不能移动它,应用场景是给非托管代码传递一个byte[]的buffer,让托管代码去填充,如果用GCHandleType.Normal有可能在非托管代码返回托管代码的时候写错内存位置,因为有可能GC移动了这个对象的内存地址。关于根、GC原理,大家可以参考相关资料。另外在你的数据从非托管代码传递会托管代码后,要调用GCHandle的实例方法free来在GC句柄表里移除该对象,这时候你的托管代码还有个该对象的引用,也就是根,GC也不会给你回收的,当你用完了后,GC就给你回收了。GCHandle的Target属性用来访问GCHandle指向的对象。其它两个GCHandleType的成员是关于弱引用的,和本文关系不大,就不介绍了。

    GetQueuedCompletionStatus原型如下:

    [​D​l​l​I​m​p​o​r​t​("k​e​r​n​e​l​3​2​.​d​l​l",​ ​C​h​a​r​S​e​t​ = ​C​h​a​r​S​e​t​.​A​u​t​o​,​ ​S​e​t​L​a​s​t​E​r​r​o​r​ = t​r​u​e)​]​
    
     ​ p​u​b​l​i​c s​t​a​t​i​c e​x​t​e​r​n b​o​o​l ​G​e​t​Q​u​e​u​e​d​C​o​m​p​l​e​t​i​o​n​S​t​a​t​u​s​(​S​a​f​e​F​i​l​e​H​a​n​d​l​e​ ​C​o​m​p​l​e​t​i​o​n​P​o​r​t​,​
    
     ​ ​ ​ ​ ​ o​u​t u​i​n​t ​l​p​N​u​m​b​e​r​O​f​B​y​t​e​s​T​r​a​n​s​f​e​r​r​e​d​,​ o​u​t ​I​n​t​P​t​r​ ​l​p​C​o​m​p​l​e​t​i​o​n​K​e​y​,​
    
     ​ ​ ​ ​ ​ o​u​t ​I​n​t​P​t​r​ ​l​p​O​v​e​r​l​a​p​p​e​d​,​ u​i​n​t ​d​w​M​i​l​l​i​s​e​c​o​n​d​s​)​;

    前几个参数和PostQueuedCompletionStatus差不多,
    CompletionPort表示在哪个完成端口上等待PostQueuedCompletionStatus发来的消息,或者IO操作完成的通知,

    lpNumberOfBytesTransferred 表示收到数据的大小,这个大小不是说CompletionKey的大小,而是在单次I/O操作完成后(WSASend或者WSAReceve),实际传输的字节数,我在这里理解的不是很透彻,我觉得如果是接受PostQueuedCompletionStatus的消息的话,应该是收到 lpOverlapped的大小,因为它才是单IO数据嘛。

    lpCompletionKey用来接收单据并数据,我们没传递啥,后来也没用,在socket程序里,一般接socket句柄。

    lpOverlapped用来接收单IO数据,或者我们的自定义消息。

    dwMilliseconds表示等待一个自定义消息或者IO完成通知消息在完成端口上出现的时间,传递 INIFINITE(0xffffffff)表示无限等待下去。

    好了,API大概介绍这么多,下面介绍代码
    1、主线程创建一个完成端口对象,不和任何句柄绑定,前几个参数都写0,NumberOfConcurrentThreads参数我们写1,因为我们的示例就一个工作线程。
    2、创建一个工作线程,把第一步创建的完成端口传进去
    3、创建两个单IO数据,分别发投递给第一步创建的完成端口
    4、在工作线程里执行一个死循环,循环在传递进来的完成端口上等待消息,没有消息的时候GetQueuedCompletionStatus处于休息状态,有消息来的时候把指针转换成对象,然后输出
    5、如果收到退出指令,就退出循环,从而结束工作者线程。
    下面是完整代码,需要打开不安全代码的编译选项。

    using System;
    using System.Runtime.InteropServices;
    using System.Threading;
    using Microsoft.Win32.SafeHandles;
    
    [StructLayout(LayoutKind.Sequential)]
    class PER_IO_DATA
    {
        public string Data;
    }
    
    public class IOCPApiTest
    {
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,
            out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey,
            out IntPtr lpOverlapped, uint dwMilliseconds);
        [DllImport("Kernel32", CharSet = CharSet.Auto)]
        private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);
    
        public static unsafe void TestIOCPApi()
        {
            var CompletionPort = CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 1);
            if(CompletionPort.IsInvalid)
            {
                Console.WriteLine("CreateIoCompletionPort 出错:{0}",Marshal.GetLastWin32Error());
            }
            var thread = new Thread(ThreadProc);
            thread.Start(CompletionPort);
    
            var PerIOData = new PER_IO_DATA() ;
            var gch = GCHandle.Alloc(PerIOData);
            PerIOData.Data = "hi,我是蛙蛙王子,你是谁?";
            Console.WriteLine("{0}-主线程发送数据",Thread.CurrentThread.GetHashCode());
            PostQueuedCompletionStatus(CompletionPort, (uint)sizeof(IntPtr), IntPtr.Zero, (IntPtr)gch);
    
            var PerIOData2 = new PER_IO_DATA();
            var gch2 = GCHandle.Alloc(PerIOData2);
            PerIOData2.Data = "关闭工作线程吧";
            Console.WriteLine("{0}-主线程发送数据", Thread.CurrentThread.GetHashCode());
            PostQueuedCompletionStatus(CompletionPort, 4, IntPtr.Zero, (IntPtr)gch2);
            Console.WriteLine("主线程执行完毕");
            Console.ReadKey();
        }
        static void ThreadProc(object CompletionPortID)
        {
            var CompletionPort = (SafeFileHandle)CompletionPortID;
    
            while (true)
            {
                uint BytesTransferred;
                IntPtr PerHandleData;
                IntPtr lpOverlapped;
                Console.WriteLine("{0}-工作线程准备接受数据",Thread.CurrentThread.GetHashCode());
                GetQueuedCompletionStatus(CompletionPort, out BytesTransferred,
                                          out PerHandleData, out lpOverlapped, 0xffffffff);
                if(BytesTransferred <= 0)
                    continue;
                GCHandle gch = GCHandle.FromIntPtr(lpOverlapped);
                var per_HANDLE_DATA = (PER_IO_DATA)gch.Target;
                Console.WriteLine("{0}-工作线程收到数据:{1}", Thread.CurrentThread.GetHashCode(), per_HANDLE_DATA.Data);
                gch.Free();
                if (per_HANDLE_DATA.Data != "关闭工作线程吧") continue;
                Console.WriteLine("收到退出指令,正在退出");
                CompletionPort.Dispose();
                break;
            }
        }
    
        public static int Main(String[] args)
        {
            TestIOCPApi();
            return 0;
        }
    }

    转自:http://www.csharpwin.com/csharpspace/10700r6216.shtml

  • 相关阅读:
    elasticserach7.X 安装,配置
    org.elasticsearch.bootstrap.StartupException: ElasticsearchException[failed to bind service]; nested: AccessDeniedException[/home/ae/es761/data/nodes];
    Java 八大基本数据类
    java 输入scanner
    idea生成javaDoc文件
    时间戳和日期的相互转化
    自1970 年1月1日8时0分0秒至当前时间的总秒数什么意思?
    JDK的卸载与JDK的安装以及环境变量配置
    Java特性与优势
    图片识别
  • 原文地址:https://www.cnblogs.com/maintell/p/2087746.html
Copyright © 2011-2022 走看看