//AsyncQueue.cs
#define c4 //C# 4.0+
//#define c2
namespace Microshaoft
{
    using System;
    using System.Threading;
    using System.Diagnostics;
    using System.Collections.Generic;
#if c4
    using System.Collections.Concurrent;
#endif
    using Microshaoft;

    /// <summary>
    /// In-process work queue: <see cref="Enqueue"/> stores an element and makes sure
    /// dedicated threads (at most <see cref="MaxConcurrentThreadsCount"/> at a time)
    /// drain the queue, raising <see cref="OnDequeue"/> once per element.
    /// </summary>
    /// <remarks>
    /// Compile with <c>c4</c> to use <c>ConcurrentQueue&lt;T&gt;</c>, or with <c>c2</c> to use a
    /// lock-protected <c>Queue&lt;T&gt;</c>.
    /// </remarks>
    /// <typeparam name="T">Element type (reference types only).</typeparam>
    public class AsyncQueue<T> where T : class
    {
        /// <summary>Handler invoked on a dequeue thread for every dequeued element.</summary>
        public delegate void QueueEventHandler(T element);
        /// <summary>Raised once per dequeued element, on the dequeue thread that removed it.</summary>
        public event QueueEventHandler OnDequeue;

        /// <summary>Handler receiving a preformatted diagnostic message.</summary>
        public delegate void QueueLogEventHandler(string logMessage);
        //public event QueueLogEventHandler OnQueueLog;
        /// <summary>Raised when a dispatcher thread starts.</summary>
        public event QueueLogEventHandler OnQueueRunningThreadStart;
        /// <summary>Raised when a dispatcher thread has finished spawning dequeue threads.</summary>
        public event QueueLogEventHandler OnQueueRunningThreadEnd;
        /// <summary>Raised when a dequeue thread starts.</summary>
        public event QueueLogEventHandler OnDequeueThreadStart;
        /// <summary>Raised when a dequeue thread ends.</summary>
        public event QueueLogEventHandler OnDequeueThreadEnd;
        /// <summary>Raised by the dequeue thread whose exit brings the active thread count to zero.</summary>
        public event QueueLogEventHandler OnDequeueAllThreadsEnd;

        /// <summary>Handler receiving an exception caught inside the queue.</summary>
        public delegate void ExceptionEventHandler(Exception exception);
        /// <summary>
        /// Raised for exceptions caught in <see cref="Enqueue"/> and on dequeue threads.
        /// When nobody is subscribed those exceptions are dropped.
        /// </summary>
        public event ExceptionEventHandler OnException;

        private const string TimestampFormat = "yyyy-MM-dd HH:mm:ss.fffff";

#if c2
        private Queue<T> _queue = new Queue<T>();
#elif c4
        private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
#endif
        // Guards _queue in the c2 build only; ConcurrentQueue needs no external lock.
        private object _syncQueueLockObject = new object();
        //private object _syncQueueRunningLockObject = new object();

        // 1 while a dispatcher thread has been claimed/is running, otherwise 0.
        private long _isQueueRunning = 0;
        // Number of live dequeue threads; used to cap concurrency.
        private long _concurrentDequeueThreadsCount = 0;

        private PerformanceCounter _enqueuePerformanceCounter;
        private PerformanceCounter _dequeuePerformanceCounter;
        private PerformanceCounter _dequeueProcessedPerformanceCounter;
        private PerformanceCounter _queueLengthPerformanceCounter;
        private PerformanceCounter _dequeueThreadStartPerformanceCounter;
        private PerformanceCounter _dequeueThreadEndPerformanceCounter;
        private PerformanceCounter _dequeueThreadsCountPerformanceCounter;
        private PerformanceCounter _queueRunningThreadStartPerformanceCounter;
        private PerformanceCounter _queueRunningThreadEndPerformanceCounter;
        private PerformanceCounter _queueRunningThreadsCountPerformanceCounter;

        // volatile: written last in AttachPerformanceCounters so that a thread observing
        // 'true' also observes the counter fields assigned before it.
        private volatile bool _isAttachedPerformanceCounters = false;

