zoukankan      html  css  js  c++  java
  • Remoting Generic Async Queue (Release 2) override InitializeLifetimeService return null

    
    /*
    Remoting 异步队列实现,流程如下
    1.并发若干客户端程序通过调用 RemotingQueue Server 提供的公开远程方法 Enqueue 将数据元素入队尾
    2.RemotingQueue Server 发现队列不为空,则并发若干线程陆续 Dequeue 队首数据元素并处理
    注意:
    1.队列的数据元素定义需自行实现
    2.对出列数据元素的处理程序需自行实现
    */
    //AsyncQueue.cs
    namespace Microshaoft
    {
        using System;
        using System.Threading;
        using System.Collections.Generic;
        public class AsyncQueue<T>
                            where T : class
        {
            public delegate void QueueEventHandler(T element);
            public event QueueEventHandler OnDequeue;
            public delegate void QueueLogEventHandler(string logMessage);
            public event QueueLogEventHandler OnQueueLog;
            public delegate void ExceptionEventHandler(Exception exception);
            public event ExceptionEventHandler OnException;
            private Queue<T> _queue = new Queue<T>();
            private static object _SyncLockObject = new object();
            private int _concurrentThreadsCount = 0; //Microshaoft 用于控制并发线程数
            private volatile bool _queueRuning = false;
            private int _maxConcurrentThreadsCount = 1; //Microshaoft 允许并发出列处理线程数为 1
            public int MaxConcurrentThreadsCount
            {
                set
                {
                    _maxConcurrentThreadsCount = value;
                }
                get
                {
                    return _maxConcurrentThreadsCount;
                }
            }
            private long _EnqueueCount = 0; //入列计数器
            public long EnqueueCount
            {
                get
                {
                    return _EnqueueCount;
                }
            }
            private long _DequeueCount = 0; //出列计数器
            public long DequeueCount
            {
                get
                {
                    return _DequeueCount;
                }
            }
            //Microshaoft 服务启动后可立即开启新的线程调用此方法(死循环)
            private void QueueRun() //Microshaoft ThreadStart
            {
                if (!_queueRuning)
                {
                    _queueRuning = true;
                    lock (_SyncLockObject)
                    {
                        ThreadStart ts = new ThreadStart(QueueRunThreadProcess);
                        Thread t = new Thread(ts);
                        t.Name = "QueueRunThreadProcess";
                        t.Start();
                    }
                }
            }
            public int Count
            {
                get
                {
                    return _queue.Count;
                }
            }
            public int ConcurrentThreadsCount
            {
                get
                {
                    return _concurrentThreadsCount;
                }
            }
            private void QueueRunThreadProcess()
            {
                if (OnQueueLog != null)
                {
                    OnQueueLog
                        (
                            string.Format
                                    (
                                        "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                        , "Queue Runing Start ..."
                                        , _concurrentThreadsCount
                                        , _queue.Count
                                        , Thread.CurrentThread.Name
                                    )
                        );
                }
                while (_queue.Count > 0) //Microshaoft 死循环
                {
                    T element = null;
                    int threadID = -1;
                    lock (_SyncLockObject)
                    {
                        if (_concurrentThreadsCount < _maxConcurrentThreadsCount)
                        {
                            if (_queue.Count > 0)
                            {
                                Interlocked.Increment(ref _concurrentThreadsCount);
                                threadID = _concurrentThreadsCount;
                                if (_concurrentThreadsCount >= _maxConcurrentThreadsCount)
                                {
                                    if (OnQueueLog != null)
                                    {
                                        OnQueueLog
                                            (
                                                string.Format
                                                        (
                                                            "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                                            , "Threads is Full!"
                                                            , _concurrentThreadsCount
                                                            , _queue.Count
                                                            , Thread.CurrentThread.Name
                                                        )
                                            );
                                    }
                                }
                                if (OnQueueLog != null)
                                {
                                    OnQueueLog
                                        (
                                            string.Format
                                                    (
                                                        "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                                        , "Threads ++ !"
                                                        , _concurrentThreadsCount
                                                        , _queue.Count
                                                        , Thread.CurrentThread.Name
                                                    )
                                        );
                                }
                                element = _queue.Dequeue();
                            }
                        }
                    }
                    if (element != null)
                    {
                        //Microshaoft ThreadPool.QueueUserWorkelement(new WaitCallback(OnDequeueThreadProcess), element);
                        ThreadProcessState tps = new ThreadProcessState();
                        tps.element = element;
                        tps.Sender = this;
                        Thread t = new Thread(new ThreadStart(tps.ThreadProcess));
                        t.Name = string.Format("ConcurrentThread[{0}]", threadID);
                        t.Start();
                    }
                }
                _queueRuning = false;
                if (OnQueueLog != null)
                {
                    OnQueueLog
                        (
                            string.Format
                                (
                                    "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                    , "Queue Runing Stopped!"
                                    , _concurrentThreadsCount
                                    , _queue.Count
                                    , Thread.CurrentThread.Name
                                )
                        );
                }
            }
            public void Enqueue(T element)
            {
                try
                {
                    lock (_SyncLockObject) //还算并发吗?
                    {
                        _queue.Enqueue(element);
                    }
                    Interlocked.Increment(ref _EnqueueCount);
                }
                catch (Exception e)
                {
                    if (OnException != null)
                    {
                        OnException(e);
                    }
                }
                if (!_queueRuning)
                {
                    QueueRun();
                }
            }
            private void OnDequeueThreadProcess(T element)
            {
                try
                {
                    if (OnDequeue != null)
                    {
                        OnDequeue(element);
                    }
                    Interlocked.Increment(ref _DequeueCount);
                    DequeueProcess();
                }
                catch (Exception e)
                {
                    if (OnException != null)
                    {
                        OnException(e);
                    }
                }
                finally
                {
                    Interlocked.Decrement(ref _concurrentThreadsCount);
                    if (_concurrentThreadsCount == 0)
                    {
                        if (OnQueueLog != null)
                        {
                            OnQueueLog
                                (
                                    string.Format
                                            (
                                                "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                                , "All Threads Finished!"
                                                , _concurrentThreadsCount
                                                , _queue.Count
                                                , Thread.CurrentThread.Name
                                            )
                                );
                        }
                    }
                    if (OnQueueLog != null)
                    {
                        OnQueueLog
                            (
                                string.Format
                                        (
                                            "{0} Threads Count {1},Queue Count {2},Current Thread: {3}"
                                            , "Threads -- !"
                                            , _concurrentThreadsCount
                                            , _queue.Count
                                            , Thread.CurrentThread.Name
                                        )
                            );
                    }
                }
            }
            private void DequeueProcess()
            {
                while (_queue.Count > 0)
                {
                    T element = null;
                    lock (_SyncLockObject)
                    {
                        if (_queue.Count > 0)
                        {
                            element = _queue.Dequeue();
                        }
                    }
                    if (element != null)
                    {
                        if (OnDequeue != null)
                        {
                            OnDequeue(element);
                        }
                        Interlocked.Increment(ref _DequeueCount);
                    }
                }
            }
            private class ThreadProcessState
            {
                private AsyncQueue<T> _sender;
                public AsyncQueue<T> Sender
                {
                    get
                    {
                        return _sender;
                    }
                    set
                    {
                        _sender = value;
                    }
                }
                private T _element;
                public T element
                {
                    get
                    {
                        return _element;
                    }
                    set
                    {
                        _element = value;
                    }
                }
                public void ThreadProcess()
                {
                    _sender.OnDequeueThreadProcess(_element);
                }
            }
        }
    }
    namespace Test
    {
        using System;
        using System.Threading;
        using Microshaoft;
        public class Class1
        {
            static AsyncQueue<Item> _queue;
            public static void Main1()
            {
                Console.Title = "Client";
                Console.WriteLine(Environment.Version.ToString());
                Class1 a = new Class1();
                a.Run();
                Console.ReadLine();
            }
            public void Run()
            {
                _queue = new AsyncQueue<Item>();
                _queue.OnDequeue += new AsyncQueue<Item>.QueueEventHandler(_queue_OnDequeue);
                _queue.OnQueueLog += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
                _queue.OnException += new AsyncQueue<Item>.ExceptionEventHandler(_queue_OnException);
                _queue.MaxConcurrentThreadsCount = 200;
                Thread t = new Thread(new ThreadStart(ConsoleMonitor));
                t.Start();
                //Microshaoft 以下是耗时的主程序
                for (int i = 0; i < 1000; i++)
                {
                    Thread x = new Thread(new ThreadStart(ThreadProcess));
                    x.Start();
                }
            }
            public void ConsoleMonitor()
            {
                Console.WriteLine("press any key to check queue status ...");
                while (Console.ReadLine() != "q")
                {
                    Console.WriteLine
                                (
                                    "Queue elements: {0},Threads count: {1},{2},{3}"
                                    , _queue.Count
                                    , _queue.ConcurrentThreadsCount
                                    , _queue.EnqueueCount
                                    , _queue.DequeueCount
                                );
                }
            }
            void _queue_OnException(Exception e)
            {
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.WriteLine(e.Message);
                Console.ResetColor();
            }
            void _queue_OnQueueLog(string logMessage)
            {
    ///            Console.WriteLine(logMessage);
            }
            void _queue_OnDequeue(Item element)
            {
    ///            DateTime DequeueBeginTime = DateTime.Now;
    ///            DateTime DequeueEndTime = DateTime.Now;
    ///            Console.WriteLine
    ///                        (
    ///                            "QueueRemainCount {0}, Enqueue {1}, Dequeue {2},[{3}], End {4},[{5}],[{6}]"
    ///                            , _queue.Count
    ///                            , element.EnqueueTime
    ///                            , DequeueBeginTime
    ///                            , (DequeueBeginTime.Ticks - element.EnqueueTime.Ticks) / 10000 /1000
    ///                            , DequeueEndTime
    ///                            , (DequeueEndTime.Ticks - DequeueBeginTime.Ticks) / 10000 /1000
    ///                            , _queue.ConcurrentThreadsCount
    ///                        );
    ///            Console.WriteLine(element.EnqueueTime);
                Thread.Sleep(1);
            }
            public void ThreadProcess()
            {
                for (int i = 0; i < 1000; i++)
                {
                    Item x = new Item();
                    DateTime EnqueueTime = DateTime.Now;
                    x.Name = EnqueueTime.ToString();
                    x.EnqueueTime = EnqueueTime;
                    _queue.Enqueue(x);
    ///                Console.WriteLine
    ///                            (
    ///                                "Enqueue: {0},[{1}]"
    ///                                , EnqueueTime
    ///                                , (DateTime.Now.Ticks - EnqueueTime.Ticks) / 10000 / 1000
    ///                            );
                }
            }
        }
    }
    namespace Test
    {
        using System;
        public class Item
        {
            private string _Name;
            public string Name
            {
                get
                {
                    return _Name;
                }
                set
                {
                    _Name = value;
                }
            }
            private DateTime _EnqueueTime;
            public DateTime EnqueueTime
            {
                get
                {
                    return _EnqueueTime;
                }
                set
                {
                    _EnqueueTime = value;
                }
            }
        }
    }
    //Server.cs
    namespace Microshaoft.RemotingObjects.Server
    {
        using System;
        using System.Threading;
        using System.Collections;
        using System.Runtime.Remoting;
        using System.Runtime.Remoting.Channels;
        using System.Runtime.Remoting.Channels.Tcp;
        using System.Runtime.Serialization.Formatters;
        using System.ServiceProcess;
        using System.ComponentModel;
        using System.Configuration.Install;
        using System.Security.Principal;
        using Microshaoft.RemotingObjects;
        using Microshaoft.RemotingObjects.Share;
        using Microshaoft.Win32;
        public class ServiceHost : ServiceBase
        {
            ///// <summary>
            /// 应用程序的主入口点。
            /// </summary>
            //[STAThread]
            public static readonly string serviceName = "RemotingAsyncQueueService";
            static void Main(string[] args)
            {
                //Microshaoft
                //Microshaoft TODO: 在此处添加代码以启动应用程序
                //Microshaoft 
                ServiceHost service = new ServiceHost();
                int l = 0;
                bool needFreeConsole = false;
                if (args != null)
                {
                    l = args.Length;
                }
                if (l > 0)
                {
                    if (args[0].ToLower() == "/console")
                    {
                        needFreeConsole = true;
                        NativeMethods.AllocConsole();
                        Console.Title = "Server ...";
                        Console.WriteLine("Alloc Console ...");
                        Console.WriteLine("Current User Identity: {0}", WindowsIdentity.GetCurrent().Name);
                        Console.WriteLine(".Net Framework version: {0}", Environment.Version.ToString());
                    }
                    Console.Title = "Server"; //不能以服务运行
                    Console.WriteLine("Console");
                    service.OnStart(null);
                    Console.ReadLine();
                    return;
                }
                Console.WriteLine("Service");
                ServiceBase.Run(service);
                if (needFreeConsole)
                {
                    Console.WriteLine("Free Console ...");
                    NativeMethods.FreeConsole();
                }
            }
            public ServiceHost()
            {
                CanPauseAndContinue = true;
                ServiceName = ServiceHost.serviceName;
            }
            protected override void OnStart(string[] args)
            {
                Console.WriteLine(Environment.Version.ToString());
                RemotingHelper.StartRemoting<RemotingAsyncQueue>
                                    (
                                        "queueurl"
                                        , 8080
                                    );
                RemotingAsyncQueue.OnDequeue += new RemotingAsyncQueue.RemotingAsyncQueueEventHandler(DequeueProcess);
                Console.WriteLine("Server . , Press Enter key to exit.");
            }
            public static void DequeueProcess(Item item)
            {
                //Microshaoft TO DO
                //Microshaoft 队列的数据元素定义需自行实现
                //Microshaoft 数据库访问
                //Microshaoft 发邮件等
                //Microshoaft Thread.Sleep(10);
            }
        }
        [RunInstallerAttribute(true)]
        public class ProjectInstaller: Installer
        {
            private ServiceInstaller serviceInstaller;
            private ServiceProcessInstaller processInstaller;
            public ProjectInstaller()
            {
                processInstaller = new ServiceProcessInstaller();
                serviceInstaller = new ServiceInstaller();
                // Service will run under system account
                processInstaller.Account = ServiceAccount.LocalSystem;
                // Service will have Start Type of Manual
                serviceInstaller.StartType = ServiceStartMode.Manual;
                serviceInstaller.ServiceName = ServiceHost.serviceName;
                Installers.Add(serviceInstaller);
                Installers.Add(processInstaller);
            }
        }
    }
    namespace Microshaoft.Win32
    {
        using System.Runtime.InteropServices;
        public class NativeMethods
        {
            /// <summary>
            /// 启动控制台
            /// </summary>
            /// <returns></returns>
            [DllImport("kernel32.dll")]
            public static extern bool AllocConsole();
            /// <summary>
            /// 释放控制台
            /// </summary>
            /// <returns></returns>
            [DllImport("kernel32.dll")]
            public static extern bool FreeConsole();
        }
    }
    namespace Microshaoft.RemotingObjects
    {
        using System;
        using System.IO;
        using System.Net;
        using System.Web;
        using System.Text;
        using System.Threading;
        using System.Configuration;
        using System.Collections.Generic;
        using Microshaoft;
        using Microshaoft.RemotingObjects.Share;
        public class RemotingAsyncQueue : MarshalByRefObject
        {
            //private static AsyncQueue<Item> _AsyncQueue;
            public delegate void RemotingAsyncQueueEventHandler(Item item);
            public static event RemotingAsyncQueueEventHandler OnDequeue;
            private AsyncQueue<Item> _AsyncQueue;
            public RemotingAsyncQueue()
            {
                _AsyncQueue = new AsyncQueue<Item>();
                _AsyncQueue.OnDequeue += new AsyncQueue<Item>.QueueEventHandler(DequeueProcess);
                _AsyncQueue.OnQueueLog += new AsyncQueue<Item>.QueueLogEventHandler(QueueLog);
                _AsyncQueue.MaxConcurrentThreadsCount = 10;
            }
            public override object InitializeLifetimeService()
            {
                return null;
            }
            public void QueueLog(string message)
            {
                //Microshaoft 队列的数据元素定义需自行实现 Item
                Console.WriteLine(message);
            }
            public void Enqueue(Item item)
            {
                //Microshaoft 队列的数据元素定义需自行实现 Item
                _AsyncQueue.Enqueue(item);
            }
            public int ConcurrentThreadsCount
            {
                get
                {
                    return _AsyncQueue.ConcurrentThreadsCount;
                }
            }
            public int Count
            {
                get
                {
                    return _AsyncQueue.Count;
                }
            }
            public void DequeueProcess(Item item)
            {
                DateTime DequeueBeginTime = DateTime.Now;
                if (OnDequeue != null)
                {
                    OnDequeue(item);
                }
                DateTime DequeueEndTime = DateTime.Now;
                Console.WriteLine
                            (
                                "QueueRemainCount {0}, Enqueue {1}, Dequeue {2},[{3}], End {4},[{5}],[{6}]"
                                , _AsyncQueue.Count
                                , item.EnqueueTime
                                , DequeueBeginTime
                                , (DequeueBeginTime.Ticks - item.EnqueueTime.Ticks) / 10000 /1000
                                , DequeueEndTime
                                , (DequeueEndTime.Ticks - DequeueBeginTime.Ticks) / 10000 /1000
                                , _AsyncQueue.ConcurrentThreadsCount
                            );
            }
        }
    }
    //===============================================
    // Share.cs
    //Server、Client 均需引用此 share.dll
    //C:\WINDOWS\Microsoft.NET\Framework\v2.0.50727\csc.exe /t:library share.cs
    //TO DO
    //队列的数据元素定义需自行实现,示例如下:
    namespace Microshaoft.RemotingObjects.Share
    {
        using System;
        [Serializable]
        public class Item
        {
            private string _Name;
            public string Name
            {
                get
                {
                    return _Name;
                }
                set
                {
                    _Name = value; 
                }
            }
            private DateTime _EnqueueTime;
            public DateTime EnqueueTime
            {
                get
                {
                    return _EnqueueTime;
                }
                set
                {
                    _EnqueueTime = value; 
                }
            }
        }
    }
    // remoting helper
    //Share.cs
    namespace Microshaoft
    {
        using System;
        using System.Collections;
        using System.Collections.Generic;
        using System.Runtime.Remoting;
        using System.Runtime.Remoting.Channels;
        using System.Runtime.Remoting.Channels.Tcp;
        using System.Runtime.Serialization.Formatters;
        using System.Text;
        public static class RemotingHelper
        {
            public static void StartRemoting
                (
                    Type RemotingType
                    , string Url
                    , int Port
                )
            {
                BinaryServerFormatterSinkProvider provider = new BinaryServerFormatterSinkProvider();
                provider.TypeFilterLevel = TypeFilterLevel.Full;
                IDictionary ht = new Hashtable();
                ht["port"] = Port;
                TcpChannel tc = new TcpChannel(ht, null, provider);
                ChannelServices.RegisterChannel(tc, false);
                RemotingConfiguration.RegisterWellKnownServiceType(RemotingType, Url, WellKnownObjectMode.Singleton);
                Console.WriteLine("Remoting Object Started ...");
            }
            public static void StartRemoting<T>
                (
                    string Url
                    , int Port
                )
            {
                StartRemoting(typeof(T), Url, Port);
            }
            public static T GetRemotingLocalClientProxyObject<T>
                (
                    string Url
                )
            {
                return (T) Activator.GetObject
                                        (
                                            typeof(T)
                                            , Url
                                            //, "tcp://127.0.0.1:8080/queueUrl"
                                        );
            }
        }
    }
    //============================================
    // Client.cs
    //C:\WINDOWS\Microsoft.NET\Framework\v1.1.4322\csc.exe client.cs /r:share.dll
    namespace Microshaoft.RemotingObjects.Client
    {
        using System;
        using System.Collections;
        using System.Runtime.Remoting;
        using System.Runtime.Remoting.Channels;
        using System.Runtime.Remoting.Channels.Tcp;
        using System.Runtime.Serialization.Formatters;
        using System.Threading;
        using Microshaoft.RemotingObjects;
        using Microshaoft.RemotingObjects.Share;
        public class Class1
        {
            static RemotingAsyncQueue _queue;
            public static void Main()
            {
                Console.Title = "Client";
                Console.WriteLine(Environment.Version.ToString());
                Class1 a = new Class1();
                a.Run();
            }
            public void Run()
            {
                _queue = RemotingHelper.GetRemotingLocalClientProxyObject<RemotingAsyncQueue>("tcp://127.0.0.1:8080/queueUrl");
                //Microshaoft 以下是耗时的主程序
                for (int i = 0; i < 50; i++)
                {
                    Thread x = new Thread(new ThreadStart(ThreadProcess));
                    x.Start();
                }
            }
            public void ThreadProcess()
            {
                for (int i = 0; i < 800; i++)
                {
                    Item x = new Item();
                    DateTime EnqueueTime = DateTime.Now;
                    x.Name = EnqueueTime.ToString();
                    x.EnqueueTime = EnqueueTime;
                    _queue.Enqueue(x);
                    Console.WriteLine
                                (
                                    "Enqueue: {0},[{1}]"
                                    , EnqueueTime
                                    , (DateTime.Now.Ticks - EnqueueTime.Ticks)/10000/1000
                                );
                }
            }
        }
    }
    //Microshaoft =========================================
    //Microshaoft Remoting Object Client Local Proxy
    namespace Microshaoft.RemotingObjects
    {
        using System;
        using Microshaoft.RemotingObjects.Share;
        public interface RemotingAsyncQueue
        {
            void Enqueue(Item item);
        }
    }
    
    
  • 相关阅读:
    创建线程的方式三:实现Callable接口
    线程通信的应用 经典例题: 生产者/消费者问题
    剑指offer-26 树的子结构
    leetcode-567 字符串排列
    剑指offer -11题
    leetcode
    移动端开发利器vConsole.js,app内嵌H5开发时调试用
    TCP/IP分为几层?各层的作用是什么?
    如何测试一支笔
    日志分析
  • 原文地址:https://www.cnblogs.com/Microshaoft/p/1329064.html
Copyright © 2011-2022 走看看