zoukankan      html  css  js  c++  java
  • Actor Mailbox

    ActorMailbox:

    internal static class MailboxStatus
        {
            public const int Idle = 0;
            public const int Busy = 1;
        }
    
        public class UnboundedMailboxQueue
        {
            private readonly ConcurrentQueue<object> _messages = new ConcurrentQueue<object>();
    
            public void Push(object message)
            {
                _messages.Enqueue(message);
            }
    
            public object Pop()
            {
                object message;
                return _messages.TryDequeue(out message) ? message : null;
            }
    
            public bool HasMessages { get { return !_messages.IsEmpty; } }
        }
    
        public interface IMessageInvoker
        {
            Task InvokeMessageAsync(object msg);
            void EscalateFailure(Exception reason, object message);
        }
    
        public interface IDispatcher
        {
            int Throughput { get; }
            void Schedule(Func<Task> runner);
        }
    
        public sealed class ThreadPoolDispatcher : IDispatcher
        {
            public ThreadPoolDispatcher()
            {
                Throughput = 300;
            }
    
            public void Schedule(Func<Task> runner) => Task.Factory.StartNew(runner, TaskCreationOptions.None);
    
            public int Throughput { get; set; }
        }
    
        public static class Dispatchers
        {
            public static ThreadPoolDispatcher DefaultDispatcher { get; } = new ThreadPoolDispatcher();
        }
    
        public class ActorMailbox
        {
            private readonly UnboundedMailboxQueue _mailbox;
            private IDispatcher _dispatcher;
            private IMessageInvoker _invoker;
    
            private int _status = MailboxStatus.Idle;
    
            public ActorMailbox()
            {
                _mailbox = new UnboundedMailboxQueue();
            }
    
            public void RegisterHandlers(IMessageInvoker invoker)
            {
                _invoker = invoker;
                _dispatcher = Dispatchers.DefaultDispatcher;
            }
    
            public void PostMessage(object msg)
            {
                _mailbox.Push(msg);
                Schedule();
            }
    
            private Task RunAsync()
            {
                var done = ProcessMessages();
    
                if (!done)
                    // mailbox is halted, awaiting completion of a message task, upon which mailbox will be rescheduled
                    return Task.FromResult(0);
    
                Interlocked.Exchange(ref _status, MailboxStatus.Idle);
    
                if (_mailbox.HasMessages)
                {
                    Schedule();
                }
                return Task.FromResult(0);
            }
    
            private bool ProcessMessages()
            {
                object msg = null;
                try
                {
                    for (var i = 0; i < _dispatcher.Throughput; i++)
                    {
                        if ((msg = _mailbox.Pop()) != null)
                        {
                            var t = _invoker.InvokeMessageAsync(msg);
                            if (t.IsFaulted)
                            {
                                _invoker.EscalateFailure(t.Exception, msg);
                                continue;
                            }
                            if (!t.IsCompleted)
                            {
                                // if task didn't complete immediately, halt processing and reschedule a new run when task completes
                                t.ContinueWith(RescheduleOnTaskComplete, msg);
                                return false;
                            }
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                catch (Exception e)
                {
                    _invoker.EscalateFailure(e, msg);
                }
                return true;
            }
    
            private void RescheduleOnTaskComplete(Task task, object message)
            {
                if (task.IsFaulted)
                {
                    _invoker.EscalateFailure(task.Exception, message);
                }
                _dispatcher.Schedule(RunAsync);
            }
    
            protected void Schedule()
            {
                if (Interlocked.CompareExchange(ref _status, MailboxStatus.Busy, MailboxStatus.Idle) == MailboxStatus.Idle)
                {
                    _dispatcher.Schedule(RunAsync);
                }
            }
    
        }
    

      使用方法:

    class Program
        {
            public static AutoResetEvent resetEvent = new AutoResetEvent(false);
            public static int maxCount = 10000000;
            static void Main(string[] args)
            { 
                RoomData room = new RoomData();
                Stopwatch watch = new Stopwatch();
                watch.Start();
                int j = 1;
                for (int i = 0; i < maxCount; i++)
                {
                    room.Tell(i);   
                    //j++;
                    //if (j >= Program.maxCount)
                    //    Program.resetEvent.Set();
                }
                resetEvent.WaitOne();
                watch.Stop();
                Console.WriteLine("{0},{1}", maxCount * 1000.0 / watch.ElapsedMilliseconds, watch.ElapsedMilliseconds);
                Console.Read();
            }
        }
    
        public class TestMessage
        {
            public string Message { get; set; }
        }
    
        public class MailboxHandler : IMessageInvoker
        {
            int i = 1;
            public Task InvokeMessageAsync(object msg)
            {
                //return ((TestMessage)msg).TaskCompletionSource.Task;
                //Console.Write(msg);
                //throw new Exception(msg.ToString());
                i++;
                if (i >= Program.maxCount)
                    Program.resetEvent.Set();
                return Task.FromResult(0);
            }
    
            public void EscalateFailure(Exception reason, object message)
            {
                //EscalatedFailures.Add(reason);
                Console.WriteLine("执行异常:{0},{1},{2}", message, reason.Message, reason.StackTrace);
            }
        }
    
        public class RoomData
        {
            private ActorMailbox _mailbox;
            private MailboxHandler _handler;
            public RoomData()
            {
                _mailbox = new ActorMailbox();
                _handler = new MailboxHandler();
                _mailbox.RegisterHandlers(_handler);
            }
    
            public void Tell(object msg)
            {
                _mailbox.PostMessage(msg);
            }
        }
    

      

  • 相关阅读:
    [20141124]sql server密码过期,通过SSMS修改策略报错
    [20141121]无法通过powershell读取sql server性能计数器问题
    深入解析Windows操作系统笔记——CH3系统机制
    深入解析Windows操作系统笔记——CH2系统结构
    [MySQL Reference Manual] 8 优化
    [20140928]创建连接到MySQL的连接服务器
    [20140829]spinlock导致cpu居高不下
    Percona XtraBackup User Manual 阅读笔记
    [MySQL Reference Manual] 7 备份和恢复
    [Linux 存储管理] LVM结构
  • 原文地址:https://www.cnblogs.com/Jeece/p/8094039.html
Copyright © 2011-2022 走看看