        /// <summary>
        /// Deletes (if present) and recreates the performance counter category, then creates
        /// one writable, zeroed counter instance per metric and enables counter updates.
        /// </summary>
        /// <param name="instanceNamePrefix">
        /// Prefix of the counter instance name; the instance is named
        /// "<paramref name="instanceNamePrefix"/>-&lt;process name&gt;".
        /// </param>
        public void AttachPerformanceCounters(string instanceNamePrefix)
        {
            // The category name (spelling included) is what monitoring tools see, so it is left untouched.
            string category = "Microshaoft AsyncConurrentQueue Counters";

            string processName;
            using (Process process = Process.GetCurrentProcess())
            {
                processName = process.ProcessName;
            }
            string instanceName = string.Format("{0}-{1}", instanceNamePrefix, processName);

            // Order matches the order in which the counters are registered in the category.
            string[] counterNames = new string[]
            {
                "EnqueueCounter",
                "DequeueCounter",
                "QueueLengthCounter",
                "DequeueProcessedCounter",
                "DequeueThreadStartCounter",
                "DequeueThreadEndCounter",
                "DequeueThreadsCountCounter",
                "QueueRunningThreadStartCounter",
                "QueueRunningThreadEndCounter",
                "QueueRunningThreadsCountCounter"
            };

            CounterCreationDataCollection ccdc = new CounterCreationDataCollection();
            foreach (string counterName in counterNames)
            {
                ccdc.Add
                    (
                        PerformanceCounterHelper.GetCounterCreationData
                            (
                                counterName
                                , PerformanceCounterType.NumberOfItems64
                            )
                    );
            }

            if (PerformanceCounterCategory.Exists(category))
            {
                PerformanceCounterCategory.Delete(category);
            }
            PerformanceCounterCategory.Create
                (
                    category
                    , string.Format("{0} Category Help.", category)
                    , PerformanceCounterCategoryType.MultiInstance
                    , ccdc
                );

            _enqueuePerformanceCounter = CreateCounter(category, "EnqueueCounter", instanceName);
            _dequeuePerformanceCounter = CreateCounter(category, "DequeueCounter", instanceName);
            _dequeueProcessedPerformanceCounter = CreateCounter(category, "DequeueProcessedCounter", instanceName);
            _queueLengthPerformanceCounter = CreateCounter(category, "QueueLengthCounter", instanceName);
            _dequeueThreadStartPerformanceCounter = CreateCounter(category, "DequeueThreadStartCounter", instanceName);
            _dequeueThreadEndPerformanceCounter = CreateCounter(category, "DequeueThreadEndCounter", instanceName);
            _dequeueThreadsCountPerformanceCounter = CreateCounter(category, "DequeueThreadsCountCounter", instanceName);
            _queueRunningThreadStartPerformanceCounter = CreateCounter(category, "QueueRunningThreadStartCounter", instanceName);
            _queueRunningThreadEndPerformanceCounter = CreateCounter(category, "QueueRunningThreadEndCounter", instanceName);
            _queueRunningThreadsCountPerformanceCounter = CreateCounter(category, "QueueRunningThreadsCountCounter", instanceName);

            _isAttachedPerformanceCounters = true;
        }

        // Creates one writable, process-lifetime counter instance and resets it to 0.
        private static PerformanceCounter CreateCounter(string category, string counter, string instanceName)
        {
            PerformanceCounter performanceCounter = new PerformanceCounter();
            performanceCounter.CategoryName = category;
            performanceCounter.CounterName = counter;
            performanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
            performanceCounter.InstanceName = instanceName;
            performanceCounter.ReadOnly = false;
            performanceCounter.RawValue = 0;
            return performanceCounter;
        }

        // By default only one dequeue thread is allowed at a time.
        private int _maxConcurrentThreadsCount = 1;

        /// <summary>Upper bound on the number of dequeue threads alive at the same time (default 1).</summary>
        public int MaxConcurrentThreadsCount
        {
            set
            {
                _maxConcurrentThreadsCount = value;
            }
            get
            {
                return _maxConcurrentThreadsCount;
            }
        }

        /// <summary>Number of elements currently held in the queue.</summary>
        public int Count
        {
            get
            {
                return _queue.Count;
            }
        }

        /// <summary>Number of dequeue threads currently alive.</summary>
        public long ConcurrentThreadsCount
        {
            get
            {
                // Interlocked.Read keeps the 64-bit read atomic on 32-bit processes.
                return Interlocked.Read(ref _concurrentDequeueThreadsCount);
            }
        }

        // True when the underlying queue holds no elements.
        private bool IsQueueEmpty()
        {
#if c2
            return _queue.Count <= 0;
#elif c4
            return _queue.IsEmpty;
#endif
        }

