zoukankan      html  css  js  c++  java
  • [C#]I/O完成端口的类定义和测试实例

    整理者:郑昀@UltraPower
    日期:2005-04-13

    从William Kennedy那里整理过来的,不同之处在于他自己定义了一个Overlapped,而我们这里直接使用
    System.Threading.NativeOverlapped:。

    附一段我以前的Win32下的IOCP文档,如果您了解IOCP也可以直接跳过看后面的C#测试示范:

    我们采用的是I/O Complete Port(以下简称IOCP)处理机制。

    简单的讲,当服务应用程序初始化时,它应该先创建一个I/O CP。我们在请求到来后,将得到的数据打包用PostQueuedCompletionStatus发送到IOCP中。这时需要创建一些个线程(7个线程/CPU,再多就没有意义了)来处理发送到IOCP端口的消息。实现步骤大致如下:

    1     先在主线程中调用CreateIoCompletionPort创建IOCP

    CreateIoCompletionPort的前三个参数只在把设备同Complete Port相关联时才有用。

    此时我们只需传递INVALID_HANDLE_VALUE,NULL0即可。

    第四个参数告诉端口同时能运行的最多线程数,这里设置为0,表示默认为当前计算机的CPU数目。

    2     我们的ThreadFun线程函数执行一些初始化之后,将进入一个循环,该循环会在服务进程终止时才结束。

    在循环中,调用GetQueuedCompletionStatus,这样就把当前线程的ID放入一个等待线程队列中,I/O CP内核对象就总能知道哪个线程在等待处理完成的I/O请求。

    如果在IDLE_THREAD_TIMEOUT规定的时间内I/O CP上还没有出现一个Completion Packet,则转入下一次循环。在这里我们设置的IDLE_THREAD_TIMEOUT1秒。

     

    当端口的I/O完成队列中出现一项时,完成端口就唤醒等待线程队列中的这个线程,该线程将得到完成的I/O项中的信息:       传输的字节数、完成键和OVERLAPPED结构的地址。

     

    在我们的程序中可以用智能指针或者BSTR或者int来接受这个OVERLAPPED结构的地址的值,从而得到消息;然后在这个线程中处理消息。

    GetQueuedCompletionStatus的第一个参数hCompletionPort指出了要监视哪一个端口,这里我们传送先前从CreateIoCompletionPort返回的端口句柄。

     

    需要注意的是:

    第一,   线程池的数目是有限制的,和CPU数目有关系。

    第二,   IOCP是一种较为完美的睡眠/唤醒 线程机制;线程当前没有任务要处理时,就进入睡眠状态,从而不占用CPU资源,直到被内核唤醒;

    第三,   最近一次刚执行完的线程,下次任务来的时候还会唤醒它;所以有可能比较少被调用的线程以后被调用的几率也少。

     


    测试代码:

    using System;
    using System.Threading;  // Included for the Thread.Sleep call
    using Continuum.Threading;
    using System.Runtime.InteropServices;

    namespace IOCPDemo
    {
        
    //=============================================================================
        /// <summary> Sample class for the threading class </summary>
        public class UtilThreadingSample
        
    {
            
    //*****************************************************************************   
            /// <summary> Test Method </summary>
            static void Main()
            
    {
                
    // Create the MSSQL IOCP Thread Pool
                IOCPThreadPool pThreadPool = new IOCPThreadPool(01020new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));
          
                
    //for(int i =1;i<10000;i++)
                {
                    pThreadPool.PostEvent(
    1234);
                }

          
                Thread.Sleep(
    100);
          
                pThreadPool.Dispose();
            }

        
            
    //********************************************************************
            /// <summary> Function to be called by the IOCP thread pool.  Called when
            
    ///           a command is posted for processing by the SocketManager </summary>
            
    /// <param name="iValue"> The value provided by the thread posting the event </param>

            static public void IOCPThreadFunction(int iValue)
            
    {
                
    try
                
    {
                    Console.WriteLine(
    "Value: {0}", iValue.ToString());
                    Thread.Sleep(
    3000);
                }

          
                
    catch (Exception pException)
                
    {
                    Console.WriteLine(pException.Message);
                }

            }

        }


    }



    类代码:
    using System;
    using System.Threading;
    using System.Runtime.InteropServices;

    namespace IOCPThreading
    {
        [StructLayout(LayoutKind.Sequential, CharSet
    =CharSet.Auto)]

        
    public sealed class IOCPThreadPool
        
    {
            [DllImport(
    "Kernel32", CharSet=CharSet.Auto)]
            
    private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

            [DllImport(
    "Kernel32", CharSet=CharSet.Auto)]
            
    private unsafe static extern Boolean CloseHandle(UInt32 hObject);

            [DllImport(
    "Kernel32", CharSet=CharSet.Auto)]
            
    private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped* pOverlapped);

            [DllImport(
    "Kernel32", CharSet=CharSet.Auto)]
            
    private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped** ppOverlapped, UInt32 uiMilliseconds);

            
    private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff;
            
    private const UInt32 INIFINITE = 0xffffffff;
            
    private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff;
            
    public delegate void USER_FUNCTION(int iValue);
            
    private UInt32 m_hHandle;
            
    private UInt32 GetHandle get return m_hHandle; } set { m_hHandle = value; } }

            
    private Int32 m_uiMaxConcurrency;

            
    private Int32 GetMaxConcurrency get return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } }


            
    private Int32 m_iMinThreadsInPool;

            
    private Int32 GetMinThreadsInPool get return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } }

            
    private Int32 m_iMaxThreadsInPool;

            
    private Int32 GetMaxThreadsInPool get return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } }


            
    private Object m_pCriticalSection;

            
    private Object GetCriticalSection get return m_pCriticalSection; } set { m_pCriticalSection = value; } }


            
    private USER_FUNCTION m_pfnUserFunction;

            
    private USER_FUNCTION GetUserFunction get return m_pfnUserFunction; } set { m_pfnUserFunction = value; } }


            
    private Boolean m_bDisposeFlag;

            
    /// <summary> SimType: Flag to indicate if the class is disposing </summary>

            
    private Boolean IsDisposed get return m_bDisposeFlag; } set { m_bDisposeFlag = value; } }

            
    private Int32 m_iCurThreadsInPool;

            
    /// <summary> SimType: The current number of threads in the thread pool </summary>

            
    public Int32 GetCurThreadsInPool get return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } }

            
    /// <summary> SimType: Increment current number of threads in the thread pool </summary>

            
    private Int32 IncCurThreadsInPool() return Interlocked.Increment(ref m_iCurThreadsInPool); }

            
    /// <summary> SimType: Decrement current number of threads in the thread pool </summary>

            
    private Int32 DecCurThreadsInPool() return Interlocked.Decrement(ref m_iCurThreadsInPool); }


            
    private Int32 m_iActThreadsInPool;

            
    /// <summary> SimType: The current number of active threads in the thread pool </summary>

            
    public Int32 GetActThreadsInPool get return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } }

            
    /// <summary> SimType: Increment current number of active threads in the thread pool </summary>

            
    private Int32 IncActThreadsInPool() return Interlocked.Increment(ref m_iActThreadsInPool); }

            
    /// <summary> SimType: Decrement current number of active threads in the thread pool </summary>

            
    private Int32 DecActThreadsInPool() return Interlocked.Decrement(ref m_iActThreadsInPool); }


            
    private Int32 m_iCurWorkInPool;

            
    /// <summary> SimType: The current number of Work posted in the thread pool </summary>

            
    public Int32 GetCurWorkInPool get return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } }

            
    /// <summary> SimType: Increment current number of Work posted in the thread pool </summary>

            
    private Int32 IncCurWorkInPool() return Interlocked.Increment(ref m_iCurWorkInPool); }

            
    /// <summary> SimType: Decrement current number of Work posted in the thread pool </summary>

            
    private Int32 DecCurWorkInPool() return Interlocked.Decrement(ref m_iCurWorkInPool); }

            
    public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)
            
    {
                
    try
                
    {
                    
    // Set initial class state

                    GetMaxConcurrency   
    = iMaxConcurrency;

                    GetMinThreadsInPool 
    = iMinThreadsInPool;

                    GetMaxThreadsInPool 
    = iMaxThreadsInPool;

                    GetUserFunction     
    = pfnUserFunction;


                    
    // Init the thread counters

                    GetCurThreadsInPool 
    = 0;

                    GetActThreadsInPool 
    = 0;

                    GetCurWorkInPool    
    = 0;


                    
    // Initialize the Monitor Object

                    GetCriticalSection 
    = new Object();


                    
    // Set the disposing flag to false

                    IsDisposed 
    = false;


                    
    unsafe
                    
    {

                        
    // Create an IO Completion Port for Thread Pool use
                        GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0null, (UInt32) GetMaxConcurrency);

                    }



                    
    // Test to make sure the IO Completion Port was created

                    
    if (GetHandle == 0)

                        
    throw new Exception("Unable To Create IO Completion Port");


                    
    // Allocate and start the Minimum number of threads specified

                    Int32 iStartingCount 
    = GetCurThreadsInPool;

            

                    ThreadStart tsThread 
    = new ThreadStart(IOCPFunction);

                    
    for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread)
                    
    {

                        
    // Create a thread and start it

                        Thread thThread 
    = new Thread(tsThread);

                        thThread.Name 
    = "IOCP " + thThread.GetHashCode();

                        thThread.Start();


                        
    // Increment the thread pool count

                        IncCurThreadsInPool();

                    }


                }



                
    catch
                
    {

                    
    throw new Exception("Unhandled Exception");

                }


            }


            
    ~IOCPThreadPool()
            
    {

                
    if (!IsDisposed)

                    Dispose();

            }


            
    public void Dispose()
            
    {

                
    try
                
    {

                    
    // Flag that we are disposing this object

                    IsDisposed 
    = true;


                    
    // Get the current number of threads in the pool

                    Int32 iCurThreadsInPool 
    = GetCurThreadsInPool;


                    
    // Shutdown all thread in the pool

                    
    for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread)
                    
    {
                        
    unsafe
                        
    {

                            
    bool bret = PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) SHUTDOWN_IOCPTHREAD, null);

                        }


                    }



                    
    // Wait here until all the threads are gone

                    
    while (GetCurThreadsInPool != 0) Thread.Sleep(100);


                    
    unsafe
                    
    {

                        
    // Close the IOCP Handle
                        CloseHandle(GetHandle);

                    }


                }


                
    catch
                
    {

                }


            }

            
    private void IOCPFunction()
            
    {
                UInt32 uiNumberOfBytes;

                Int32  iValue;

                
    try
                
    {
                    
    while (true)
                    
    {

                        
    unsafe
                        
    {

                            System.Threading.NativeOverlapped
    * pOv;


                            
    // Wait for an event

                            GetQueuedCompletionStatus(GetHandle, 
    &uiNumberOfBytes, (UInt32*&iValue, &pOv, INIFINITE);
                        }


                        
    // Decrement the number of events in queue

                        DecCurWorkInPool();


                        
    // Was this thread told to shutdown

                        
    if (iValue == SHUTDOWN_IOCPTHREAD)

                            
    break;


                        
    // Increment the number of active threads

                        IncActThreadsInPool();


                        
    try
                        
    {
                            
    // Call the user function
                            GetUserFunction(iValue);

                        }


                        
    catch(Exception ex)
                        
    {
                            
    throw ex;
                        }



                        
    // Get a lock

                        Monitor.Enter(GetCriticalSection);


                        
    try
                        
    {

                            
    // If we have less than max threads currently in the pool

                            
    if (GetCurThreadsInPool < GetMaxThreadsInPool)
                            
    {

                                
    // Should we add a new thread to the pool

                                
    if (GetActThreadsInPool == GetCurThreadsInPool)
                                
    {

                                    
    if (IsDisposed == false)
                                    
    {

                                        
    // Create a thread and start it

                                        ThreadStart tsThread 
    = new ThreadStart(IOCPFunction);

                                        Thread thThread 
    = new Thread(tsThread);

                                        thThread.Name 
    = "IOCP " + thThread.GetHashCode();

                                        thThread.Start();


                                        
    // Increment the thread pool count

                                        IncCurThreadsInPool();

                                    }


                                }


                            }


                        }


                        
    catch
                        
    {

                        }



                        
    // Relase the lock

                        Monitor.Exit(GetCriticalSection);


                        
    // Increment the number of active threads

                        DecActThreadsInPool();

                    }


                }



                
    catch(Exception ex)
                
    {
                    
    string str=ex.Message;

                }



                
    // Decrement the thread pool count

                DecCurThreadsInPool();

            }


            
    //public void PostEvent(Int32 iValue
            public void PostEvent(int iValue)
            
    {

                
    try
                
    {

                    
    // Only add work if we are not disposing

                    
    if (IsDisposed == false)
                    
    {

                        
    unsafe
                        
    {

                            
    // Post an event into the IOCP Thread Pool

                            PostQueuedCompletionStatus(GetHandle, 
    4, (UInt32*) iValue, null);

                        }



                        
    // Increment the number of item of work

                        IncCurWorkInPool();


                        
    // Get a lock

                        Monitor.Enter(GetCriticalSection);


                        
    try
                        
    {

                            
    // If we have less than max threads currently in the pool

                            
    if (GetCurThreadsInPool < GetMaxThreadsInPool)
                            
    {

                                
    // Should we add a new thread to the pool

                                
    if (GetActThreadsInPool == GetCurThreadsInPool)
                                
    {

                                    
    if (IsDisposed == false)
                                    
    {

                                        
    // Create a thread and start it

                                        ThreadStart tsThread 
    = new ThreadStart(IOCPFunction);

                                        Thread thThread 
    = new Thread(tsThread);

                                        thThread.Name 
    = "IOCP " + thThread.GetHashCode();

                                        thThread.Start();


                                        
    // Increment the thread pool count

                                        IncCurThreadsInPool();

                                    }


                                }


                            }


                        }



                        
    catch
                        
    {

                        }



                        
    // Release the lock

                        Monitor.Exit(GetCriticalSection);

                    }


                }



                
    catch (Exception e)
                
    {

                    
    throw e;

                }



                
    catch
                
    {

                    
    throw new Exception("Unhandled Exception");

                }


            }
      

            
    public void PostEvent()
            
    {

                
    try
                
    {

                    
    // Only add work if we are not disposing

                    
    if (IsDisposed == false)
                    
    {

                        
    unsafe
                        
    {

                            
    // Post an event into the IOCP Thread Pool

                            PostQueuedCompletionStatus(GetHandle, 
    0nullnull);

                        }



                        
    // Increment the number of item of work

                        IncCurWorkInPool();


                        
    // Get a lock

                        Monitor.Enter(GetCriticalSection);


                        
    try

                        
    {

                            
    // If we have less than max threads currently in the pool

                            
    if (GetCurThreadsInPool < GetMaxThreadsInPool)

                            
    {

                                
    // Should we add a new thread to the pool

                                
    if (GetActThreadsInPool == GetCurThreadsInPool)

                                
    {

                                    
    if (IsDisposed == false)

                                    
    {

                                        
    // Create a thread and start it

                                        ThreadStart tsThread 
    = new ThreadStart(IOCPFunction);

                                        Thread thThread 
    = new Thread(tsThread);

                                        thThread.Name 
    = "IOCP " + thThread.GetHashCode();

                                        thThread.Start();


                                        
    // Increment the thread pool count

                                        IncCurThreadsInPool();

                                    }


                                }


                            }


                        }



                        
    catch

                        
    {

                        }



                        
    // Release the lock

                        Monitor.Exit(GetCriticalSection);

                    }


                }


                
    catch

                
    {

                    
    throw new Exception("Unhandled Exception");

                }


            }


        }


    }

  • 相关阅读:
    全面监测网站信息
    linux 将Mysql的一张表导出至Excel格式文件
    渗透测试人员发现用户可无限输入密码次数,超过5次未锁定用户,存在暴力破解风险。解放方案:限制每个输入的用户名(不管存不存在该账户)登陆失败次数不超过5次,超过则锁定该用户
    mysql linux下数据库导出 常用操作
    find php.ini 和 php的执行目录 bin目录
    解决:The “https://packagist.laravel-china.org/packages.json” file could not be downloaded
    如何上传代码至GitHub
    7. Jmeter-逻辑控制器介绍与使用
    19、Linux命令对服务器内存进行监控
    20、Linux命令对服务器磁盘进行监控
  • 原文地址:https://www.cnblogs.com/zhengyun_ustc/p/136689.html
Copyright © 2011-2022 走看看