zoukankan      html  css  js  c++  java
  • 接上文,支持并发数量的完美版本

    增加了并发数量的支持,做了些完美主义化:

        public abstract class MessageQueueConcurrentHandlerBase<T> : IMessageQueueHandler
        {
            public MessageQueueConcurrentHandlerBase(string queueName, int maxConcurrency = 5, Action<string> logDelegate = null)
            {
                if (!MessageQueue.Exists(queueName))
                    throw new Exception(string.Format("No such a queue: {0}", queueName));
                if (maxConcurrency < 1)
                    throw new ArgumentOutOfRangeException("maxConcurrency");
    
                this._queueName = queueName;
                this._poolForConsumer = new Semaphore(0, maxConcurrency);
                this._producerAutoResetEvent = new AutoResetEvent(false);
                this._maxConcurrency = maxConcurrency;
                this._logDelegate = logDelegate;
            }
    
            public void StartRead()
            {
                this._queue = new MessageQueue(this._queueName) { Formatter = new XmlMessageFormatter(new Type[] { typeof(long) }) };
                this._queue.PeekCompleted += new PeekCompletedEventHandler(Produce);
                this._producerAutoResetEvent.Set();
                this._poolForConsumer.Release(this._maxConcurrency);
    
                this._queue.BeginPeek();
            }
    
            public override string ToString()
            {
                return string.Format("{0}-{1}", this.HandlerName, this._queueName);
            }
    
            public long CurrentWorkerCount { get { return Interlocked.Read(ref this._currentWorkerCount); } }
    
            public int MaxConcurrency { get { return _maxConcurrency; } }
    
            protected abstract string HandlerName { get; }
    
            protected abstract void MainProcess(T messageItem, string consumerName);
    
            protected void LogInfo(string msg)
            {
                if (_logDelegate != null)
                {
                    this._logDelegate(msg);
                }
            }
    
            #region private
            private void Produce(object sender, PeekCompletedEventArgs e)
            {
                this._producerAutoResetEvent.WaitOne();
    
                var message = this._queue.EndPeek(e.AsyncResult);
    
                long consumerIndex = Interlocked.Increment(ref this._consumerIndex);
                ThreadPool.QueueUserWorkItem(new WaitCallback(this.Consume), consumerIndex);
                this._queue.BeginPeek();
            }
    
            private void Consume(object stateInfo)
            {
                long consumerIndex = (long)stateInfo;
                string consumerName = string.Format("{0}-{1}", this.HandlerName, consumerIndex);
    
                this._poolForConsumer.WaitOne();
    
                var message = this._queue.Receive();
                this._producerAutoResetEvent.Set();
    
                T messageItem = (T)message.Body;
    
                this.LogInfo(string.Format("{0} Received a message, MessageItem = {1}", consumerName, messageItem));
                Interlocked.Increment(ref this._currentWorkerCount);
    
                try
                {
                    this.LogInfo(string.Format("{0} will run MainProcess, MessageItem = {1}, CurrentWorkerCount = {2}", consumerName, messageItem, this.CurrentWorkerCount));
                    MainProcess(messageItem, consumerName);
                }
                catch (Exception ex)
                {
                    this.HandleException(ex, messageItem);
                }
                finally
                {
                    Interlocked.Decrement(ref this._currentWorkerCount);
    
                    this.LogInfo(string.Format("{0} run over, messageItem = {1}, CurrentWorkerCount = {2}", consumerName, messageItem, this.CurrentWorkerCount));
                }
    
                this._poolForConsumer.Release();
            }
    
            private void HandleException(Exception ex, T messageItem)
            {
                this.LogInfo(string.Format("Exception in {0}:[Message]={1},[StackTrace]={2},[Type]={3},[CurrentWorkerCount]={4},[messageItem]={5}", this.HandlerName, ex.GetBaseException().Message, ex.StackTrace, ex.GetType(), this.CurrentWorkerCount, messageItem));
            }
    
            private readonly string _queueName;
            private MessageQueue _queue;
            private long _currentWorkerCount;
            private Semaphore _poolForConsumer;
            private AutoResetEvent _producerAutoResetEvent;
            private int _maxConcurrency;
            private Action<string> _logDelegate;
            private long _consumerIndex = 0;
            #endregion
        }

    以下是测试代码,觉得好就表扬一下啊:)

        class Program : MessageQueueConcurrentHandlerBase<long>
        {
            public Program(string queueName, int maxConcurrency, Action<string> logDelegate)
                : base(queueName, maxConcurrency, logDelegate)
            {
    
            }
    
            protected override void MainProcess(long messageItem, string consumerName)
            {
                Thread.Sleep(5 * 1000);
                this.LogInfo(string.Format("{0}-MainProcess Over:	messageItem = {1}, CurrentWorkerCount = {2}", consumerName, messageItem, this.CurrentWorkerCount));
            }
    
            protected override string HandlerName { get { return "TestQueueHandler"; } }
    
            static void Main(string[] args)
            {
                string queuePath = CreateQueueIfNotExist("MyQueue");
                MessageQueue myQueue = new MessageQueue(queuePath);
    
                IMessageQueueHandler messageQueueHandler = new Program(queuePath, 3, s => { Console.WriteLine(s); });
                messageQueueHandler.StartRead();
                long x = 0;
                do
                {
                    foreach (var s in Console.ReadLine().Split(",".ToCharArray()))
                    {
                        if (long.TryParse(s.Trim(), out x))
                            myQueue.Send(x);
                    }
                } while (x > 0);
    
                Console.ReadLine();
            }
    
            private static string CreateQueueIfNotExist(string queueName)
            {
                string queuePath = string.Format(@".Private${0}", queueName);
                if (!MessageQueue.Exists(queuePath))
                    MessageQueue.Create(queuePath);
    
                return queuePath;
            }
        }
  • 相关阅读:
    大家一起写mvc(二)
    大家一起写mvc(一)
    jquery读取XML 生成页面文件
    jquery点击区域显示或隐藏DIV,点击非该DIV的地方隐藏该DIV
    struts2 iterator排序
    解决JS传参中文乱码
    关于解决 请求被中止:无法建立SSL / TLS安全通道
    查看sqlserver被锁的表以及如何解锁
    查询sqlserver数据库视图、存储过程等包含特定的字符串
    C#中查询字符串中是否包含指定字符/串,使用IndexOf还是Contains?
  • 原文地址:https://www.cnblogs.com/bighuiwolf/p/3972091.html
Copyright © 2011-2022 走看看