        // Removes the next element if there is one; returns false when the queue is empty.
        private bool TryDequeueElement(out T element)
        {
#if c2
            lock (_syncQueueLockObject)
            {
                if (_queue.Count > 0)
                {
                    element = _queue.Dequeue();
                    return true;
                }
                element = null;
                return false;
            }
#elif c4
            return _queue.TryDequeue(out element);
#endif
        }

        // Starts a dispatcher thread unless the concurrency cap is already reached
        // or another dispatcher currently owns the _isQueueRunning flag.
        private void QueueRun()
        {
            if (Interlocked.Read(ref _concurrentDequeueThreadsCount) >= _maxConcurrentThreadsCount)
            {
                return;
            }
            // Claim the dispatcher slot atomically (0 -> 1). CompareExchange takes
            // (location, newValue, comparand) and returns the previous value.
            if (Interlocked.CompareExchange(ref _isQueueRunning, 1, 0) != 0)
            {
                return;
            }
            try
            {
                Thread t = new Thread(new ThreadStart(QueueRunThreadProcess));
                t.Name = "QueueRunningThreadProcess";
                t.Start();
            }
            catch
            {
                // The thread never ran, so nobody else would release the slot.
                Interlocked.Exchange(ref _isQueueRunning, 0);
                throw;
            }
        }

        // Dispatcher thread body: spawns dequeue threads while the queue is non-empty
        // and the concurrency cap has not been reached, then releases the dispatcher slot.
        private void QueueRunThreadProcess()
        {
            try
            {
                if (_isAttachedPerformanceCounters)
                {
                    _queueRunningThreadStartPerformanceCounter.Increment();
                    _queueRunningThreadsCountPerformanceCounter.Increment();
                }
                RaiseQueueRunningLog
                    (
                        OnQueueRunningThreadStart
                        , "{0} Threads Count {1},Queue Count {2},Current Thread: {3}({4}) at {5}"
                        , "Queue Running Start ..."
                    );

                while (!IsQueueEmpty())
                {
                    if (Interlocked.Read(ref _concurrentDequeueThreadsCount) >= _maxConcurrentThreadsCount)
                    {
                        break;
                    }
                    StartDequeueThread();
                }

                RaiseQueueRunningLog
                    (
                        OnQueueRunningThreadEnd
                        , "{0} Threads Count {1}, Queue Count {2}, Current Thread: {3}({4}) at {5}"
                        , "Queue Running Stop ..."
                    );
                if (_isAttachedPerformanceCounters)
                {
                    _queueRunningThreadEndPerformanceCounter.Increment();
                    _queueRunningThreadsCountPerformanceCounter.Decrement();
                }
            }
            finally
            {
                Interlocked.Exchange(ref _isQueueRunning, 0);
            }
            // An Enqueue that arrived while the slot was held skipped its own dispatch;
            // re-check so such an element is not left waiting for the next Enqueue.
            if (!IsQueueEmpty())
            {
                QueueRun();
            }
        }

        // Reserves a concurrency slot and starts one dequeue thread for it.
        private void StartDequeueThread()
        {
            // Use the value returned by Increment; re-reading the field could observe
            // a later change made by another thread.
            long threadID = Interlocked.Increment(ref _concurrentDequeueThreadsCount);
            try
            {
                Thread t = new Thread(new ThreadStart(DequeueThreadProcess));
                t.Name = string.Format("ConcurrentDequeueProcessThread[{0}]", threadID);
                t.Start();
            }
            catch
            {
                // Give the slot back: DequeueThreadProcess will never run to release it.
                Interlocked.Decrement(ref _concurrentDequeueThreadsCount);
                throw;
            }
        }

        /// <summary>
        /// Adds <paramref name="element"/> to the queue and triggers dispatching.
        /// Exceptions raised while storing the element or updating counters are reported
        /// through <see cref="OnException"/> instead of being thrown to the caller.
        /// </summary>
        /// <param name="element">Element to enqueue.</param>
        public void Enqueue(T element)
        {
            try
            {
#if c2
                lock (_syncQueueLockObject) // original author's note: "is this still concurrent?"
#endif
                {
                    _queue.Enqueue(element);
                }
                if (_isAttachedPerformanceCounters)
                {
                    _enqueuePerformanceCounter.Increment();
                    _queueLengthPerformanceCounter.Increment();
                }
            }
            catch (Exception e)
            {
                RaiseException(e);
            }
            QueueRun();
        }

