zoukankan      html  css  js  c++  java
  • WCF NetTcp AsyncQueue Service

    
    //AsyncQueue.cs
    #define c4 //C# 4.0+
    //#define c2
    namespace Microshaoft
    {
        using System;
        using System.Threading;
        using System.Diagnostics;
        using System.Collections.Generic;
    #if c4
        using System.Collections.Concurrent;
    #endif
        using Microshaoft;
        public class AsyncQueue<T>
                            where T : class
        {
            public delegate void QueueEventHandler(T element);
            public event QueueEventHandler OnDequeue;
            public delegate void QueueLogEventHandler(string logMessage);
            //public event QueueLogEventHandler OnQueueLog;
            public event QueueLogEventHandler OnQueueRunningThreadStart;
            public event QueueLogEventHandler OnQueueRunningThreadEnd;
            public event QueueLogEventHandler OnDequeueThreadStart;
            public event QueueLogEventHandler OnDequeueThreadEnd;
            public event QueueLogEventHandler OnDequeueAllThreadsEnd;
            public delegate void ExceptionEventHandler(Exception exception);
            public event ExceptionEventHandler OnException;
    #if c2
            private Queue<T> _queue = new Queue<T>();
    #elif c4
            private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    #endif
            private object _syncQueueLockObject = new object();
            //private object _syncQueueRunningLockObject = new object();
            private long _isQueueRunning = 0;
            private long _concurrentDequeueThreadsCount = 0; //Microshaoft 用于控制并发线程数
            private PerformanceCounter _enqueuePerformanceCounter;
            private PerformanceCounter _dequeuePerformanceCounter;
            private PerformanceCounter _dequeueProcessedPerformanceCounter;
            private PerformanceCounter _queueLengthPerformanceCounter;
            private PerformanceCounter _dequeueThreadStartPerformanceCounter;
            private PerformanceCounter _dequeueThreadEndPerformanceCounter;
            private PerformanceCounter _dequeueThreadsCountPerformanceCounter;
            private PerformanceCounter _queueRunningThreadStartPerformanceCounter;
            private PerformanceCounter _queueRunningThreadEndPerformanceCounter;
            private PerformanceCounter _queueRunningThreadsCountPerformanceCounter;
            private bool _isAttachedPerformanceCounters = false;
            public void AttachPerformanceCounters(string instanceNamePrefix)
            {
                string category = "Microshaoft AsyncConurrentQueue Counters";
                string counter = string.Empty;
                Process process = Process.GetCurrentProcess();
                //int processID = 0;//process.Id;
                string processName = process.ProcessName;
                //string processStartTime = "";//process.StartTime;
                string instanceName = string.Empty;
                instanceName = string.Format
                                        (
                                            "{0}-{1}"
                                            , instanceNamePrefix
                                            , processName
                    //, processID
                    //, processStartTime.ToString("yyyy-MM-dd HH:mm:ss.fff")
                                        );
                CounterCreationDataCollection ccdc = new CounterCreationDataCollection();
                if (PerformanceCounterCategory.Exists(category))
                {
                    PerformanceCounterCategory.Delete(category);
                }
                CounterCreationData ccd = null;
                counter = "EnqueueCounter";
                ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
                ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
                counter = "DequeueCounter";
                ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
                ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
                counter = "QueueLengthCounter";
                ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
                ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
                counter = "DequeueProcessedCounter";
                ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
                ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
                counter = "DequeueThreadStartCounter";
                ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
                ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
                counter = "DequeueThreadEndCounter";
                ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
                ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
                counter = "DequeueThreadsCountCounter";
                ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
                ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
                counter = "QueueRunningThreadStartCounter";
                ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
                ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
                counter = "QueueRunningThreadEndCounter";
                ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
                ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
                counter = "QueueRunningThreadsCountCounter";
                ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
                ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
                PerformanceCounterCategory.Create
                                                (
                                                    category,
                                                    string.Format("{0} Category Help.", category),
                                                    PerformanceCounterCategoryType.MultiInstance,
                                                    ccdc
                                                );
                counter = "EnqueueCounter";
                _enqueuePerformanceCounter = new PerformanceCounter();
                _enqueuePerformanceCounter.CategoryName = category;
                _enqueuePerformanceCounter.CounterName = counter;
                _enqueuePerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                _enqueuePerformanceCounter.InstanceName = instanceName;
                _enqueuePerformanceCounter.ReadOnly = false;
                _enqueuePerformanceCounter.RawValue = 0;
                counter = "DequeueCounter";
                _dequeuePerformanceCounter = new PerformanceCounter();
                _dequeuePerformanceCounter.CategoryName = category;
                _dequeuePerformanceCounter.CounterName = counter;
                _dequeuePerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                _dequeuePerformanceCounter.InstanceName = instanceName;
                _dequeuePerformanceCounter.ReadOnly = false;
                _dequeuePerformanceCounter.RawValue = 0;
                counter = "DequeueProcessedCounter";
                _dequeueProcessedPerformanceCounter = new PerformanceCounter();
                _dequeueProcessedPerformanceCounter.CategoryName = category;
                _dequeueProcessedPerformanceCounter.CounterName = counter;
                _dequeueProcessedPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                _dequeueProcessedPerformanceCounter.InstanceName = instanceName;
                _dequeueProcessedPerformanceCounter.ReadOnly = false;
                _dequeueProcessedPerformanceCounter.RawValue = 0;
                counter = "QueueLengthCounter";
                _queueLengthPerformanceCounter = new PerformanceCounter();
                _queueLengthPerformanceCounter.CategoryName = category;
                _queueLengthPerformanceCounter.CounterName = counter;
                _queueLengthPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                _queueLengthPerformanceCounter.InstanceName = instanceName;
                _queueLengthPerformanceCounter.ReadOnly = false;
                _queueLengthPerformanceCounter.RawValue = 0;
                counter = "DequeueThreadStartCounter";
                _dequeueThreadStartPerformanceCounter = new PerformanceCounter();
                _dequeueThreadStartPerformanceCounter.CategoryName = category;
                _dequeueThreadStartPerformanceCounter.CounterName = counter;
                _dequeueThreadStartPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                _dequeueThreadStartPerformanceCounter.InstanceName = instanceName;
                _dequeueThreadStartPerformanceCounter.ReadOnly = false;
                _dequeueThreadStartPerformanceCounter.RawValue = 0;
                counter = "DequeueThreadEndCounter";
                _dequeueThreadEndPerformanceCounter = new PerformanceCounter();
                _dequeueThreadEndPerformanceCounter.CategoryName = category;
                _dequeueThreadEndPerformanceCounter.CounterName = counter;
                _dequeueThreadEndPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                _dequeueThreadEndPerformanceCounter.InstanceName = instanceName;
                _dequeueThreadEndPerformanceCounter.ReadOnly = false;
                _dequeueThreadEndPerformanceCounter.RawValue = 0;
                counter = "DequeueThreadsCountCounter";
                _dequeueThreadsCountPerformanceCounter = new PerformanceCounter();
                _dequeueThreadsCountPerformanceCounter.CategoryName = category;
                _dequeueThreadsCountPerformanceCounter.CounterName = counter;
                _dequeueThreadsCountPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                _dequeueThreadsCountPerformanceCounter.InstanceName = instanceName;
                _dequeueThreadsCountPerformanceCounter.ReadOnly = false;
                _dequeueThreadsCountPerformanceCounter.RawValue = 0;
                counter = "QueueRunningThreadStartCounter";
                _queueRunningThreadStartPerformanceCounter = new PerformanceCounter();
                _queueRunningThreadStartPerformanceCounter.CategoryName = category;
                _queueRunningThreadStartPerformanceCounter.CounterName = counter;
                _queueRunningThreadStartPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                _queueRunningThreadStartPerformanceCounter.InstanceName = instanceName;
                _queueRunningThreadStartPerformanceCounter.ReadOnly = false;
                _queueRunningThreadStartPerformanceCounter.RawValue = 0;
                counter = "QueueRunningThreadEndCounter";
                _queueRunningThreadEndPerformanceCounter = new PerformanceCounter();
                _queueRunningThreadEndPerformanceCounter.CategoryName = category;
                _queueRunningThreadEndPerformanceCounter.CounterName = counter;
                _queueRunningThreadEndPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                _queueRunningThreadEndPerformanceCounter.InstanceName = instanceName;
                _queueRunningThreadEndPerformanceCounter.ReadOnly = false;
                _queueRunningThreadEndPerformanceCounter.RawValue = 0;
                counter = "QueueRunningThreadsCountCounter";
                _queueRunningThreadsCountPerformanceCounter = new PerformanceCounter();
                _queueRunningThreadsCountPerformanceCounter.CategoryName = category;
                _queueRunningThreadsCountPerformanceCounter.CounterName = counter;
                _queueRunningThreadsCountPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
                _queueRunningThreadsCountPerformanceCounter.InstanceName = instanceName;
                _queueRunningThreadsCountPerformanceCounter.ReadOnly = false;
                _queueRunningThreadsCountPerformanceCounter.RawValue = 0;
                _isAttachedPerformanceCounters = true;
            }
            private int _maxConcurrentThreadsCount = 1; //Microshaoft 允许并发出列处理线程数为 1
            public int MaxConcurrentThreadsCount
            {
                set
                {
                    _maxConcurrentThreadsCount = value;
                }
                get
                {
                    return _maxConcurrentThreadsCount;
                }
            }
            //Microshaoft 服务启动后可立即开启新的线程调用此方法(死循环)
            private void QueueRun() //Microshaoft ThreadStart
            {
                if (Interlocked.Read(ref _concurrentDequeueThreadsCount) < _maxConcurrentThreadsCount)
                {
                    if (Interlocked.CompareExchange(ref _isQueueRunning, 0, 1) == 0)
                    {
                        ThreadStart ts = new ThreadStart(QueueRunThreadProcess);
                        Thread t = new Thread(ts);
                        t.Name = "QueueRunningThreadProcess";
                        t.Start();
                    }
                }
            }
            public int Count
            {
                get
                {
                    return _queue.Count;
                }
            }
            public long ConcurrentThreadsCount
            {
                get
                {
                    return _concurrentDequeueThreadsCount;
                }
            }
            private void QueueRunThreadProcess()
            {
                if (_isAttachedPerformanceCounters)
                {
                    _queueRunningThreadStartPerformanceCounter.Increment();
                    _queueRunningThreadsCountPerformanceCounter.Increment();
                }
                if (OnQueueRunningThreadStart != null)
                {
                    OnQueueRunningThreadStart
                        (
                            string.Format
                                    (
                                        "{0} Threads Count {1},Queue Count {2},Current Thread: {3}({4}) at {5}"
                                        , "Queue Running Start ..."
                                        , _concurrentDequeueThreadsCount
                                        , _queue.Count
                                        , Thread.CurrentThread.Name
                                        , Thread.CurrentThread.ManagedThreadId
                                        , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                    )
                        );
                }
    #if c2
                while ((_queue.Count > 0)) //Microshaoft 死循环
    #elif c4
                while (!_queue.IsEmpty) //Microshaoft 死循环
    #endif
                {
                    int threadID = -1;
                    {
                        int r = (int)Interlocked.Read(ref _concurrentDequeueThreadsCount);
                        if (r < _maxConcurrentThreadsCount)
                        {
                            //if (_queue.Count > 0)
                            {
                                r = (int)Interlocked.Increment(ref _concurrentDequeueThreadsCount);
                                threadID = (int)_concurrentDequeueThreadsCount;
                                //ThreadProcessState tps = new ThreadProcessState();
                                //tps.element = element;
                                //tps.Sender = this;
                                Thread t = new Thread(new ThreadStart(DequeueThreadProcess));
                                t.Name = string.Format("ConcurrentDequeueProcessThread[{0}]", threadID);
                                t.Start();
                            }
                            ///                        else
                            ///                        {
                            ///                            break;
                            ///                        }
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                //Interlocked.CompareExchange(ref _queueRuning, 0, 1);
                if (OnQueueRunningThreadEnd != null)
                {
                    int r = (int)Interlocked.Read(ref _concurrentDequeueThreadsCount);
                    OnQueueRunningThreadEnd
                                (
                                    string.Format
                                            (
                                                "{0} Threads Count {1}, Queue Count {2}, Current Thread: {3}({4}) at {5}"
                                                , "Queue Running Stop ..."
                                                , r
                                                , _queue.Count
                                                , Thread.CurrentThread.Name
                                                , Thread.CurrentThread.ManagedThreadId
                                                , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                            )
                                );
                }
                if (_isAttachedPerformanceCounters)
                {
                    _queueRunningThreadEndPerformanceCounter.Increment();
                    _queueRunningThreadsCountPerformanceCounter.Decrement();
                }
                Interlocked.Exchange(ref _isQueueRunning, 0);
            }
            public void Enqueue(T element)
            {
                try
                {
    #if c2
                    lock (_syncQueueLockObject) //还算并发吗?
    #endif
                    {
                        _queue.Enqueue(element);
                    }
                    if (_isAttachedPerformanceCounters)
                    {
                        _enqueuePerformanceCounter.Increment();
                        _queueLengthPerformanceCounter.Increment();
                    }
                }
                catch (Exception e)
                {
                    if (OnException != null)
                    {
                        OnException(e);
                    }
                }
                //int r = Interlocked.CompareExchange(ref _queueRuning, 1, 0))
                //if (r == 1)
                //{
                QueueRun();
                //}
            }
            private void DequeueThreadProcess()
            {
                if (_isAttachedPerformanceCounters)
                {
                    _dequeueThreadStartPerformanceCounter.Increment();
                    _dequeueThreadsCountPerformanceCounter.Increment();
                }
                if (OnDequeueThreadStart != null)
                {
                    int r = (int)Interlocked.Read(ref _concurrentDequeueThreadsCount);
                    OnDequeueThreadStart
                                    (
                                        string.Format
                                                (
                                                    "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
                                                    , "Threads ++ !"
                                                    , r
                                                    , _queue.Count
                                                    , Thread.CurrentThread.Name
                                                    , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                                )
                                    );
                }
                bool queueWasNotEmpty = false;
                try
                {
    #if c2
                    while (true)
    #elif c4
                    while (!_queue.IsEmpty)
    #endif
                    {
                        T element = null;
    #if c2
                        lock (_syncQueueLockObject)
                        {
                            if (_queue.Count > 0)
                            {
                                element = _queue.Dequeue();
                            }
                            else
                            {
                                //避免QueueRun 死循环
                                break;
                            }
                        }
    #elif c4
                        if (_queue.TryDequeue(out element))
                        {
    #elif c2
                            if (element != null)
                            {
    #endif
                            if (!queueWasNotEmpty)
                            {
                                queueWasNotEmpty = true;
                            }
                            if (_isAttachedPerformanceCounters)
                            {
                                _dequeuePerformanceCounter.Increment();
                                _queueLengthPerformanceCounter.Decrement();
                            }
                            if (OnDequeue != null)
                            {
                                OnDequeue(element);
                            }
                            if (_isAttachedPerformanceCounters)
                            {
                                _dequeueProcessedPerformanceCounter.Increment();
                            }
    #if c2
                            }
    #elif c4
                        }
                    }
    #endif
                }
                catch (Exception e)
                {
                    if (OnException != null)
                    {
                        OnException(e);
                    }
                }
                finally
                {
                    int r = (int)Interlocked.Decrement(ref _concurrentDequeueThreadsCount);
                    if (OnDequeueThreadEnd != null)
                    {
                        OnDequeueThreadEnd
                                    (
                                        string.Format
                                                (
                                                    "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
                                                    , "Threads--"
                                                    , r
                                                    , _queue.Count
                                                    , Thread.CurrentThread.Name
                                                    , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                                )
                                    );
                    }
                    if (r == 0)
                    {
                        if (OnDequeueAllThreadsEnd != null)
                        {
                            OnDequeueAllThreadsEnd
                                        (
                                            string.Format
                                                    (
                                                        "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
                                                        , "All Threads End"
                                                        , r
                                                        , _queue.Count
                                                        , Thread.CurrentThread.Name
                                                        , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                                    )
                                        );
                        }
                    }
                    if (_isAttachedPerformanceCounters)
                    {
                        _dequeueThreadEndPerformanceCounter.Increment();
                        _dequeueThreadsCountPerformanceCounter.Decrement();
                    }
                    if (queueWasNotEmpty)
                    {
                        QueueRun(); //死循环???
                    }
                }
            }
        }
    }
    namespace Microshaoft
    {
        using System;
        using System.Diagnostics;
        public static class PerformanceCounterHelper
        {
            public static CounterCreationData GetCounterCreationData(string counterName, PerformanceCounterType performanceCounterType)
            {
                CounterCreationData ccd = new CounterCreationData();
                ccd.CounterName = counterName;
                ccd.CounterHelp = string.Format("{0} Help", counterName);
                ccd.CounterType = performanceCounterType;
                return ccd;
            }
        }
    }
    //========================================================================================================================================
    // Server.cs
    namespace Microshaoft.WCF.Services
    {
        using System;
        using System.Collections.Generic;
        using System.Linq;
        using System.Text;
        using Contracts.Services;
        using Contracts.Services.Entitys;
        using Microshaoft.WCF.Server;
        public class AsyncQueueWcfNetNetTcpService : IQueueAble
        {
            public void Enqueue(Item item)
            {
                WindowsServiceHost.AsyncQueueProcessor.Enqueue(item);
            }
        }
    }
    namespace Microshaoft.WCF.Server
    {
        using System;
        using System.ComponentModel;
        using System.ServiceProcess;
        using System.Configuration.Install;
        using System.Security.Principal;
        using System.ServiceModel;
        using System.ServiceModel.Description;
        using Microshaoft;
        using Microshaoft.Win32;
        using Microshaoft.WCF.Services;
        using Contracts.Services;
        using Contracts.Services.Entitys;
        public class WindowsServiceHost : ServiceBase
        {
            ///// <summary>
            /// 应用程序的主入口点。
            /// </summary>
            //[STAThread]
            public static readonly string serviceName = "AsyncConcurrentQueueWcfNetTcpService";
            private static AsyncQueueProcessor _asyncQueueProcessor;
            public static AsyncQueueProcessor AsyncQueueProcessor
            {
                get
                {
                    return _asyncQueueProcessor;
                }
            }
            static void Main(string[] args)
            {
                //Microshaoft
                //Microshaoft TODO: 在此处添加代码以启动应用程序
                //Microshaoft 
                WindowsServiceHost service = new WindowsServiceHost();
                int l = 0;
                bool needFreeConsole = false;
                if (args != null)
                {
                    l = args.Length;
                }
                if (l > 0)
                {
                    if (args[0].ToLower() == "/console")
                    {
                        needFreeConsole = true;
                        NativeMethods.AllocConsole();
                        Console.Title = "Server ...";
                        Console.WriteLine("Alloc Console ...");
                        Console.WriteLine("Current User Identity: {0}", WindowsIdentity.GetCurrent().Name);
                        Console.WriteLine(".Net Framework version: {0}", Environment.Version.ToString());
                        Console.Title = string.Format
                                                (
                                                    "{0} Host Server"
                                                    , WindowsServiceHost.serviceName
                                                ); //不能以服务运行
                        Console.WriteLine("Console");
                        service.OnStart(null);
                        Console.ReadLine();
                        return;
                    }
                }
                Console.WriteLine("Service");
                ServiceBase.Run(service);
                if (needFreeConsole)
                {
                    Console.WriteLine("Free Console ...");
                    NativeMethods.FreeConsole();
                }
            }
            public static ServiceHost _serviceHost;
            public WindowsServiceHost()
            {
                CanPauseAndContinue = true;
                ServiceName = WindowsServiceHost.serviceName;
            }
            protected override void OnStart(string[] args)
            {
                Console.WriteLine(Environment.Version.ToString());
                AsyncQueue<Item> queue = new AsyncQueue<Item>();
                queue.AttachPerformanceCounters("Q1");
                _asyncQueueProcessor = new AsyncQueueProcessor(queue);
                _serviceHost = new ServiceHost(typeof(AsyncQueueWcfNetNetTcpService));
                string address = "{0}://localhost{1}/servicemodelsamples/service";
                string netTcpUrl = string.Format
                                            (
                                                address
                                                , "net.tcp"
                                                , ":9000"
                                            );
                string httpUrl = string.Format
                                            (
                                                address
                                                , "http"
                                                , ":8080"
                                            );
                NetTcpBinding binding = new NetTcpBinding();
                //binding.Security.Transport.MsmqAuthenticationMode = MsmqAuthenticationMode.None;
                //binding.Security.Mode = MsmqIntegrationSecurityMode.None;
                _serviceHost.AddServiceEndpoint
                                        (
                                            typeof(IQueueAble)
                                            , binding
                                            , netTcpUrl
                                        );
                ServiceMetadataBehavior smb = _serviceHost.Description.Behaviors.Find<ServiceMetadataBehavior>();
                //发布元数据
                if (smb == null)
                {
                    smb = new ServiceMetadataBehavior();
                }
                //smb.HttpGetEnabled = true;
                //smb.HttpGetUrl = new Uri(httpUrl);
                //smb.MetadataExporter.PolicyVersion = PolicyVersion.Policy15;
                _serviceHost.Description.Behaviors.Add(smb);
                // Add MEX endpoint
                _serviceHost.AddServiceEndpoint
                                        (
                                            ServiceMetadataBehavior.MexContractName,
                                            MetadataExchangeBindings.CreateMexTcpBinding(),
                                            netTcpUrl + "/mex"
                                        );
                ServiceThrottlingBehavior stb = new ServiceThrottlingBehavior();
                stb.MaxConcurrentCalls = 1000;
                stb.MaxConcurrentInstances = 1000;
                stb.MaxConcurrentSessions = 1000;
                _serviceHost.Description.Behaviors.Add(stb);
                _serviceHost.Open();
                Console.WriteLine("Wcf Service Host Opened ...");
            }
        }
        [RunInstallerAttribute(true)]
        public class ProjectInstaller : Installer
        {
            private ServiceInstaller serviceInstaller;
            private ServiceProcessInstaller processInstaller;
            public ProjectInstaller()
            {
                processInstaller = new ServiceProcessInstaller();
                serviceInstaller = new ServiceInstaller();
                // Service will run under system account
                processInstaller.Account = ServiceAccount.LocalSystem;
                // Service will have Start Type of Manual
                serviceInstaller.StartType = ServiceStartMode.Manual;
                serviceInstaller.ServiceName = WindowsServiceHost.serviceName;
                Installers.Add(serviceInstaller);
                Installers.Add(processInstaller);
            }
        }
    }
    namespace Microshaoft.Win32
    {
        using System.Runtime.InteropServices;
        public class NativeMethods
        {
            /// <summary>
            /// 启动控制台
            /// </summary>
            /// <returns></returns>
            [DllImport("kernel32.dll")]
            public static extern bool AllocConsole();
            /// <summary>
            /// 释放控制台
            /// </summary>
            /// <returns></returns>
            [DllImport("kernel32.dll")]
            public static extern bool FreeConsole();
        }
    }
    namespace Microshaoft
    {
        using System;
        using Contracts.Services.Entitys;
        public class AsyncQueueProcessor
        {
            private AsyncQueue<Item> _queue;
            public AsyncQueue<Item> Queue
            {
                get
                {
                    return _queue;
                }
            }
            public AsyncQueueProcessor(AsyncQueue<Item> queue)
            {
                _queue = queue;
                _queue.OnDequeue += new AsyncQueue<Item>.QueueEventHandler(_queue_OnDequeue);
                //_queue.OnDequeueThreadStart += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
                _queue.OnDequeueAllThreadsEnd += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
                _queue.OnDequeueThreadEnd += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
                //_queue.OnQueueRunningThreadStart += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
                //_queue.OnQueueRunningThreadEnd += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
                _queue.OnException += new AsyncQueue<Item>.ExceptionEventHandler(_queue_OnException);
                _queue.MaxConcurrentThreadsCount = 64;
            }
            public void Enqueue(Item item)
            {
                _queue.Enqueue(item);
            }
            void _queue_OnQueueLog(string logMessage)
            {
                Console.WriteLine(logMessage);
            }
            void _queue_OnDequeue(Item item)
            {
                DateTime DequeueBeginTime = DateTime.Now;
                ///            SqlConnection connection = null;
                ///            try
                ///            {
                ///                connection = new SqlConnection(item.ConnectionString);
                ///                SqlCommand command = new SqlCommand(item.SqlCommandText, connection);
                ///                command.CommandType = CommandType.Text;
                ///                connection.Open();
                ///                command.ExecuteNonQuery();
                ///            }
                ///            catch (Exception e)
                ///            {
                ///                Console.WriteLine("Exception on Dequeue Process:{0}{1}", "\r\n", e.ToString());
                ///            }
                ///            finally
                ///            {
                ///                connection.Close();
                ///                connection.Dispose();
                ///                connection = null;
                ///            }
                DateTime DequeueEndTime = DateTime.Now;
                Console.WriteLine
                            (
                                "QueueRemainCount {0}, Enqueue {1}, Dequeue {2},[{3}], End {4},[{5}],[{6}]"
                                , _queue.Count
                                , item.EnqueueTime
                                , DequeueBeginTime
                                , (DequeueBeginTime.Ticks - item.EnqueueTime.Ticks) / 10000 / 1000
                                , DequeueEndTime
                                , (DequeueEndTime.Ticks - DequeueBeginTime.Ticks) / 10000 / 1000
                                , _queue.ConcurrentThreadsCount
                            );
            }
            void _queue_OnException(Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }
    }
    //================================================================================================================
    // Share.cs
    namespace Contracts.Services
    {
        using System.ServiceModel;
        using Contracts.Services.Entitys;
        [ServiceContract]
        public interface IQueueAble
        {
            [OperationContract]
            void Enqueue(Item element);
        }
    }
    namespace Contracts.Services.Entitys
    {
        using System;
        using System.Runtime.Serialization;
        [DataContract]
        public class Item
        {
            private DateTime _EnqueueTime;
            [DataMember]
            public DateTime EnqueueTime
            {
                get
                {
                    return _EnqueueTime;
                }
                set
                {
                    _EnqueueTime = value;
                }
            }
            private string _sql;
            [DataMember]
            public string SqlCommandText
            {
                get
                {
                    return _sql;
                }
                set
                {
                    _sql = value;
                }
            }
            private string _connectionString;
            [DataMember]
            public string ConnectionString
            {
                get
                {
                    return _connectionString;
                }
                set
                {
                    _connectionString = value;
                }
            }
        }
    }
    //=======================================================================================
    //=======================================================================================================
    ///// Client.cs
    //------------------------------------------------------------------------------
    // <auto-generated>
    //     此代码由工具生成。
    //     运行时版本:4.0.30319.1
    //
    //     对此文件的更改可能会导致不正确的行为,并且如果
    //     重新生成代码,这些更改将会丢失。
    // </auto-generated>
    //------------------------------------------------------------------------------
    // "D:\Microsoft.SDKs\Windows\v7.1\Bin\NETFX 4.0 Tools\SvcUtil.exe" Share.dll
    // "D:\Microsoft.SDKs\Windows\v7.1\Bin\NETFX 4.0 Tools\SvcUtil.exe" *.wsdl *.xsd
    // "D:\Microsoft.SDKs\Windows\v7.1\Bin\NETFX 4.0 Tools\SvcUtil.exe" net.tcp://localhost:9000/servicemodelsamples/service/mex
    namespace Contracts.Services.Entitys
    {
        using System.Runtime.Serialization;
        [System.Diagnostics.DebuggerStepThroughAttribute()]
        [System.CodeDom.Compiler.GeneratedCodeAttribute("System.Runtime.Serialization", "4.0.0.0")]
        [System.Runtime.Serialization.DataContractAttribute(Name="Item", Namespace="http://schemas.datacontract.org/2004/07/Contracts.Services.Entitys")]
        public partial class Item : object, System.Runtime.Serialization.IExtensibleDataObject
        {
            private System.Runtime.Serialization.ExtensionDataObject extensionDataField;
            private string ConnectionStringField;
            private System.DateTime EnqueueTimeField;
            private string SqlCommandTextField;
            public System.Runtime.Serialization.ExtensionDataObject ExtensionData
            {
                get
                {
                    return this.extensionDataField;
                }
                set
                {
                    this.extensionDataField = value;
                }
            }
            [System.Runtime.Serialization.DataMemberAttribute()]
            public string ConnectionString
            {
                get
                {
                    return this.ConnectionStringField;
                }
                set
                {
                    this.ConnectionStringField = value;
                }
            }
            [System.Runtime.Serialization.DataMemberAttribute()]
            public System.DateTime EnqueueTime
            {
                get
                {
                    return this.EnqueueTimeField;
                }
                set
                {
                    this.EnqueueTimeField = value;
                }
            }
            [System.Runtime.Serialization.DataMemberAttribute()]
            public string SqlCommandText
            {
                get
                {
                    return this.SqlCommandTextField;
                }
                set
                {
                    this.SqlCommandTextField = value;
                }
            }
        }
    }
    namespace Proxy
    {
        [System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "4.0.0.0")]
        [System.ServiceModel.ServiceContractAttribute(ConfigurationName="IQueueAble")]
        public interface IQueueAble
        {
            [System.ServiceModel.OperationContractAttribute(Action="http://tempuri.org/IQueueAble/Enqueue", ReplyAction="http://tempuri.org/IQueueAble/EnqueueResponse")]
            void Enqueue(Contracts.Services.Entitys.Item element);
        }
        [System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "4.0.0.0")]
        public interface IQueueAbleChannel : IQueueAble, System.ServiceModel.IClientChannel
        {
        }
        [System.Diagnostics.DebuggerStepThroughAttribute()]
        [System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "4.0.0.0")]
        public partial class QueueAbleClient : System.ServiceModel.ClientBase<IQueueAble>, IQueueAble
        {
            public QueueAbleClient()
            {
            }
            public QueueAbleClient(string endpointConfigurationName) : 
                    base(endpointConfigurationName)
            {
            }
            public QueueAbleClient(string endpointConfigurationName, string remoteAddress) : 
                    base(endpointConfigurationName, remoteAddress)
            {
            }
            public QueueAbleClient(string endpointConfigurationName, System.ServiceModel.EndpointAddress remoteAddress) : 
                    base(endpointConfigurationName, remoteAddress)
            {
            }
            public QueueAbleClient(System.ServiceModel.Channels.Binding binding, System.ServiceModel.EndpointAddress remoteAddress) : 
                    base(binding, remoteAddress)
            {
            }
            public void Enqueue(Contracts.Services.Entitys.Item element)
            {
                base.Channel.Enqueue(element);
            }
        }
    }
    namespace Microshaoft.WCF.Client
    {
        using System;
        using System.Collections;
        using System.Threading;
        using System.ServiceModel;
        using Proxy;
        using Contracts.Services.Entitys;
        public class Class1
        {
            static QueueAbleClient _client;
            public static void Main()
            {
                Console.Title = "Client";
                Console.WriteLine(Environment.Version.ToString());
                Class1 a = new Class1();
                a.Run();
            }
            public void Run()
            {
                string address = @"net.tcp://localhost:9000/servicemodelsamples/service";
                NetTcpBinding binding = new NetTcpBinding();
                _client = new QueueAbleClient(binding, new EndpointAddress(address));
                for (int i = 0; i < 20; i++)
                {
                    Thread x = new Thread(new ThreadStart(ThreadProcess));
                    x.Start();
                }
            }
            public void ThreadProcess()
            {
                for (int i = 0; i < 1000; i++)
                {
                    Item x = new Item();
                    DateTime EnqueueTime = DateTime.Now;
                    x.EnqueueTime = EnqueueTime;
                    x.SqlCommandText = @"
                            --==========================
                            declare @ varchar(10)
                            set @ = 'aaa'
                            exec zsp_test @
                            --==========================
                    ";
                    x.ConnectionString = "";
                    _client.Enqueue(x);
                    Console.WriteLine
                                (
                                    "Enqueue: {0},[{1}]"
                                    , EnqueueTime
                                    , (DateTime.Now.Ticks - EnqueueTime.Ticks)/10000
                                );
                }
            }
        }
    }
    //============================================================================================================================
    
    
  • 相关阅读:
    POJ-1318(list.sort()输出不为字典序,map才是按字典序排列)
    C++ 进阶
    命令模式在MVC框架中的应用
    使用NoSQL Manager for MongoDBclient连接mongodb
    第一部分 学习函数式思维
    【剑指offer】复杂链表的复制
    Hadoop-2.4.0分布式安装手冊
    Impala中多列转为一行
    js(jquery)绑定点击事件
    hdu1243 最长公共子序列(LCS)
  • 原文地址:https://www.cnblogs.com/Microshaoft/p/1866131.html
Copyright © 2011-2022 走看看