zoukankan      html  css  js  c++  java
  • .NET Core中使用IHostedService结合队列执行定时任务

    最近遇到了这样的场景:每隔一段时间,需要在后台使用队列对一批数据进行业务处理。

    Quartz.NET是一种选择,在 .NET Core中,可以使用IHostedService执行后台定时任务。在本篇中,首先尝试把队列还原到最简单、原始的状态,然后给出以上场景问题的具体解决方案。

    假设一个队列有8个元素。现在abcd依次进入队列。

    0 1 2 3 4 5 6 7
    a b c d
    head tail

    ab依次出队列。

    0 1 2 3 4 5 6 7
    c d
    head tail

    可以想象,随着不断地入列出列,head和tail的位置不断往后,当tail在7号位的时候,虽然队列里还有空间,但此时数据就无法入队列了。

    如何才可以继续入队列呢

    首先想到的是数据搬移。当数据无法进入队列,首先让队列项出列,进入到另外一个新队列,这个新队列就可以再次接收数据入队列了。但是,搬移整个队列中的数据的时间复杂度为O(n),而原先出队列的时间复杂度是O(1),这种方式不够理想。

    还有一种思路是使用循环队列。当tail指向最后一个位置,此时有新的数据进入队列,tail就来到头部指向0号位置,这样这个队列就可以循环使用了。

    0 1 2 3 4 5 6 7
    h i j
    head tail

    现在a入栈。

    0 1 2 3 4 5 6 7
    h i j a
    tail head

    队列有很多种实现

    比如在生产消费模型中可以用阻塞队列。当生产队列为空的时候,为了不让消费者取数据,生产队列的Dequeue行为会被阻塞;而当生产队列满的时候,为了不让更多的数据进来,生产队列的Enqueue行为被阻塞。

    线程安全的队列叫并发队列,如C#中的ConcurrentQueue

    线程池内部也使用了队列机制。因为CPU的资源是有限的,过多的线程会导致CPU频繁地在线程之间切换。线程池内通过维护一定数量的线程来减轻CPU的负担。当线程池没有多余的线程可供使用,请求过来,一种方式是拒绝请求,另外一种方式是让请求队列阻塞,等到线程池内有线程可供使用,请求队列出列执行。用链表实现的队列是无界队列(unbounded queue),这种做法可能会导致过多的请求在排队,等待响应时间过长。用数组实现的队列是有界队列(bounded queue),当线程池已满,请求过来就会被拒绝。对有界队列来说数组的大小设置很讲究。

    来模拟一个数组队列。

    public class ArrayQueue
    {
        private string[] items;
        private int n = 0; //数组长度
        private int head = 0;
        private int tail = 0;
    
        public ArrayQueue(int capacity)
        {
            n = capacity;
            items = new string[capacity];
        }
    
        public bool Enqueue(string item)
        {
            if(tail==n){
                return false;
            }
            items[tail] = item;
            ++tail;
            return true;
        }
    
        public string Dequeue()
        {
            if(head==null){
                return null;
            }
            string ret = items[head];
            ++head;
            return ret;
        }
    }
    

    以上就是一个最简单的、用数组实现的队列。

    再次回到要解决的场景问题。解决思路大致是:实现IHostedService接口,在其中执行定时任务,每次把队列项放到队列中,并定义出队列的方法,在其中执行业务逻辑。

    关于队列,通过以下的步骤使其在后台运行。

    • 队列项(MessageQueueItem):具备唯一标识、委托、添加到队列中的时间等属性
    • 队列(MessageQueue):维护着Dictionary<string, MessageQueueItem>静态字典集合
    • MessageQueueUtility类:决定着如何运行,比如队列执行的间隔时间、垃圾回收
    • MessageQueueThreadUtility类:维护队列线程,提供队列在后台运行的方法
    • Startup.cs中的Configure中调用MessageQueueThreadUtility中的方法使队列在后台运行

    队列项(MessageQueueItem)

    public class MessageQueueItem
    {
        public MessageQueueItem(string key, Action action, string description=null)
        {
            Key = key;
            Action = action;
            Description = description;
            AddTime = DateTime.Now;
        }
    
        public string Key{get;set;}
        public Actioin Action{get;set;}
        public DateTime AddTime{get;set;}
        public string Description{get;set;}
    }
    

    队列(MessageQueue),维护着针对队列项的一个静态字典集合。

    public class MessageQueue
    {
        public static Dictionary<string, MessageQueueItem> MessageQueueDictionary = new Dictionary<string, MessageQueueItem>(StringComparer.OrdinalIgnoreCase);
    
        public static object MessageQueueSyncLock = new object();
        public static object OperateLock = new object();
    
        public static void OperateQueue()
        {
            lock(OperateLock)
            {
                var mq = new MessageQueue();
                var key = mq.GetCurrentKey();
                while(!string.IsNullOrEmpty(key))
                {
                    var mqItem = mq.GetItem(key);
                    mqItem.Action();
                    mq.Remove(key);
                    key = mq.GetCurrentKey();
                }
            }
        }
    
        public string GetCurrentKey()
        {
            lock(MessageQueueSyncLock)
            {
                return MessageQueueDictionary.Keys.FirstOrDefault();
            }
        }
    
        public MessageQueueItem GetItem(string key)
        {
            lock(MessageQueueSyncLock)
            {
                if(MessageQueueDictionary.ContainsKey(key))
                {
                    return MessageQueueDictionary[key];
                }
                return null;
            }
        }
    
        public void Remove(string key)
        {
            lock(MessageQueueSyncLock)
            {
                if(MessageQueueDictionary.ContainsKey(key))
                {
                    MessageQueueDictionary.Remove(key);
                }
            }
        }
    
        public MessageQueueItem Add(string key, Action actioin)
        {
            lock(MessageQueueSyncLock)
            {
                var mqItem = new MessageQueueItem(key, action);
                MessageQueueDictionary[key] = mqItem;
                return mqItem;
            }
        }
    
        public int GetCount()
        {
            lock(MessageQueueSyncLock)
            {
                return MessageQueueDictionary.Count;
            }
        }
    }
    

    MessageQueueUtility类, 决定着队列运行的节奏。

    public class MessageQueueUtility
    {
        private readonly int _sleepMilliSeconds;
        public MessageQueueUtility(int sleepMilliSeconds=1000)
        {
            _sleepMilliSeconds = sleepMilliSeoncds;
        }
    
        ~MessageQueueUtility()
        {
            MessageQueue.OperateQueue();
        }
    
        public void Run
        {
            do
            {
                MessageQueue.OperateQueue();
                Thread.Sleep(_sleepMilliSeconds);
            } while(true)
        }
    }
    

    MessageQueueThreadUtility类,管理队列的线程,并让其在后台运行。

    public static class MessageQueueThreadUtility
    {
        public static Dictionary<string, Thread> AsyncThreadCollection = new Dictioanry<string, Thread>();
        public static void Register(string threadUniqueName)
        {
            {
                MessageQueueUtility messageQueueUtility = new MessageQueueUtility();
                Thread messageQueueThread = new Thread(messageQueueUtility.Run){
                    Name = threadUniqueName
                };
                AsyncThreadCollection.Add(messageQueueThread.Name, messageQueueThread);
            }
    
            AsyncThreadCollection.Values.ToList().ForEach(z => {
                z.IsBackground = true;
                z.Start();
            });
        }
    }
    

    Startup.cs中注册。

    public class Startup
    {
        public IServiceProvider ConfigureServices(IServiceCollection services)
        {
            ...
        }
    
        public void Configure(IApplicationBuilder app, IHostingEnvironment env...)
        {
            RegisterMessageQueueThreads();
        }
    
        private void RegisterMessageQueueThreads()
        {
            MessageQueueThreadUtility.Register("");
        }
    }
    

    最后在IHostedService的实现类中把队列项丢给队列。

    public class MyBackgroundSerivce : IHostedService, IDisposable
    {
        private Timer _timer;
        public IServiceProvider Services{get;}
        
        public MyBackgroundService(IServiceProvider services)
        {
            Serivces = services;
        }
    
        public void Dispose()
        {
            _timer?.Dispose();
        }
    
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
            return Task.CompletedTask;
        }
    
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _timer?.Change(Timeout.Infinite,0);
            return Task.CompletedTask;
        }
    
        private void DoWork(object state)
        {
            using(var scope = Services.CreateScope())
            {
                using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>())
                {
                    ...
                    var mq = new MessageQueue();
                    mq.Add("somekey", DealQueueItem);
                }
            }
        }
    
        private void DealQueueItem()
        {
            var mq = new MessageQueue();
            var key = mq.GetCurrentKey();
            var item = mq.GetItem(key);
            if(item!=null)
            {
                using(var scope = Services.CreateScope())
                {
                    using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>())
                    {
                        //执行业务逻辑
                    }
                }
            }
        }
    }
    

    当需要使用上下文的时候,首先通过IServiceProviderCreateScope方法得到ISerivceScope,再通过它的ServiceProvider属性获取依赖倒置容器中的上下文服务。

    以上,用IHostedService结合队列解决了开篇提到的场景问题,如果您有很好的想法,我们一起交流吧。文中的队列部分来自"盛派网络"的Senparc.Weixin SDK源码。

  • 相关阅读:
    干货分享:路由与交换详解大全!
    基于ASCII字符集对比
    css文字两端对齐
    软件版本号(BETA、RC、ALPHA、Release、GA等)
    install和update区别
    Blazor入坑指南
    解决Electron7.0.0的坑,cnpm install electron 安装失败的问题
    Linux查看CPU和内存使用情况
    位运算符在JS中的妙用
    centos7通过yum安装mysql
  • 原文地址:https://www.cnblogs.com/darrenji/p/10254957.html
Copyright © 2011-2022 走看看