        // Dequeue thread body: drains the queue, raising OnDequeue per element, then
        // releases its concurrency slot and re-triggers dispatching if needed.
        private void DequeueThreadProcess()
        {
            if (_isAttachedPerformanceCounters)
            {
                _dequeueThreadStartPerformanceCounter.Increment();
                _dequeueThreadsCountPerformanceCounter.Increment();
            }
            RaiseDequeueLog
                (
                    OnDequeueThreadStart
                    , "Threads ++ !"
                    , Interlocked.Read(ref _concurrentDequeueThreadsCount)
                );

            bool queueWasNotEmpty = false;
            try
            {
                T element;
                while (TryDequeueElement(out element))
                {
                    queueWasNotEmpty = true;
                    if (_isAttachedPerformanceCounters)
                    {
                        _dequeuePerformanceCounter.Increment();
                        _queueLengthPerformanceCounter.Decrement();
                    }
                    QueueEventHandler onDequeue = OnDequeue;
                    if (onDequeue != null)
                    {
                        onDequeue(element);
                    }
                    if (_isAttachedPerformanceCounters)
                    {
                        _dequeueProcessedPerformanceCounter.Increment();
                    }
                }
            }
            catch (Exception e)
            {
                // An exception ends this thread's loop; the finally block re-dispatches,
                // so remaining elements are picked up by a fresh thread.
                RaiseException(e);
            }
            finally
            {
                long remainingThreads = Interlocked.Decrement(ref _concurrentDequeueThreadsCount);
                RaiseDequeueLog(OnDequeueThreadEnd, "Threads--", remainingThreads);
                if (remainingThreads == 0)
                {
                    RaiseDequeueLog(OnDequeueAllThreadsEnd, "All Threads End", remainingThreads);
                }
                if (_isAttachedPerformanceCounters)
                {
                    _dequeueThreadEndPerformanceCounter.Increment();
                    _dequeueThreadsCountPerformanceCounter.Decrement();
                }
                // Re-dispatch after doing work (as before) and also when elements arrived
                // after this thread last saw the queue empty. A thread that found nothing and
                // leaves an empty queue does not re-dispatch, which prevents an endless respawn.
                if (queueWasNotEmpty || !IsQueueEmpty())
                {
                    QueueRun();
                }
            }
        }

        // Reports an exception to OnException subscribers, if any.
        private void RaiseException(Exception exception)
        {
            ExceptionEventHandler handler = OnException;
            if (handler != null)
            {
                handler(exception);
            }
        }

        // Formats and raises a dispatcher-thread log message; does nothing without subscribers.
        private void RaiseQueueRunningLog(QueueLogEventHandler handler, string format, string title)
        {
            if (handler == null)
            {
                return;
            }
            handler
                (
                    string.Format
                        (
                            format
                            , title
                            , Interlocked.Read(ref _concurrentDequeueThreadsCount)
                            , _queue.Count
                            , Thread.CurrentThread.Name
                            , Thread.CurrentThread.ManagedThreadId
                            , DateTime.Now.ToString(TimestampFormat)
                        )
                );
        }

        // Formats and raises a dequeue-thread log message; does nothing without subscribers.
        private void RaiseDequeueLog(QueueLogEventHandler handler, string title, long threadsCount)
        {
            if (handler == null)
            {
                return;
            }
            handler
                (
                    string.Format
                        (
                            "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
                            , title
                            , threadsCount
                            , _queue.Count
                            , Thread.CurrentThread.Name
                            , DateTime.Now.ToString(TimestampFormat)
                        )
                );
        }
    }
}
namespace Microshaoft
{
    using System;
    using System.Diagnostics;

    /// <summary>Helpers for building performance counter definitions.</summary>
    public static class PerformanceCounterHelper
    {
        /// <summary>
        /// Builds a <see cref="CounterCreationData"/> whose help text is
        /// "<paramref name="counterName"/> Help".
        /// </summary>
        /// <param name="counterName">Name of the counter.</param>
        /// <param name="performanceCounterType">Type of the counter.</param>
        /// <returns>The populated creation data.</returns>
        public static CounterCreationData GetCounterCreationData(string counterName, PerformanceCounterType performanceCounterType)
        {
            CounterCreationData ccd = new CounterCreationData();
            ccd.CounterName = counterName;
            ccd.CounterHelp = string.Format("{0} Help", counterName);
            ccd.CounterType = performanceCounterType;
            return ccd;
        }
    }
}
//========================================================================================================================================
// Server.cs
namespace Microshaoft.WCF.Services
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using Contracts.Services;
    using Contracts.Services.Entitys;
    using Microshaoft.WCF.Server;

    /// <summary>WCF service implementation that forwards every item to the host's queue processor.</summary>
    public class AsyncQueueWcfNetNetTcpService : IQueueAble
    {
        /// <summary>Hands <paramref name="item"/> to <see cref="WindowsServiceHost.AsyncQueueProcessor"/>.</summary>
        public void Enqueue(Item item)
        {
            WindowsServiceHost.AsyncQueueProcessor.Enqueue(item);
        }
    }
}
namespace Microshaoft.WCF.Server
{
    using System;
    using System.ComponentModel;
    using System.ServiceProcess;
    using System.Configuration.Install;
    using System.Security.Principal;
    using System.ServiceModel;
    using System.ServiceModel.Description;
    using Microshaoft;
    using Microshaoft.Win32;
    using Microshaoft.WCF.Services;
    using Contracts.Services;
    using Contracts.Services.Entitys;

