zoukankan      html  css  js  c++  java
  • 利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)

    我们经常会遇到生产者消费者模式,比如前端各种UI操作事件触发后台逻辑等。在这种典型的应用场景中,消费者处理数据的顺序通常有以下4种(下文以P代表生产者,C代表消费者):

    1. FIFO(先进先出)
         P产生1,2,3,4,5,6,3,2
         C处理顺序应为1,2,3,4,5,6,3,2
    2.LIFO(后进先出)
         P产生1,2,3,4,5,6,3,2
         C处理顺序应为2,3,6,5,4,3,2,1
    3.Dynamic FIFO(我定义为:去掉相同数据的FIFO。如果队列里已经有相同的数据,则移除队列中已有的那一项,只保留后进的数据,即后进的数据优先级高)
         P产生1,2,3,4,5,6,3,2
         C处理顺序为1,4,5,6,3,2

    4.Dynamic LIFO(我定义为:去掉相同数据的LIFO。如果栈里已经有相同的数据,则移除栈中已有的那一项,只保留后进的数据,即后进的数据优先级高)
         P产生1,2,3,4,5,6,3,2
         C处理顺序为2,3,6,5,4,1
         

    第1、2种情况为基本处理逻辑;第3、4种情况需要结合实际场景调整(例如:判断数据是否相同的逻辑可能不同,已存在的数据和后进的数据哪个优先级更高也可能不同)。

    C#中有个Task类进行异步操作,我们可以通过TaskScheduler类进行任务调度,实现上述的4种基本场景。

    定义上述4种场景的通用接口以及其遍历类
    /// <summary>
    /// An indexable, enumerable collection of pending <see cref="Task"/> objects.
    /// Implementations decide the order in which tasks are exposed through the
    /// indexer and enumerator.
    /// </summary>
    public interface IScheduler : IEnumerable<Task >
        {
            /// <summary>Adds a task to the collection.</summary>
            /// <param name="t">The task to add.</param>
            void Add (Task t);

            /// <summary>Removes a task from the collection.</summary>
            /// <param name="t">The task to remove.</param>
            void Remove (Task t);

            /// <summary>Gets the number of tasks currently held.</summary>
            int Count { get; }

            /// <summary>Gets or sets the task at the given zero-based position.</summary>
            /// <param name="index">Zero-based position within the collection.</param>
            Task this [int index] { get; set ; }
        }

        public class SchedulerEnumerator : IEnumerator< Task>
        {
            private IScheduler _collection;
            private int _currentIndex;
            private Task _currentTask;


            public SchedulerEnumerator (IScheduler collection)
            {
                _collection = collection ;
                _currentIndex = -1;
                _currentTask = default (Task);
            }

            public bool MoveNext()
            {
                //Avoids going beyond the end of the collection.
                if (++_currentIndex >= _collection. Count)
                {
                    return false ;
                }
                else
                {
                    // Set current box to next item in collection.
                    _currentTask = _collection [_currentIndex];
                }
                return true ;
            }

            public void Reset() { _currentIndex = -1; }

            void IDisposable .Dispose() { }

            public Task Current
            {
                get { return _currentTask; }
            }


            object IEnumerator .Current
            {
                get { return Current; }
            }

        }


    实现我们自己的任务调度类模板,可以通过T传递我们想要的队列类型
     public class TaskSchedulerBase <T> : TaskScheduler
            where T : IScheduler , new ()

        {
            private Thread _processThread;
            private readonly object _lock = new object ();

            public TaskSchedulerBase()
            {
                _processThread = new Thread (this.Process);
            }

            private void Process()
            {
                lock (_lock)
                {
                    var tasks = GetScheduledTasks();
                    if (null != tasks)
                    {
                        foreach (var t in tasks)
                        {
                            TryExecuteTask(t);

                            TryDequeue(t);
                        }
                    }
                }
            }

            protected override void QueueTask( Task task)
            {
                lock (_lock)
                {
                    Scheduler.Add(task);

                    if (_processThread.ThreadState.Equals(ThreadState .Stopped))
                    {
                        _processThread = new Thread (Process);
                    }

                    if (!_processThread.IsAlive
                        && !_processThread.ThreadState.Equals( ThreadState.Running))
                    {
                        try
                        {
                            _processThread.Start();
                        }
                        catch (System.Exception )
                        {
                            if (!_processThread.ThreadState.Equals(ThreadState .Running))
                            {
                                _processThread = new Thread (Process);
                                _processThread.Start();
                            }
                        }
                    }
                }
            }

            protected override bool TryDequeue( Task task)
            {
                Scheduler.Remove(task);

                return true ;
            }

            protected override IEnumerable< Task> GetScheduledTasks()
            {
                return Scheduler.ToArray();
            }

            protected override bool TryExecuteTaskInline( Task task, bool taskWasPreviouslyQueued)
            {
                if (taskWasPreviouslyQueued)
                {
                    if (TryDequeue(task))
                    {
                        return base .TryExecuteTask(task);
                    }
                    else
                    {
                        return false ;
                    }
                }
                else
                {
                    return base .TryExecuteTask(task);
                }
            }

            private readonly T _scheduler = new T();
            public T Scheduler
            {
                get
                {
                    return _scheduler;
                }
            }
        }

    实现4种队列
         1.FIFO
         
     public class QueueScheduler : IScheduler
        {
            protected Queue <Task> _queue;

            public QueueScheduler ()
            {
                _queue = new Queue< Task>();
            }

            public void Add( Task t )
            {
                if (!Contains (t))
                {
                    _queue.Enqueue (t);
                }
            }

            public void Remove( Task t )
            {
                _queue.Dequeue ();
            }

            public bool Contains( Task t )
            {
                bool found = false;

                foreach (var task in _queue )
                {
                    if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))
                    {
                        found = true ;
                        break;
                    }
                }

                return found ;
            }

            public bool Contains( Task t , EqualityComparer< Task> comp )
            {
                throw new NotImplementedException();
            }

            public IEnumerator <Task> GetEnumerator()
            {
                return new SchedulerEnumerator( this);
            }

            IEnumerator IEnumerable .GetEnumerator()
            {
                return new SchedulerEnumerator( this);
            }

            public int Count
            {
                get { return _queue. Count; }
            }

            public Task this[ int index]
            {
                get { return (Task) _queue.ToArray ()[index]; }
                set { _queue .ToArray()[index] = value; }
            }
        }

         2.LIFO
    /// <summary>
    /// Last-in, first-out task collection backed by a <see cref="Stack{T}"/>.
    /// Index 0 is the most recently added task. Every added task is kept,
    /// including tasks whose <c>AsyncState</c> equals that of a task already
    /// on the stack.
    /// </summary>
    public class StackScheduler : IScheduler
        {
            protected Stack <Task> _stack;

            /// <summary>Creates an empty stack.</summary>
            public StackScheduler ()
            {
                _stack = new Stack<Task>();
            }

            /// <summary>Pushes <paramref name="t"/> onto the top of the stack.</summary>
            /// <param name="t">The task to add.</param>
            public void Add( Task t )
            {
                // Plain LIFO keeps duplicates; silently dropping a task would
                // leave it un-run and never completed.
                _stack.Push(t);
            }

            /// <summary>
            /// Removes <paramref name="t"/> (matched by reference) from the stack,
            /// preserving the order of the remaining tasks. Does nothing if the
            /// task is not present.
            /// </summary>
            /// <param name="t">The task to remove.</param>
            public void Remove( Task t )
            {
                if (_stack.Count == 0)
                {
                    return;
                }

                // Common case: the scheduler removes the top it is about to run.
                if (ReferenceEquals(_stack.Peek(), t))
                {
                    _stack.Pop();
                    return;
                }

                // ToArray() yields top-first order, so push back bottom-first.
                Task[] snapshot = _stack.ToArray();
                _stack.Clear();

                bool removed = false;
                for (int i = snapshot.Length - 1; i >= 0; i--)
                {
                    if (!removed && ReferenceEquals(snapshot[i], t))
                    {
                        removed = true;
                        continue;
                    }

                    _stack.Push(snapshot[i]);
                }
            }

            /// <summary>
            /// Reports whether a stacked task has an <c>AsyncState</c> equal to the
            /// non-null <c>AsyncState</c> of <paramref name="t"/>.
            /// </summary>
            /// <param name="t">The task whose state is looked up.</param>
            /// <returns><c>true</c> if a task with equal state is on the stack.</returns>
            public bool Contains( Task t )
            {
                foreach (var task in _stack)
                {
                    if (t.AsyncState != null && t.AsyncState.Equals(task.AsyncState))
                    {
                        return true;
                    }
                }

                return false;
            }

            /// <summary>Not implemented.</summary>
            /// <exception cref="NotImplementedException">Always thrown.</exception>
            public bool Contains( Task t , EqualityComparer< Task> comp )
            {
                throw new NotImplementedException();
            }

            /// <summary>Returns an enumerator that walks the stack from top to bottom.</summary>
            public IEnumerator <Task> GetEnumerator()
            {
                return new SchedulerEnumerator(this);
            }

            /// <summary>Non-generic form of <see cref="GetEnumerator"/>.</summary>
            IEnumerator IEnumerable.GetEnumerator()
            {
                return new SchedulerEnumerator(this);
            }

            /// <summary>Gets the number of stacked tasks.</summary>
            public int Count
            {
                get { return _stack.Count; }
            }

            /// <summary>
            /// Gets or sets the task at <paramref name="index"/>, where 0 is the
            /// top of the stack.
            /// </summary>
            /// <param name="index">Zero-based position from the top.</param>
            public Task this[ int index]
            {
                get { return _stack.ToArray()[index]; }
                set
                {
                    // Stack<T> has no indexed write, so rebuild it from a snapshot.
                    // The array write validates the index before the stack is touched.
                    Task[] snapshot = _stack.ToArray();
                    snapshot[index] = value;

                    _stack.Clear();
                    for (int i = snapshot.Length - 1; i >= 0; i--)
                    {
                        _stack.Push(snapshot[i]);
                    }
                }
            }
        }

         3.Dynamic FIFO

    /// <summary>
    /// De-duplicating FIFO task collection backed by a <see cref="List{T}"/>.
    /// When a task is added whose <c>AsyncState</c> equals that of a task
    /// already held, the held task is removed and the new one is appended at
    /// the end. Index 0 is the oldest surviving task.
    /// </summary>
    public class DynamicQueueScheduler : IScheduler
        {
            protected List <Task> _queue;

            /// <summary>Creates an empty collection.</summary>
            public DynamicQueueScheduler ()
            {
                _queue = new List<Task>();
            }

            /// <summary>
            /// Appends <paramref name="t"/>, first removing any held task with an
            /// equal <c>AsyncState</c>.
            /// </summary>
            /// <param name="t">The task to add.</param>
            public virtual void Add(Task t)
            {
                Task superseded;
                bool hasDuplicate = Contains(t, out superseded);
                if (hasDuplicate)
                {
                    _queue.Remove(superseded);
                }

                _queue.Add(t);
            }

            /// <summary>Removes <paramref name="t"/> from the collection.</summary>
            /// <param name="t">The task to remove.</param>
            public virtual void Remove(Task t)
            {
                _queue.Remove(t);
            }

            /// <summary>
            /// Reports whether a held task has an <c>AsyncState</c> equal to the
            /// non-null <c>AsyncState</c> of <paramref name="t"/>.
            /// </summary>
            /// <param name="t">The task whose state is looked up.</param>
            /// <returns><c>true</c> if a matching task is held.</returns>
            public virtual bool Contains(Task t)
            {
                Task ignored;
                return Contains(t, out ignored);
            }

            /// <summary>
            /// Looks for the first held task whose <c>AsyncState</c> equals the
            /// non-null <c>AsyncState</c> of <paramref name="t"/>.
            /// </summary>
            /// <param name="t">The task whose state is looked up.</param>
            /// <param name="oldTask">
            /// Receives the matching held task, or <c>null</c> when there is none.
            /// </param>
            /// <returns><c>true</c> if a matching task was found.</returns>
            public virtual bool Contains(Task t, out Task oldTask)
            {
                // The predicate is evaluated per element, so nothing is read
                // from t when the list is empty.
                int position = _queue.FindIndex(
                    candidate => t.AsyncState != null && t.AsyncState.Equals(candidate.AsyncState));

                if (position < 0)
                {
                    oldTask = null;
                    return false;
                }

                oldTask = _queue[position];
                return true;
            }

            /// <summary>Not implemented.</summary>
            /// <exception cref="NotImplementedException">Always thrown.</exception>
            public virtual bool Contains(Task t, EqualityComparer<Task > comp)
            {
                throw new NotImplementedException();
            }

            /// <summary>Returns an enumerator that walks the tasks from index 0 upwards.</summary>
            public IEnumerator <Task> GetEnumerator()
            {
                return new SchedulerEnumerator(this);
            }

            /// <summary>Non-generic form of <see cref="GetEnumerator"/>.</summary>
            IEnumerator IEnumerable.GetEnumerator()
            {
                return new SchedulerEnumerator(this);
            }

            /// <summary>Gets the number of held tasks.</summary>
            public int Count
            {
                get { return _queue.Count; }
            }

            /// <summary>Gets or sets the task at <paramref name="index"/>.</summary>
            /// <param name="index">Zero-based position in the list.</param>
            public Task this[ int index]
            {
                get { return _queue[index]; }
                set { _queue[index] = value; }
            }
        }

         4.Dynamic LIFO
     public class DynamicStackScheduler : IScheduler
        {
            protected List <Task> _queue;

            public DynamicStackScheduler ()
            {
                _queue = new List< Task>();
            }

            public void Add( Task t )
            {
                Task oldTask = null;
                if (Contains (t, out oldTask ))
                {
                    _queue.Remove (oldTask);
                }
      
                _queue.Insert (0,t);
            }

            public void Remove( Task t )
            {
                _queue.Remove (t);
            }

            public bool Contains( Task t )
            {
                Task oldTask = null;
                bool found = Contains( t, out oldTask);
                return found ;
            }

            public bool Contains( Task t , out Task oldTask )
            {
                bool found = false;
                oldTask = null ;

                foreach (var task in _queue )
                {
                    if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))
                    {
                        oldTask = task ;
                        found = true ;

                        break;
                    }
                }

                return found ;
            }

            public bool Contains( Task t , EqualityComparer< Task> comp )
            {
                throw new NotImplementedException();
            }

            public IEnumerator <Task> GetEnumerator()
            {
                return new SchedulerEnumerator( this);
            }

            IEnumerator IEnumerable .GetEnumerator()
            {
                return new SchedulerEnumerator( this);
            }

            public int Count
            {
                get { return _queue. Count; }
            }

            public Task this[ int index]
            {
                get { return (Task) _queue[index]; }
                set { _queue [index] = value; }
            }
        }


    测试代码
      class Program
        {
            static Queue <int> _queue = new Queue< int>();

            //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<QueueScheduler>());
            //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<StackScheduler>());
            //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());
            //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicStackScheduler>());
            static TaskFactory _factory = new TaskFactory (new TaskSchedulerBase<DynamicQueueScheduler >());
            static void Main( string[] args )
            {
                var thread1 = new Thread(Producer );
                var thread2 = new Thread(Consumer );

                thread1.Start ();
                thread2.Start ();

                Console.ReadKey ();
            }

            static void Producer()
            {
                for (int i = 0; i < 7; i ++)
                {
                    _queue.Enqueue (i);
                }

                _queue.Enqueue (3);
                _queue.Enqueue (2);
            }

            static void Consumer()
            {
                while (true )
                {
                    if (_queue .Count > 0)
                    {
                        foreach (var i in _queue )
                        {
                            _factory.StartNew ((s) =>
                            {
                                Console.Write ("{0} on thread {1} {2} ", s, Thread.CurrentThread .ManagedThreadId,
                                              DateTime.Now .ToLongTimeString());
                            }, i);
                        }

                        _queue.Clear ();
                    }
                    else
                    {
                        Thread.Sleep (1);
                    }
                }
            }
        }

  • 相关阅读:
    开源云平台离普通用户还有多远?
    你的云桌面和阿里的云主机有什么区别?
    容器和虚拟机谁会是未来云计算的方向?
    白话为什么需要虚拟机和云计算有什么关系
    OpenStack是什么?
    Kubernetes是什么?
    第二夜:万圣节,讲一个关于程序员的故事
    万圣节,讲一个关于程序员的故事
    云计算社区新增两枚 .group 社群专属域名
    正式激活 .group 域名:云原生.社群
  • 原文地址:https://www.cnblogs.com/muzizongheng/p/3756796.html
Copyright © 2011-2022 走看看