namespace Test { using System; using System.Threading; using System.Threading.Tasks; using Microshaoft; class Program { static void Main() { var q = new ConcurrentAsyncQueue<int>(); q.AttachPerformanceCounters("new"); Random random = new Random(); q.OnDequeue += new ConcurrentAsyncQueue<int>.QueueEventHandler ( (x) => { int sleep = random.Next(0, 4) * 500; Thread.Sleep(sleep); //Console.WriteLine(x); } ); q.OnException += new ConcurrentAsyncQueue<int>.ExceptionEventHandler ( (x) => { Console.WriteLine(x.ToString()); } ); Console.WriteLine("begin ..."); //q.StartAdd(10); string r = string.Empty; while ((r = Console.ReadLine()) != "q") { int i; if (int.TryParse(r, out i)) { Console.WriteLine("Parallel Enqueue {0} begin ...", i); new Thread ( new ParameterizedThreadStart ( (x) => { Parallel.For ( 0 , i , (xx) => { q.Enqueue(xx); } ); Console.WriteLine("Parallel Enqueue {0} end ...", i); } ) ).Start(); } else if (r.ToLower() == "stop") { q.StartStop(10); } else if (r.ToLower() == "add") { q.StartAdd(20); } //else if (r.ToLower() == "count") //{ // q.EnablePerformanceCountersCount = true; //} //else if (r.ToLower() == "uncount") //{ // q.EnablePerformanceCountersCount = false; //} else { Console.WriteLine("please input Number!"); } } } } } namespace Microshaoft { using System; using System.Threading; using System.Diagnostics; using System.Linq; using System.Collections.Generic; using System.Collections.Concurrent; using System.Threading.Tasks; using System.Reflection; using Microshaoft; public class ConcurrentAsyncQueue<T> { public delegate void QueueEventHandler(T element); public event QueueEventHandler OnDequeue; public delegate void QueueLogEventHandler(string logMessage); public QueueLogEventHandler OnQueueLog , OnDequeueThreadStart , OnDequeueThreadEnd; public delegate void ExceptionEventHandler(Exception exception); public event ExceptionEventHandler OnException; private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>(); private ConcurrentQueue<Action> _callbackProcessBreaksActions; private long _concurrentDequeueThreadsCount = 0; //Microshaoft 用于控制并发线程数 private ConcurrentQueue<ThreadProcessor> _dequeueThreadsProcessorsPool; private int _dequeueIdleSleepSeconds = 10; public int DequeueIdleSleepSeconds { set { _dequeueIdleSleepSeconds = value; } get { return _dequeueIdleSleepSeconds; } } private PerformanceCounter _enqueuePerformanceCounter; public PerformanceCounter EnqueuePerformanceCounter { private set { Interlocked.Exchange<PerformanceCounter>(ref _enqueuePerformanceCounter, value); } get { return _enqueuePerformanceCounter; } } private PerformanceCounter _dequeuePerformanceCounter; public PerformanceCounter DequeuePerformanceCounter { private set { Interlocked.Exchange<PerformanceCounter>(ref _dequeuePerformanceCounter, value); } get { return _dequeuePerformanceCounter; } } private PerformanceCounter _dequeueProcessedPerformanceCounter; public PerformanceCounter DequeueProcessedPerformanceCounter { private set { Interlocked.Exchange<PerformanceCounter>(ref _dequeueProcessedPerformanceCounter, value); } get { return _dequeueProcessedPerformanceCounter; } } private PerformanceCounter _queueLengthPerformanceCounter; public PerformanceCounter QueueLengthPerformanceCounter { private set { Interlocked.Exchange<PerformanceCounter>(ref _queueLengthPerformanceCounter, value); } get { return _queueLengthPerformanceCounter; } } private PerformanceCounter _dequeueThreadStartPerformanceCounter; public PerformanceCounter DequeueThreadStartPerformanceCounter { private set { Interlocked.Exchange<PerformanceCounter>(ref _dequeueThreadStartPerformanceCounter, value); } get { return _dequeueThreadStartPerformanceCounter; } } private PerformanceCounter _dequeueThreadEndPerformanceCounter; public PerformanceCounter DequeueThreadEndPerformanceCounter { private set { Interlocked.Exchange<PerformanceCounter>(ref _dequeueThreadEndPerformanceCounter, value); } get { return _dequeueThreadEndPerformanceCounter; } } private PerformanceCounter _dequeueThreadsCountPerformanceCounter; public PerformanceCounter DequeueThreadsCountPerformanceCounter { private set { Interlocked.Exchange<PerformanceCounter>(ref _dequeueThreadsCountPerformanceCounter, value); } get { return _dequeueThreadsCountPerformanceCounter; } } private bool _isAttachedPerformanceCounters = false; private class ThreadProcessor { public bool Break { set; get; } public EventWaitHandle Wait { private set; get; } public ConcurrentAsyncQueue<T> Sender { private set; get; } public void StopOne() { Break = true; } public ThreadProcessor ( ConcurrentAsyncQueue<T> queue , EventWaitHandle wait ) { Wait = wait; Sender = queue; } public void ThreadProcess() { Interlocked.Increment(ref Sender._concurrentDequeueThreadsCount); if (Sender._isAttachedPerformanceCounters) { Sender.DequeueThreadStartPerformanceCounter.Increment(); Sender.DequeueThreadsCountPerformanceCounter.Increment(); } long r = 0; try { if (Sender.OnDequeueThreadStart != null) { r = Interlocked.Read(ref Sender._concurrentDequeueThreadsCount); Sender.OnDequeueThreadStart ( string.Format ( "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}" , "Threads ++ !" , r , Sender._queue.Count , Thread.CurrentThread.Name , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff") ) ); } while (true) { #region while true if (Break) { break; } while (!Sender._queue.IsEmpty) { #region while queue.IsEmpty if (Break) { break; } T element; if (Sender._queue.TryDequeue(out element)) { if (Sender._isAttachedPerformanceCounters) { Sender.DequeuePerformanceCounter.Increment(); Sender.QueueLengthPerformanceCounter.Decrement(); } if (Sender.OnDequeue != null) { Sender.OnDequeue(element); } if (Sender._isAttachedPerformanceCounters) { Sender.DequeueProcessedPerformanceCounter.Increment(); } } #endregion while queue.IsEmpty } #region wait Sender._dequeueThreadsProcessorsPool.Enqueue(this); if (Break) { } if (!Wait.WaitOne(Sender.DequeueIdleSleepSeconds * 1000)) { } #endregion wait #endregion while 1 == 1 } } catch (Exception e) { if (Sender.OnException != null) { Sender.OnException(e); } } finally { r = Interlocked.Decrement(ref Sender._concurrentDequeueThreadsCount); if (r < 0) { Interlocked.Exchange(ref Sender._concurrentDequeueThreadsCount, 0); if (Sender._isAttachedPerformanceCounters) { if (Sender.DequeueThreadsCountPerformanceCounter.RawValue < 0) { Sender.DequeueThreadsCountPerformanceCounter.RawValue = Sender._concurrentDequeueThreadsCount; } } } if (Sender.OnDequeueThreadEnd != null) { Sender.OnDequeueThreadEnd ( string.Format ( "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}" , "Threads--" , r , Sender._queue.Count , Thread.CurrentThread.Name , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff") ) ); } if (Sender._isAttachedPerformanceCounters) { Sender.DequeueThreadEndPerformanceCounter.Increment(); Sender.DequeueThreadsCountPerformanceCounter.Decrement(); } if (!Break) { Sender.StartAdd(1); } Break = false; } } } public void AttachPerformanceCounters(string instanceNamePrefix) { var category = "Microshaoft AsyncConurrentQueue Counters"; var process = Process.GetCurrentProcess(); var processName = process.ProcessName; var instanceName = string.Format ( "{0}-{1}" , instanceNamePrefix , processName //, processID //, processStartTime.ToString("yyyy-MM-dd HH:mm:ss.fff") ); var type = this.GetType(); PerformanceCountersHelper.AttachPerformanceCountersToProperties<ConcurrentAsyncQueue<T>>(instanceName, category, this); _isAttachedPerformanceCounters = true; } public int Count { get { return _queue.Count; } } public long ConcurrentThreadsCount { get { return _concurrentDequeueThreadsCount; } } private void Stop(int count) { Action action; for (var i = 0; i < count; i++) { if (_callbackProcessBreaksActions.TryDequeue(out action)) { action(); } } } public void StartStop(int count) { new Thread ( new ThreadStart ( () => { Stop(count); } ) ).Start(); } public void StartAdd(int count) { new Thread ( new ThreadStart ( () => { Add(count); } ) ).Start(); } private void Add(int count) { for (int i = 0; i < count; i++) { Interlocked.Increment(ref _concurrentDequeueThreadsCount); if (_dequeueThreadsProcessorsPool == null) { _dequeueThreadsProcessorsPool = new ConcurrentQueue<ThreadProcessor>(); } var processor = new ThreadProcessor ( this , new AutoResetEvent(false) ); var thread = new Thread ( new ThreadStart ( processor.ThreadProcess ) ); if (_callbackProcessBreaksActions == null) { _callbackProcessBreaksActions = new ConcurrentQueue<Action>(); } var callbackProcessBreakAction = new Action ( processor.StopOne ); _callbackProcessBreaksActions.Enqueue(callbackProcessBreakAction); _dequeueThreadsProcessorsPool.Enqueue(processor); thread.Start(); } } public void Enqueue(T element) { try { _queue.Enqueue(element); if (_isAttachedPerformanceCounters) { EnqueuePerformanceCounter.Increment(); QueueLengthPerformanceCounter.Increment(); } if ( _dequeueThreadsProcessorsPool != null && !_dequeueThreadsProcessorsPool.IsEmpty ) { ThreadProcessor processor; if (_dequeueThreadsProcessorsPool.TryDequeue(out processor)) { processor.Wait.Set(); } } } catch (Exception e) { if (OnException != null) { OnException(e); } } } } } namespace Microshaoft { using System; using System.Linq; using System.Diagnostics; using System.Reflection; using System.Collections.Generic; public static class PerformanceCountersHelper { public static void AttachPerformanceCountersToProperties<T> ( string performanceCounterInstanceName , string category , T target = default(T) ) { var type = typeof(T); var propertiesList = type.GetProperties().ToList(); propertiesList = propertiesList.Where ( (pi) => { return (pi.PropertyType == typeof(PerformanceCounter)); } ).ToList(); if (PerformanceCounterCategory.Exists(category)) { propertiesList.ForEach ( (pi) => { if (PerformanceCounterCategory.CounterExists(pi.Name, category)) { if (PerformanceCounterCategory.InstanceExists(performanceCounterInstanceName, category)) { //var pc = new PerformanceCounter(category, pi.Name, instanceName, false); //pc.InstanceName = instanceName; //pc.RemoveInstance(); } } } ); PerformanceCounterCategory.Delete(category); } var ccdc = new CounterCreationDataCollection(); propertiesList.ForEach ( (pi) => { var propertyName = pi.Name; var ccd = PerformanceCounterHelper.GetCounterCreationData ( propertyName , PerformanceCounterType.NumberOfItems64 ); ccdc.Add(ccd); } ); PerformanceCounterCategory.Create ( category, string.Format("{0} Category Help.", category), PerformanceCounterCategoryType.MultiInstance, ccdc ); propertiesList.ForEach ( (pi) => { var propertyName = pi.Name; var pc = new PerformanceCounter() { CategoryName = category , CounterName = propertyName , InstanceLifetime = PerformanceCounterInstanceLifetime.Process , InstanceName = performanceCounterInstanceName , ReadOnly = false , RawValue = 0 }; if (pi.GetGetMethod().IsStatic) { var setter = DynamicPropertyAccessor.CreateSetStaticPropertyValueAction<PerformanceCounter>(type, propertyName); setter(pc); } else { if (target != null) { var setter = DynamicPropertyAccessor.CreateSetPropertyValueAction<PerformanceCounter>(type, propertyName); setter(target, pc); } } } ); } } } namespace Microshaoft { using System; using System.Diagnostics; public static class PerformanceCounterHelper { public static CounterCreationData GetCounterCreationData(string counterName, PerformanceCounterType performanceCounterType) { return new CounterCreationData() { CounterName = counterName , CounterHelp = string.Format("{0} Help", counterName) , CounterType = performanceCounterType }; } } } namespace Microshaoft { using System; using System.Reflection; using System.Linq; using System.Linq.Expressions; public class DynamicPropertyAccessor { public static Func<object, object> CreateGetPropertyValueFunc(string typeName, string propertyName, bool isTypeFromAssembly = false) { Type type; if (isTypeFromAssembly) { var assembly = AppDomain.CurrentDomain.GetAssemblies().First ( (a) => { return a.GetTypes().Any ( (t) => { return (t.FullName == typeName); } ); } ); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateGetPropertyValueFunc(type, propertyName); } public static Func<object, object> CreateGetPropertyValueFunc(Type type, string propertyName) { var target = Expression.Parameter(typeof(object)); var castTarget = Expression.Convert(target, type); var getPropertyValue = Expression.Property(castTarget, propertyName); var castPropertyValue = Expression.Convert(getPropertyValue, typeof(object)); var lambda = Expression.Lambda<Func<object, object>>(castPropertyValue, target); return lambda.Compile(); } public static Func<object, TProperty> CreateGetPropertyValueFunc<TProperty>(string typeName, string propertyName, bool isTypeFromAssembly = false) { Type type; if (isTypeFromAssembly) { var assembly = AppDomain.CurrentDomain.GetAssemblies().First ( (a) => { return a.GetTypes().Any ( (t) => { return (t.FullName == typeName); } ); } ); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateGetPropertyValueFunc<TProperty>(type, propertyName); } public static Func<object, TProperty> CreateGetPropertyValueFunc<TProperty>(Type type, string propertyName) { var target = Expression.Parameter(typeof(object)); var castTarget = Expression.Convert(target, type); var getPropertyValue = Expression.Property(castTarget, propertyName); var lambda = Expression.Lambda<Func<object, TProperty>>(getPropertyValue, target); return lambda.Compile(); } public static Func<TProperty> CreateGetStaticPropertyValueFunc<TProperty>(string typeName, string propertyName, bool isTypeFromAssembly = false) { Type type; if (isTypeFromAssembly) { var assembly = AppDomain.CurrentDomain.GetAssemblies().First ( (a) => { return a.GetTypes().Any ( (t) => { return (t.FullName == typeName); } ); } ); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateGetStaticPropertyValueFunc<TProperty>(type, propertyName); } public static Func<TProperty> CreateGetStaticPropertyValueFunc<TProperty>(Type type, string propertyName) { var property = type.GetProperty(propertyName, typeof(TProperty)); var getPropertyValue = Expression.Property(null, property); var lambda = Expression.Lambda<Func<TProperty>>(getPropertyValue, null); return lambda.Compile(); } public static Func<object> CreateGetStaticPropertyValueFunc(Type type, string propertyName) { var property = type.GetProperty(propertyName); var getPropertyValue = Expression.Property(null, property); var castPropertyValue = Expression.Convert(getPropertyValue, typeof(object)); var lambda = Expression.Lambda<Func<object>>(castPropertyValue, null); return lambda.Compile(); } public static Func<object> CreateGetStaticPropertyValueFunc(string typeName, string propertyName, bool isTypeFromAssembly = false) { Type type; if (isTypeFromAssembly) { var assembly = AppDomain.CurrentDomain.GetAssemblies().First ( (a) => { return a.GetTypes().Any ( (t) => { return (t.FullName == typeName); } ); } ); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateGetStaticPropertyValueFunc(type, propertyName); } public static Action<object, object> CreateSetPropertyValueAction(Type type, string propertyName) { var property = type.GetProperty(propertyName); var target = Expression.Parameter(typeof(object)); var propertyValue = Expression.Parameter(typeof(object)); var castTarget = Expression.Convert(target, type); var castPropertyValue = Expression.Convert(propertyValue, property.PropertyType); var getSetMethod = property.GetSetMethod(); if (getSetMethod == null) { getSetMethod = property.GetSetMethod(true); } var call = Expression.Call(castTarget, getSetMethod, castPropertyValue); var lambda = Expression.Lambda<Action<object, object>>(call, target, propertyValue); return lambda.Compile(); } public static Action<object, object> CreateSetPropertyValueAction(string typeName, string propertyName, bool isTypeFromAssembly = false) { Type type; if (isTypeFromAssembly) { var assembly = AppDomain.CurrentDomain.GetAssemblies().First ( (a) => { return a.GetTypes().Any ( (t) => { return (t.FullName == typeName); } ); } ); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateSetPropertyValueAction(type, propertyName); } public static Action<object, TProperty> CreateSetPropertyValueAction<TProperty>(Type type, string propertyName) { var property = type.GetProperty(propertyName); var target = Expression.Parameter(typeof(object)); var propertyValue = Expression.Parameter(typeof(TProperty)); var castTarget = Expression.Convert(target, type); var castPropertyValue = Expression.Convert(propertyValue, property.PropertyType); var getSetMethod = property.GetSetMethod(); if (getSetMethod == null) { getSetMethod = property.GetSetMethod(true); } var call = Expression.Call(castTarget, getSetMethod, castPropertyValue); return Expression.Lambda<Action<object, TProperty>>(call, target, propertyValue).Compile(); } public static Action<object, TProperty> CreateSetPropertyValueAction<TProperty>(string typeName, string propertyName, bool isTypeFromAssembly = false) { Type type; if (isTypeFromAssembly) { var assembly = AppDomain.CurrentDomain.GetAssemblies().First ( (a) => { return a.GetTypes().Any ( (t) => { return (t.FullName == typeName); } ); } ); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateSetPropertyValueAction<TProperty>(type, propertyName); } public static Action<object> CreateSetStaticPropertyValueAction(Type type, string propertyName) { var property = type.GetProperty(propertyName); var propertyValue = Expression.Parameter(typeof(object)); var castPropertyValue = Expression.Convert(propertyValue, property.PropertyType); var getSetMethod = property.GetSetMethod(); if (getSetMethod == null) { getSetMethod = property.GetSetMethod(true); } var call = Expression.Call(null, getSetMethod, castPropertyValue); var lambda = Expression.Lambda<Action<object>>(call, propertyValue); return lambda.Compile(); } public static Action<object> CreateSetStaticPropertyValueAction(string typeName, string propertyName, bool isTypeFromAssembly = false) { Type type; if (isTypeFromAssembly) { var assembly = AppDomain.CurrentDomain.GetAssemblies().First ( (a) => { return a.GetTypes().Any ( (t) => { return (t.FullName == typeName); } ); } ); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateSetStaticPropertyValueAction(type, propertyName); } public static Action<TProperty> CreateSetStaticPropertyValueAction<TProperty>(Type type, string propertyName) { var property = type.GetProperty(propertyName); var propertyValue = Expression.Parameter(typeof(TProperty)); //var castPropertyValue = Expression.Convert(propertyValue, property.PropertyType); var getSetMethod = property.GetSetMethod(); if (getSetMethod == null) { getSetMethod = property.GetSetMethod(true); } var call = Expression.Call(null, getSetMethod, propertyValue); var lambda = Expression.Lambda<Action<TProperty>>(call, propertyValue); return lambda.Compile(); } public static Action<TProperty> CreateSetStaticPropertyValueAction<TProperty>(string typeName, string propertyName, bool isTypeFromAssembly = false) { Type type; if (isTypeFromAssembly) { var assembly = AppDomain.CurrentDomain.GetAssemblies().First ( (a) => { return a.GetTypes().Any ( (t) => { return (t.FullName == typeName); } ); } ); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateSetStaticPropertyValueAction<TProperty>(type, propertyName); } } } |