    /// <summary>
    /// Windows service (also runnable as a console application via "/console") that hosts
    /// <see cref="AsyncQueueWcfNetNetTcpService"/> over net.tcp.
    /// </summary>
    public class WindowsServiceHost : ServiceBase
    {
        public static readonly string serviceName = "AsyncConcurrentQueueWcfNetTcpService";

        private static AsyncQueueProcessor _asyncQueueProcessor;

        /// <summary>Queue processor created in <see cref="OnStart"/>; null before the service has started.</summary>
        public static AsyncQueueProcessor AsyncQueueProcessor
        {
            get
            {
                return _asyncQueueProcessor;
            }
        }

        /// <summary>
        /// The main entry point for the application. With "/console" as first argument the
        /// host runs interactively until Enter is pressed; otherwise it runs as a service.
        /// </summary>
        //[STAThread]
        static void Main(string[] args)
        {
            WindowsServiceHost service = new WindowsServiceHost();
            bool runAsConsole =
                    args != null
                    && args.Length > 0
                    && string.Equals(args[0], "/console", StringComparison.OrdinalIgnoreCase);

            if (runAsConsole)
            {
                NativeMethods.AllocConsole();
                try
                {
                    Console.Title = "Server ...";
                    Console.WriteLine("Alloc Console ...");
                    Console.WriteLine("Current User Identity: {0}", WindowsIdentity.GetCurrent().Name);
                    Console.WriteLine(".Net Framework version: {0}", Environment.Version.ToString());
                    Console.Title = string.Format
                                        (
                                            "{0} Host Server"
                                            , WindowsServiceHost.serviceName
                                        );
                    // Console mode: the process is not running under the service control manager.
                    Console.WriteLine("Console");
                    service.OnStart(null);
                    Console.ReadLine();
                    service.OnStop();
                }
                finally
                {
                    // Release the console allocated above, even if startup failed.
                    Console.WriteLine("Free Console ...");
                    NativeMethods.FreeConsole();
                }
                return;
            }

            Console.WriteLine("Service");
            ServiceBase.Run(service);
        }

        public static ServiceHost _serviceHost;

        public WindowsServiceHost()
        {
            CanPauseAndContinue = true;
            ServiceName = WindowsServiceHost.serviceName;
        }

        /// <summary>
        /// Creates the queue (with performance counters attached) and its processor, then opens
        /// the WCF host with a net.tcp service endpoint, a net.tcp MEX endpoint and throttling limits.
        /// </summary>
        protected override void OnStart(string[] args)
        {
            Console.WriteLine(Environment.Version.ToString());

            AsyncQueue<Item> queue = new AsyncQueue<Item>();
            queue.AttachPerformanceCounters("Q1");
            _asyncQueueProcessor = new AsyncQueueProcessor(queue);

            _serviceHost = new ServiceHost(typeof(AsyncQueueWcfNetNetTcpService));
            string address = "{0}://localhost{1}/servicemodelsamples/service";
            string netTcpUrl = string.Format
                                    (
                                        address
                                        , "net.tcp"
                                        , ":9000"
                                    );

            NetTcpBinding binding = new NetTcpBinding();
            _serviceHost.AddServiceEndpoint
                            (
                                typeof(IQueueAble)
                                , binding
                                , netTcpUrl
                            );

            // Publish metadata. HTTP GET publishing is not enabled; metadata is exposed
            // through the net.tcp MEX endpoint added below.
            ServiceMetadataBehavior smb = _serviceHost.Description.Behaviors.Find<ServiceMetadataBehavior>();
            if (smb == null)
            {
                // Add only when missing: the behavior collection rejects a second
                // behavior of the same type.
                smb = new ServiceMetadataBehavior();
                _serviceHost.Description.Behaviors.Add(smb);
            }

            // Add MEX endpoint
            _serviceHost.AddServiceEndpoint
                            (
                                ServiceMetadataBehavior.MexContractName
                                , MetadataExchangeBindings.CreateMexTcpBinding()
                                , netTcpUrl + "/mex"
                            );

            ServiceThrottlingBehavior stb = new ServiceThrottlingBehavior();
            stb.MaxConcurrentCalls = 1000;
            stb.MaxConcurrentInstances = 1000;
            stb.MaxConcurrentSessions = 1000;
            _serviceHost.Description.Behaviors.Add(stb);

            try
            {
                _serviceHost.Open();
            }
            catch
            {
                // A host that failed to open cannot be reused; release its resources.
                _serviceHost.Abort();
                _serviceHost = null;
                throw;
            }
            Console.WriteLine("Wcf Service Host Opened ...");
        }

