ThreadPool 类
提供一个线程池,该线程池可用于执行任务、发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。
命名空间: System.Threading
程序集: mscorlib(位于 mscorlib.dll)版本信息
.NET Framework
自 1.1 起可用
可移植类库
在 可移植 .NET 平台 中受支持
Silverlight
自 2.0 起可用
Windows Phone Silverlight
自 7.0 起可用
一边说着要用技术安身立命,一边感叹自己的野生属性。好吧,知之为知之,不知就不知。我"以为"是这样这样那样那样,这样说真是没意思。现在的疑惑有以下几点:
- 1、线程池内部有几个工作线程?
- 2、使用线程池的正确姿势(场景和控制)?
- 3、有必要自己封装一个不?
开始看MSDN文档
[HostProtectionAttribute(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)] public static class ThreadPool
方法
伪代码
#region 程序集 mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 // C:Program Files (x86)Reference AssembliesMicrosoftFramework.NETFrameworkv4.0mscorlib.dll #endregion using System.Runtime.InteropServices; using System.Security; namespace System.Threading { // // 摘要: // 提供一个线程池,该线程池可用于发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。 public static class ThreadPool { // // 摘要: // 将操作系统句柄绑定到 System.Threading.ThreadPool。 // // 参数: // osHandle: // 保存操作系统句柄的 System.Runtime.InteropServices.SafeHandle。在非托管端必须为重叠 I/O 打开该句柄。 // // 返回结果: // 如果绑定了句柄,则为 true;否则为 false。 // // 异常: // T:System.ArgumentNullException: // osHandle 为 null。 [SecuritySafeCritical] public static bool BindHandle(SafeHandle osHandle); // // 摘要: // 将操作系统句柄绑定到 System.Threading.ThreadPool。 // // 参数: // osHandle: // 持有句柄的 System.IntPtr。在非托管端必须为重叠 I/O 打开该句柄。 // // 返回结果: // 如果绑定了句柄,则为 true;否则为 false。 // // 异常: // T:System.Security.SecurityException: // 调用方没有所要求的权限。 [Obsolete("ThreadPool.BindHandle(IntPtr) has been deprecated. Please use ThreadPool.BindHandle(SafeHandle) instead.", false)] [SecuritySafeCritical] public static bool BindHandle(IntPtr osHandle); // // 摘要: // 检索由 System.Threading.ThreadPool.GetMaxThreads(System.Int32@,System.Int32@) 方法返回的最大线程池线程数和当前活动线程数之间的差值。 // // 参数: // workerThreads: // 可用辅助线程的数目。 // // completionPortThreads: // 可用异步 I/O 线程的数目。 [SecuritySafeCritical] public static void GetAvailableThreads(out int workerThreads, out int completionPortThreads); // // 摘要: // 检索可以同时处于活动状态的线程池请求的数目。所有大于此数目的请求将保持排队状态,直到线程池线程变为可用。 // // 参数: // workerThreads: // 线程池中辅助线程的最大数目。 // // completionPortThreads: // 线程池中异步 I/O 线程的最大数目。 [SecuritySafeCritical] public static void GetMaxThreads(out int workerThreads, out int completionPortThreads); // // 摘要: // 检索线程池在新请求预测中维护的空闲线程数。 // // 参数: // workerThreads: // 当前由线程池维护的空闲辅助线程的最小数目。 // // completionPortThreads: // 当前由线程池维护的空闲异步 I/O 线程的最小数目。 [SecuritySafeCritical] public static void GetMinThreads(out int workerThreads, out int completionPortThreads); // // 摘要: // 将方法排入队列以便执行。此方法在有线程池线程变得可用时执行。 // // 参数: // callBack: // 一个 System.Threading.WaitCallback,表示要执行的方法。 // // 返回结果: // 如果此方法成功排队,则为 true;如果未能将该工作项排队,则引发 System.NotSupportedException。 // // 异常: // T:System.ArgumentNullException: // callBack 为 null。 // // T:System.NotSupportedException: // 承载公共语言运行时 (CLR) 的宿主不支持此操作。 [SecuritySafeCritical] public static bool QueueUserWorkItem(WaitCallback callBack); // // 摘要: // 将方法排入队列以便执行,并指定包含该方法所用数据的对象。此方法在有线程池线程变得可用时执行。 // // 参数: // callBack: // System.Threading.WaitCallback,它表示要执行的方法。 // // state: // 包含方法所用数据的对象。 // // 返回结果: // 如果此方法成功排队,则为 true;如果未能将该工作项排队,则引发 System.NotSupportedException。 // // 异常: // T:System.NotSupportedException: // 承载公共语言运行时 (CLR) 的宿主不支持此操作。 // // T:System.ArgumentNullException: // callBack 为 null。 [SecuritySafeCritical] public static bool QueueUserWorkItem(WaitCallback callBack, object state); // // 摘要: // 注册一个等待 System.Threading.WaitHandle 的委托,并指定一个 System.TimeSpan 值来表示超时时间。 // // 参数: // waitObject: // 要注册的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 参数终止时调用的 System.Threading.WaitOrTimerCallback 委托。 // // state: // 传递给委托的对象。 // // timeout: // System.TimeSpan 表示的超时时间。如果 timeout 为 0(零),则函数将测试对象的状态并立即返回。如果 timeout 为 -1,则函数的超时间隔永远不过期。 // // executeOnlyOnce: // 如果为 true,表示在调用了委托后,线程将不再在 waitObject 参数上等待;如果为 false,表示每次完成等待操作后都重置计时器,直到注销等待。 // // 返回结果: // 封装本机句柄的 System.Threading.RegisteredWaitHandle。 // // 异常: // T:System.ArgumentOutOfRangeException: // timeout 参数小于 -1。 // // T:System.NotSupportedException: // timeout 参数大于 System.Int32.MaxValue。 [SecuritySafeCritical] public static RegisteredWaitHandle RegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, TimeSpan timeout, bool executeOnlyOnce); // // 摘要: // 注册一个等待 System.Threading.WaitHandle 的委托,并指定一个 64 位有符号整数来表示超时值(以毫秒为单位)。 // // 参数: // waitObject: // 要注册的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 参数终止时调用的 System.Threading.WaitOrTimerCallback 委托。 // // state: // 传递给委托的对象。 // // millisecondsTimeOutInterval: // 以毫秒为单位的超时。如果 millisecondsTimeOutInterval 参数为 0(零),函数将测试对象的状态并立即返回。如果 millisecondsTimeOutInterval // 为 -1,则函数的超时间隔永远不过期。 // // executeOnlyOnce: // 如果为 true,表示在调用了委托后,线程将不再在 waitObject 参数上等待;如果为 false,表示每次完成等待操作后都重置计时器,直到注销等待。 // // 返回结果: // 封装本机句柄的 System.Threading.RegisteredWaitHandle。 // // 异常: // T:System.ArgumentOutOfRangeException: // millisecondsTimeOutInterval 参数小于 -1。 [SecuritySafeCritical] public static RegisteredWaitHandle RegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, long millisecondsTimeOutInterval, bool executeOnlyOnce); // // 摘要: // 注册一个等待 System.Threading.WaitHandle 的委托,并指定一个 32 位有符号整数来表示超时值(以毫秒为单位)。 // // 参数: // waitObject: // 要注册的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 参数终止时调用的 System.Threading.WaitOrTimerCallback 委托。 // // state: // 传递给委托的对象。 // // millisecondsTimeOutInterval: // 以毫秒为单位的超时。如果 millisecondsTimeOutInterval 参数为 0(零),函数将测试对象的状态并立即返回。如果 millisecondsTimeOutInterval // 为 -1,则函数的超时间隔永远不过期。 // // executeOnlyOnce: // 如果为 true,表示在调用了委托后,线程将不再在 waitObject 参数上等待;如果为 false,表示每次完成等待操作后都重置计时器,直到注销等待。 // // 返回结果: // 封装本机句柄的 System.Threading.RegisteredWaitHandle。 // // 异常: // T:System.ArgumentOutOfRangeException: // millisecondsTimeOutInterval 参数小于 -1。 [SecuritySafeCritical] public static RegisteredWaitHandle RegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, int millisecondsTimeOutInterval, bool executeOnlyOnce); // // 摘要: // 指定表示超时(以毫秒为单位)的 32 位无符号整数,注册一个委托等待 System.Threading.WaitHandle。 // // 参数: // waitObject: // 要注册的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 参数终止时调用的 System.Threading.WaitOrTimerCallback 委托。 // // state: // 传递给委托的对象。 // // millisecondsTimeOutInterval: // 以毫秒为单位的超时。如果 millisecondsTimeOutInterval 参数为 0(零),函数将测试对象的状态并立即返回。如果 millisecondsTimeOutInterval // 为 -1,则函数的超时间隔永远不过期。 // // executeOnlyOnce: // 如果为 true,表示在调用了委托后,线程将不再在 waitObject 参数上等待;如果为 false,表示每次完成等待操作后都重置计时器,直到注销等待。 // // 返回结果: // System.Threading.RegisteredWaitHandle,可用于取消已注册的等待操作。 // // 异常: // T:System.ArgumentOutOfRangeException: // millisecondsTimeOutInterval 参数小于 -1。 [CLSCompliant(false)] [SecuritySafeCritical] public static RegisteredWaitHandle RegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, uint millisecondsTimeOutInterval, bool executeOnlyOnce); // // 摘要: // 设置可以同时处于活动状态的线程池的请求数目。所有大于此数目的请求将保持排队状态,直到线程池线程变为可用。 // // 参数: // workerThreads: // 线程池中辅助线程的最大数目。 // // completionPortThreads: // 线程池中异步 I/O 线程的最大数目。 // // 返回结果: // 如果更改成功,则为 true;否则为 false。 [SecuritySafeCritical] public static bool SetMaxThreads(int workerThreads, int completionPortThreads); // // 摘要: // 设置线程池在新请求预测中维护的空闲线程数。 // // 参数: // workerThreads: // 要由线程池维护的新的最小空闲辅助线程数。 // // completionPortThreads: // 要由线程池维护的新的最小空闲异步 I/O 线程数。 // // 返回结果: // 如果更改成功,则为 true;否则为 false。 [SecuritySafeCritical] public static bool SetMinThreads(int workerThreads, int completionPortThreads); // // 摘要: // 将重叠的 I/O 操作排队以便执行。 // // 参数: // overlapped: // 要排队的 System.Threading.NativeOverlapped 结构。 // // 返回结果: // 如果成功地将此操作排队到 I/O 完成端口,则为 true;否则为 false。 [CLSCompliant(false)] [SecurityCritical] public static bool UnsafeQueueNativeOverlapped(NativeOverlapped* overlapped); // // 摘要: // 将指定的委托排队到线程池,但不会将调用堆栈传播到辅助线程。 // // 参数: // callBack: // 一个 System.Threading.WaitCallback,表示当线程池中的线程选择工作项时调用的委托。 // // state: // 在接受线程池服务时传递给委托的对象。 // // 返回结果: // 如果方法成功,则为 true;如果未能将该工作项排队,则引发 System.OutOfMemoryException。 // // 异常: // T:System.Security.SecurityException: // 调用方没有所要求的权限。 // // T:System.ApplicationException: // 遇到了内存不足的情况。 // // T:System.OutOfMemoryException: // 未能将该工作项排队。 // // T:System.ArgumentNullException: // callBack 为 null。 [SecurityCritical] public static bool UnsafeQueueUserWorkItem(WaitCallback callBack, object state); // // 摘要: // 注册一个等待 System.Threading.WaitHandle 的委托,并指定一个 64 位有符号整数来表示超时值(以毫秒为单位)。不将调用堆栈传播到辅助线程。 // // 参数: // waitObject: // 要注册的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 参数终止时调用的委托。 // // state: // 传递给委托的对象。 // // millisecondsTimeOutInterval: // 以毫秒为单位的超时。如果 millisecondsTimeOutInterval 参数为 0(零),函数将测试对象的状态并立即返回。如果 millisecondsTimeOutInterval // 为 -1,则函数的超时间隔永远不过期。 // // executeOnlyOnce: // 如果为 true,表示在调用了委托后,线程将不再在 waitObject 参数上等待;如果为 false,表示每次完成等待操作后都重置计时器,直到注销等待。 // // 返回结果: // System.Threading.RegisteredWaitHandle 对象,可用于取消已注册的等待操作。 // // 异常: // T:System.ArgumentOutOfRangeException: // millisecondsTimeOutInterval 参数小于 -1。 // // T:System.Security.SecurityException: // 调用方没有所要求的权限。 [SecurityCritical] public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, long millisecondsTimeOutInterval, bool executeOnlyOnce); // // 摘要: // 指定表示超时(以毫秒为单位)的 32 位无符号整数,注册一个委托等待 System.Threading.WaitHandle。不将调用堆栈传播到辅助线程。 // // 参数: // waitObject: // 要注册的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 参数终止时调用的委托。 // // state: // 传递给委托的对象。 // // millisecondsTimeOutInterval: // 以毫秒为单位的超时。如果 millisecondsTimeOutInterval 参数为 0(零),函数将测试对象的状态并立即返回。如果 millisecondsTimeOutInterval // 为 -1,则函数的超时间隔永远不过期。 // // executeOnlyOnce: // 如果为 true,表示在调用了委托后,线程将不再在 waitObject 参数上等待;如果为 false,表示每次完成等待操作后都重置计时器,直到注销等待。 // // 返回结果: // System.Threading.RegisteredWaitHandle 对象,可用于取消已注册的等待操作。 // // 异常: // T:System.Security.SecurityException: // 调用方没有所要求的权限。 [CLSCompliant(false)] [SecurityCritical] public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, uint millisecondsTimeOutInterval, bool executeOnlyOnce); // // 摘要: // 注册一个等待 System.Threading.WaitHandle 的委托,并指定一个 System.TimeSpan 值来表示超时时间。不将调用堆栈传播到辅助线程。 // // 参数: // waitObject: // 要注册的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 参数终止时调用的委托。 // // state: // 传递给委托的对象。 // // timeout: // System.TimeSpan 表示的超时时间。如果 timeout 为 0(零),则函数将测试对象的状态并立即返回。如果 timeout 为 -1,则函数的超时间隔永远不过期。 // // executeOnlyOnce: // 如果为 true,表示在调用了委托后,线程将不再在 waitObject 参数上等待;如果为 false,表示每次完成等待操作后都重置计时器,直到注销等待。 // // 返回结果: // System.Threading.RegisteredWaitHandle 对象,可用于取消已注册的等待操作。 // // 异常: // T:System.ArgumentOutOfRangeException: // timeout 参数小于 -1。 // // T:System.NotSupportedException: // timeout 参数大于 System.Int32.MaxValue。 // // T:System.Security.SecurityException: // 调用方没有所要求的权限。 [SecurityCritical] public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, TimeSpan timeout, bool executeOnlyOnce); // // 摘要: // 注册一个等待 System.Threading.WaitHandle 的委托,并使用一个 32 位带符号整数来表示超时时间(以毫秒为单位)。不将调用堆栈传播到辅助线程。 // // 参数: // waitObject: // 要注册的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 参数终止时调用的委托。 // // state: // 传递给委托的对象。 // // millisecondsTimeOutInterval: // 以毫秒为单位的超时。如果 millisecondsTimeOutInterval 参数为 0(零),函数将测试对象的状态并立即返回。如果 millisecondsTimeOutInterval // 为 -1,则函数的超时间隔永远不过期。 // // executeOnlyOnce: // 如果为 true,表示在调用了委托后,线程将不再在 waitObject 参数上等待;如果为 false,表示每次完成等待操作后都重置计时器,直到注销等待。 // // 返回结果: // System.Threading.RegisteredWaitHandle 对象,可用于取消已注册的等待操作。 // // 异常: // T:System.ArgumentOutOfRangeException: // millisecondsTimeOutInterval 参数小于 -1。 // // T:System.Security.SecurityException: // 调用方没有所要求的权限。 [SecurityCritical] public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, int millisecondsTimeOutInterval, bool executeOnlyOnce); } }
“承载公共语言运行时 (CLR) 的宿主不支持此操作”出现了好多次吧。试试反编译mscorlib.dll,看看有什么发现。
[SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern bool AdjustThreadsInPool(uint QueueLength); [CLSCompliant(false), SecurityCritical] public unsafe static bool UnsafeQueueNativeOverlapped(NativeOverlapped* overlapped) { } [SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern bool ShouldUseNewWorkerPool(); [SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern bool CompleteThreadPoolRequest(uint QueueLength); [SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] internal static extern bool NotifyWorkItemComplete(); [SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] internal static extern void ReportThreadStatus(bool isWorking); [SecuritySafeCritical] internal static void NotifyWorkItemProgress() { } [SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] internal static extern void NotifyWorkItemProgressNative(); [SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern bool ShouldReturnToVm(); [ReliabilityContract(Consistency.WillNotCorruptState, Cer.Success), SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern bool SetAppDomainRequestActive(); [ReliabilityContract(Consistency.WillNotCorruptState, Cer.Success), SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern void ClearAppDomainRequestActive(); [SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] internal static extern bool IsThreadPoolHosted(); [ReliabilityContract(Consistency.WillNotCorruptState, Cer.Success), SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] internal static extern void SetNativeTpEvent();
[MethodImplAttribute(MethodImplOptions.Synchronized)]标签应用到实例方法,相当于对当前实例加锁 lock(this)
[MethodImplAttribute(MethodImplOptions.Synchronized)]标签应用到静态方法,相当于当前类型加锁。如 WithDraw 是静态方法,就相当于 lock (typeof(Account))
接下来我们再来看看SynchronizationAttribute类:
MSDN对SynchronizationAttribute的解释为:为当前上下文和所有共享同一实例的上下文强制一个同步域。
SynchronizationAttribute 的类:一个在 System.Runtime.Remoting.Contexts 命名空间中,另一个在 System.EnterpriseServices 命名空间中。System.EnterpriseServices.SynchronizationAttribute 类仅支持同步调用,并且只可与接受服务的组件一起使用。System.Runtime.Remoting.Contexts.SynchronizationAttribute 同时支持同步调用和异步调用,并且只可与上下文绑定对象一起使用。
毛发现都没有,看来还是功力尚浅,资质平庸啊。发现了一堆空方法,什么鬼?都是没有具体实现的。看来先猜上一猜了:SetMaxThreads和SetMinThreads说明可以设置工作线程的数量,线程的内部使用了完成端口(没文化的我理解为ConcurrentQueue,大白话就是说是一个线程安全的队列)。那么完成端口编程模式号称是windows系统最优秀的编程模型,会不会非常智能呢?是不是不调用SetMaxThreads和SetMinThreads操作系统就根据你机器的CPU核心数来自己设定最大值呢?
回到前面的几个问题,偶还是搞不清楚啊。谁能告诉我,什么是什么,什么是什么...咦,这兄台唱上了吧。^_^
反编译大神实现的CoreThreadPool
public class CoreThreadPool : IDisposable { /// <summary> /// 队列元素申明 /// </summary> [StructLayout(LayoutKind.Sequential)] private class PoolData { /// <summary> /// 外部要求放入队列的数据 /// </summary> public object Data; /// <summary> /// 需要执行的命令(Exit/Command(自定义)) /// </summary> public CoreThreadPool.PoolCommand Command; public PoolData() { this.Command = CoreThreadPool.PoolCommand.Exit; } public PoolData(object data) { this.Data = data; this.Command = CoreThreadPool.PoolCommand.Command; } public PoolData(CoreThreadPool.PoolCommand cmd) { this.Command = cmd; } } protected enum PoolCommand { Command, Exit } protected SafeFileHandle complatePort; /// <summary> /// 线程池主线程 /// </summary> protected Thread thread; protected volatile bool isOpened; [method: CompilerGenerated] [CompilerGenerated] public event Action<object> Exceute; [method: CompilerGenerated] [CompilerGenerated] public event Action<object> ExitExceute; /// <summary> /// 线程池是否正在运行 /// </summary> public bool IsOpened { get { return this.isOpened; } set { this.isOpened = value; } } [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)] private static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads); [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)] private static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort, out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey, out IntPtr lpOverlapped, uint dwMilliseconds); [DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped); /// <summary> /// 启动线程池的主线程 /// </summary> public void Start() { isOpened = true; if (thread != null) { throw new Exception("线程池已经是启动状态!"); } complatePort = CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 0u); if (complatePort.IsInvalid) { throw new Exception(string.Format("创建IOCP出错!原因是:{0}", Marshal.GetLastWin32Error().ToString())); } thread = new Thread(new ParameterizedThreadStart(this.Run)); thread.Start(complatePort); } /// <summary> /// 外部提交数据对象到队列 /// </summary> /// <param name="data"></param> public void Post(object data) { PostData(new CoreThreadPool.PoolData(data)); } /// <summary> /// 线程池主线程执行逻辑 /// </summary> /// <param name="CompletionPortID"></param> private void Run(object CompletionPortID) { SafeFileHandle completionPort = (SafeFileHandle)CompletionPortID; while (IsOpened) { uint num; IntPtr intPtr; IntPtr value; //从队列里取出最前面的对象 CoreThreadPool.GetQueuedCompletionStatus(completionPort, out num, out intPtr, out value, 4294967295u); if (num > 0u) { GCHandle gCHandle = GCHandle.FromIntPtr(value); CoreThreadPool.PoolData poolData = (CoreThreadPool.PoolData)gCHandle.Target; gCHandle.Free(); if (poolData.Command != CoreThreadPool.PoolCommand.Command) { IsOpened = false; break; } RaiseExecute(poolData.Data); } } RaiseExitExecute("线程池已经停止。"); isOpened = false; thread = null; } /// <summary> /// 触发Execute事件 /// </summary> /// <param name="data"></param> private void RaiseExecute(object data) { Exceute?.Invoke(data); } /// <summary> /// 触发ExitExecute事件 /// </summary> /// <param name="data"></param> private void RaiseExitExecute(object data) { ExitExceute?.Invoke(data); } /// <summary> /// 结束线程池主线程 /// </summary> public void Stop() { PostData(new PoolData(PoolCommand.Exit)); IsOpened = false; } /// <summary> /// 内部提交数据到线程池队列中 /// </summary> /// <param name="data"></param> private void PostData(PoolData data) { if (complatePort.IsClosed) { return; } GCHandle value = GCHandle.Alloc(data); PostQueuedCompletionStatus(complatePort, (uint)IntPtr.Size, IntPtr.Zero, GCHandle.ToIntPtr(value)); } public void Dispose() { if (this.thread != null && this.thread.ThreadState != System.Threading.ThreadState.Stopped) { this.Stop(); } } }
经过几个小时测试,最终得出一个结论:ThreadPool的性能已经上了天。微软威武!不用再去重复造轮子了。这个线程池的使用场景是生产线上的高速生产线的采集器上用的,毫秒级别的,但也是1秒几十个产品而已。
测试代码
class Program { private static Stopwatch sw = new Stopwatch(); static void Main(string[] args) { int id = Thread.CurrentThread.ManagedThreadId; Console.WriteLine("CurrentThread.ManagedThreadId是:" + id.ToString()); ThreadPool.QueueUserWorkItem(Pool_Exceute, null); Action task = () => { Thread thread = new Thread(() => { while (true) { object queueObj; queueObj = (object)DateTime.Now.Ticks; sw.Reset(); sw.Start(); if (ThreadPool.QueueUserWorkItem(Pool_Exceute, queueObj)) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine(DateTime.Now + "->成功抛入队列,抛入的委托对象的参数是:" + queueObj.ToString()); } Thread.Sleep(1); } }); thread.Start(); }; Parallel.Invoke(task, task, task, task, task); Console.ReadLine(); } private static void Pool_Exceute(object obj) { if (obj != null) { Console.ResetColor(); int id = Thread.CurrentThread.ManagedThreadId; Console.WriteLine("CurrentThread.ManagedThreadId是:" + id.ToString()); Console.WriteLine(DateTime.Now + "->委托对象是:" + obj.ToString()); int workThread_Count = 0; int id_IOCP = 0; ThreadPool.GetMaxThreads(out workThread_Count, out id_IOCP); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine(DateTime.Now + string.Format("->线程池最大工作线程数是{0}.当前完成端口的ID是{1}.", workThread_Count, id_IOCP)); ThreadPool.GetAvailableThreads(out workThread_Count, out id_IOCP); Console.WriteLine(DateTime.Now + string.Format("->线程池当前可用的工作线程数是{0}.当前完成端口的ID是{1}.", workThread_Count, id_IOCP)); sw.Stop(); Console.ForegroundColor = ConsoleColor.Yellow; Console.WriteLine("从入队到出队耗时:" + sw.ElapsedMilliseconds); sw.Reset(); } } }