        /// <summary>Closes the WCF host opened by <see cref="OnStart"/>, aborting it if a graceful close fails.</summary>
        protected override void OnStop()
        {
            ServiceHost host = _serviceHost;
            if (host == null)
            {
                return;
            }
            _serviceHost = null;
            try
            {
                host.Close();
            }
            catch (CommunicationException)
            {
                host.Abort();
            }
            catch (TimeoutException)
            {
                host.Abort();
            }
        }
    }

    /// <summary>Installer that registers the service (LocalSystem account, manual start).</summary>
    [RunInstallerAttribute(true)]
    public class ProjectInstaller : Installer
    {
        private ServiceInstaller serviceInstaller;
        private ServiceProcessInstaller processInstaller;

        public ProjectInstaller()
        {
            processInstaller = new ServiceProcessInstaller();
            serviceInstaller = new ServiceInstaller();
            // Service will run under system account
            processInstaller.Account = ServiceAccount.LocalSystem;
            // Service will have Start Type of Manual
            serviceInstaller.StartType = ServiceStartMode.Manual;
            serviceInstaller.ServiceName = WindowsServiceHost.serviceName;
            Installers.Add(serviceInstaller);
            Installers.Add(processInstaller);
        }
    }
}
namespace Microshaoft.Win32
{
    using System.Runtime.InteropServices;

    /// <summary>P/Invoke declarations for kernel32 console functions.</summary>
    public class NativeMethods
    {
        /// <summary>
        /// Allocates a console for the calling process.
        /// </summary>
        /// <returns></returns>
        [DllImport("kernel32.dll")]
        public static extern bool AllocConsole();

        /// <summary>
        /// Releases the console of the calling process.
        /// </summary>
        /// <returns></returns>
        [DllImport("kernel32.dll")]
        public static extern bool FreeConsole();
    }
}
namespace Microshaoft
{
    using System;
    using Contracts.Services.Entitys;

    /// <summary>
    /// Wires an <see cref="AsyncQueue{T}"/> of <see cref="Item"/> to console logging and
    /// to the per-item dequeue handler, and allows up to 64 concurrent dequeue threads.
    /// </summary>
    public class AsyncQueueProcessor
    {
        private AsyncQueue<Item> _queue;

        /// <summary>The queue this processor is attached to.</summary>
        public AsyncQueue<Item> Queue
        {
            get
            {
                return _queue;
            }
        }

        /// <summary>Subscribes to the queue's events and sets its concurrency limit to 64.</summary>
        /// <param name="queue">Queue to process; must not be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
        public AsyncQueueProcessor(AsyncQueue<Item> queue)
        {
            if (queue == null)
            {
                throw new ArgumentNullException("queue");
            }
            _queue = queue;
            _queue.OnDequeue += new AsyncQueue<Item>.QueueEventHandler(_queue_OnDequeue);
            //_queue.OnDequeueThreadStart += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
            _queue.OnDequeueAllThreadsEnd += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
            _queue.OnDequeueThreadEnd += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
            //_queue.OnQueueRunningThreadStart += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
            //_queue.OnQueueRunningThreadEnd += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
            _queue.OnException += new AsyncQueue<Item>.ExceptionEventHandler(_queue_OnException);
            _queue.MaxConcurrentThreadsCount = 64;
        }

        /// <summary>Adds <paramref name="item"/> to the underlying queue.</summary>
        public void Enqueue(Item item)
        {
            _queue.Enqueue(item);
        }

        // Writes queue log messages to the console.
        void _queue_OnQueueLog(string logMessage)
        {
            Console.WriteLine(logMessage);
        }

        // Per-item handler: currently only measures and prints timings (whole seconds).
        void _queue_OnDequeue(Item item)
        {
            DateTime DequeueBeginTime = DateTime.Now;

            // Disabled work: executing the item's SQL text against its connection string.
            //    SqlConnection connection = null;
            //    try
            //    {
            //        connection = new SqlConnection(item.ConnectionString);
            //        SqlCommand command = new SqlCommand(item.SqlCommandText, connection);
            //        command.CommandType = CommandType.Text;
            //        connection.Open();
            //        command.ExecuteNonQuery();
            //    }
            //    catch (Exception e)
            //    {
            //        Console.WriteLine("Exception on Dequeue Process:{0}{1}", "\r\n", e.ToString());
            //    }
            //    finally
            //    {
            //        connection.Close();
            //        connection.Dispose();
            //        connection = null;
            //    }

            DateTime DequeueEndTime = DateTime.Now;
            Console.WriteLine
                        (
                            "QueueRemainCount {0}, Enqueue {1}, Dequeue {2},[{3}], End {4},[{5}],[{6}]"
                            , _queue.Count
                            , item.EnqueueTime
                            , DequeueBeginTime
                            , (DequeueBeginTime.Ticks - item.EnqueueTime.Ticks) / TimeSpan.TicksPerSecond
                            , DequeueEndTime
                            , (DequeueEndTime.Ticks - DequeueBeginTime.Ticks) / TimeSpan.TicksPerSecond
                            , _queue.ConcurrentThreadsCount
                        );
        }

        // Writes exceptions reported by the queue to the console.
        void _queue_OnException(Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }
}
//================================================================================================================
// Share.cs
namespace Contracts.Services
{
    using System.ServiceModel;
    using Contracts.Services.Entitys;

    /// <summary>Service contract for submitting items to the server-side queue.</summary>
    [ServiceContract]
    public interface IQueueAble
    {
        [OperationContract]
        void Enqueue(Item element);
    }
}
namespace Contracts.Services.Entitys
{
    using System;
    using System.Runtime.Serialization;

    /// <summary>Data contract carried from client to server: a SQL text, a connection string and the enqueue time.</summary>
    [DataContract]
    public class Item
    {
        /// <summary>Time at which the sender created the item.</summary>
        [DataMember]
        public DateTime EnqueueTime { get; set; }

        /// <summary>SQL command text associated with the item.</summary>
        [DataMember]
        public string SqlCommandText { get; set; }

        /// <summary>Connection string associated with the item.</summary>
        [DataMember]
        public string ConnectionString { get; set; }
    }
}
//=======================================================================================
//=======================================================================================================
// Client.cs
//------------------------------------------------------------------------------
// <auto-generated>
//     This code was generated by a tool.
//     Runtime Version:4.0.30319.1
//
//     Changes to this file may cause incorrect behavior and will be lost if
//     the code is regenerated.
// </auto-generated>
//------------------------------------------------------------------------------
// "D:\Microsoft.SDKs\Windows\v7.1\Bin\NETFX 4.0 Tools\SvcUtil.exe" Share.dll
// "D:\Microsoft.SDKs\Windows\v7.1\Bin\NETFX 4.0 Tools\SvcUtil.exe" *.wsdl *.xsd
// "D:\Microsoft.SDKs\Windows\v7.1\Bin\NETFX 4.0 Tools\SvcUtil.exe" net.tcp://localhost:9000/servicemodelsamples/service/mex
namespace Contracts.Services.Entitys
{
    using System.Runtime.Serialization;

    [System.Diagnostics.DebuggerStepThroughAttribute()]
    [System.CodeDom.Compiler.GeneratedCodeAttribute("System.Runtime.Serialization", "4.0.0.0")]
    [System.Runtime.Serialization.DataContractAttribute(Name="Item", Namespace="http://schemas.datacontract.org/2004/07/Contracts.Services.Entitys")]
    public partial class Item : object, System.Runtime.Serialization.IExtensibleDataObject
    {
        private System.Runtime.Serialization.ExtensionDataObject extensionDataField;
        private string ConnectionStringField;
        private System.DateTime EnqueueTimeField;
        private string SqlCommandTextField;

        public System.Runtime.Serialization.ExtensionDataObject ExtensionData
        {
            get
            {
                return this.extensionDataField;
            }
            set
            {
                this.extensionDataField = value;
            }
        }

        [System.Runtime.Serialization.DataMemberAttribute()]
        public string ConnectionString
        {
            get
            {
                return this.ConnectionStringField;
            }
            set
            {
                this.ConnectionStringField = value;
            }
        }

        [System.Runtime.Serialization.DataMemberAttribute()]
        public System.DateTime EnqueueTime
        {
            get
            {
                return this.EnqueueTimeField;
            }
            set
            {
                this.EnqueueTimeField = value;
            }
        }

        [System.Runtime.Serialization.DataMemberAttribute()]
        public string SqlCommandText
        {
            get
            {
                return this.SqlCommandTextField;
            }
            set
            {
                this.SqlCommandTextField = value;
            }
        }
    }
}
namespace Proxy
{
    [System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "4.0.0.0")]
    [System.ServiceModel.ServiceContractAttribute(ConfigurationName="IQueueAble")]
    public interface IQueueAble
    {
        [System.ServiceModel.OperationContractAttribute(Action="http://tempuri.org/IQueueAble/Enqueue", ReplyAction="http://tempuri.org/IQueueAble/EnqueueResponse")]
        void Enqueue(Contracts.Services.Entitys.Item element);
    }

    [System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "4.0.0.0")]
    public interface IQueueAbleChannel : IQueueAble, System.ServiceModel.IClientChannel
    {
    }

    [System.Diagnostics.DebuggerStepThroughAttribute()]
    [System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "4.0.0.0")]
    public partial class QueueAbleClient : System.ServiceModel.ClientBase<IQueueAble>, IQueueAble
    {
        public QueueAbleClient()
        {
        }

        public QueueAbleClient(string endpointConfigurationName)
            : base(endpointConfigurationName)
        {
        }

        public QueueAbleClient(string endpointConfigurationName, string remoteAddress)
            : base(endpointConfigurationName, remoteAddress)
        {
        }

        public QueueAbleClient(string endpointConfigurationName, System.ServiceModel.EndpointAddress remoteAddress)
            : base(endpointConfigurationName, remoteAddress)
        {
        }

        public QueueAbleClient(System.ServiceModel.Channels.Binding binding, System.ServiceModel.EndpointAddress remoteAddress)
            : base(binding, remoteAddress)
        {
        }

        public void Enqueue(Contracts.Services.Entitys.Item element)
        {
            base.Channel.Enqueue(element);
        }
    }
}
namespace Microshaoft.WCF.Client
{
    using System;
    using System.Collections;
    using System.Threading;
    using System.ServiceModel;
    using Proxy;
    using Contracts.Services.Entitys;

    /// <summary>Load-generating client: 20 threads each send 1000 items to the service.</summary>
    public class Class1
    {
        // Single proxy shared by all sender threads.
        static QueueAbleClient _client;

        public static void Main()
        {
            Console.Title = "Client";
            Console.WriteLine(Environment.Version.ToString());
            Class1 a = new Class1();
            a.Run();
        }

        /// <summary>Creates the net.tcp proxy and starts 20 sender threads.</summary>
        public void Run()
        {
            string address = @"net.tcp://localhost:9000/servicemodelsamples/service";
            NetTcpBinding binding = new NetTcpBinding();
            _client = new QueueAbleClient(binding, new EndpointAddress(address));
            for (int i = 0; i < 20; i++)
            {
                Thread x = new Thread(new ThreadStart(ThreadProcess));
                x.Start();
            }
        }

        /// <summary>Sends 1000 items, printing the round-trip time of each call in milliseconds.</summary>
        public void ThreadProcess()
        {
            for (int i = 0; i < 1000; i++)
            {
                Item x = new Item();
                DateTime EnqueueTime = DateTime.Now;
                x.EnqueueTime = EnqueueTime;
                // Kept multi-line on purpose: lines starting with "--" are SQL comments, so
                // putting the batch on a single line would comment out every statement.
                x.SqlCommandText = @"
--==========================
declare @ varchar(10)
set @ = 'aaa'
exec zsp_test @
--==========================
";
                x.ConnectionString = "";
                _client.Enqueue(x);
                Console.WriteLine
                            (
                                "Enqueue: {0},[{1}]"
                                , EnqueueTime
                                , (DateTime.Now.Ticks - EnqueueTime.Ticks) / TimeSpan.TicksPerMillisecond
                            );
            }
        }
    }
}
//============================================================================================================================