zoukankan      html  css  js  c++  java
  • dotnet core系列之Background tasks with hosted services (后台任务)

     这篇简单讲asp.net core 中的后台任务

    用到的包:

    Microsoft.AspNetCore.App metapackage 

    或者加入

    Microsoft.Extensions.Hosting

    一. Timed background tasks(定时后台任务)

    使用到System.Threading.Timer类。定时器触发任务的DoWork方法。定时器在StopAsync上停止,并且释放是在Dispose上

    internal class TimedHostedService : IHostedService, IDisposable
    {
        private readonly ILogger _logger;
        private Timer _timer;
    
        public TimedHostedService(ILogger<TimedHostedService> logger)
        {
            _logger = logger;
        }
    
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Timed Background Service is starting.");
    
            _timer = new Timer(DoWork, null, TimeSpan.Zero, 
                TimeSpan.FromSeconds(5));
    
            return Task.CompletedTask;
        }
    
        private void DoWork(object state)
        {
            _logger.LogInformation("Timed Background Service is working.");
        }
    
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Timed Background Service is stopping.");
    
            _timer?.Change(Timeout.Infinite, 0);
    
            return Task.CompletedTask;
        }
    
        public void Dispose()
        {
            _timer?.Dispose();
        }
    }

    服务是在Startup.ConfigureServices上使用AddHostedService扩展方法注册:

    services.AddHostedService<TimedHostedService>();

    二. Consuming a scoped service in a background task 在后台任务中运行scoped service

    使用IHostService中的scoped services, 创建一个scope. 对于一个hosted service默认没有scope被创建。

    这个scoped 后台任务服务包含后台任务逻辑。下面的例子中,一个ILogger被注入到了service中:

    internal interface IScopedProcessingService
    {
        void DoWork();
    }
    
    internal class ScopedProcessingService : IScopedProcessingService
    {
        private readonly ILogger _logger;
        
        public ScopedProcessingService(ILogger<ScopedProcessingService> logger)
        {
            _logger = logger;
        }
    
        public void DoWork()
        {
            _logger.LogInformation("Scoped Processing Service is working.");
        }
    }

    这个hosted service 创建了一个scope解析了scoped后台任务服务来调用它的DoWork方法:

    internal class ConsumeScopedServiceHostedService : IHostedService
    {
        private readonly ILogger _logger;
    
        public ConsumeScopedServiceHostedService(IServiceProvider services, 
            ILogger<ConsumeScopedServiceHostedService> logger)
        {
            Services = services;
            _logger = logger;
        }
    
        public IServiceProvider Services { get; }
    
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation(
                "Consume Scoped Service Hosted Service is starting.");
    
            DoWork();
    
            return Task.CompletedTask;
        }
    
        private void DoWork()
        {
            _logger.LogInformation(
                "Consume Scoped Service Hosted Service is working.");
    
            using (var scope = Services.CreateScope())
            {
                var scopedProcessingService = 
                    scope.ServiceProvider
                        .GetRequiredService<IScopedProcessingService>();
    
                scopedProcessingService.DoWork();
            }
        }
    
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation(
                "Consume Scoped Service Hosted Service is stopping.");
    
            return Task.CompletedTask;
        }
    }

    服务注册在Startup.ConfigureServices中。IHostedService的实现用AddHostedService扩展方法注册:

    services.AddHostedService<ConsumeScopedServiceHostedService>();
    services.AddScoped<IScopedProcessingService, ScopedProcessingService>();

    三. Queued background tasks 排队的后台任务

    public interface IBackgroundTaskQueue
    {
        void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
    
        Task<Func<CancellationToken, Task>> DequeueAsync(
            CancellationToken cancellationToken);
    }
    
    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        private ConcurrentQueue<Func<CancellationToken, Task>> _workItems = 
            new ConcurrentQueue<Func<CancellationToken, Task>>();
        private SemaphoreSlim _signal = new SemaphoreSlim(0);
    
        public void QueueBackgroundWorkItem(
            Func<CancellationToken, Task> workItem)
        {
            if (workItem == null)
            {
                throw new ArgumentNullException(nameof(workItem));
            }
    
            _workItems.Enqueue(workItem);
            _signal.Release();
        }
    
        public async Task<Func<CancellationToken, Task>> DequeueAsync(
            CancellationToken cancellationToken)
        {
            await _signal.WaitAsync(cancellationToken);
            _workItems.TryDequeue(out var workItem);
    
            return workItem;
        }
    }

    在 QueueHostedService中,队列中的后台任务出队列并且作为BackroundService执行。BackgroundService是一个实现了IHostedService接口的类。

    public class QueuedHostedService : BackgroundService
    {
        private readonly ILogger _logger;
    
        public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
            ILoggerFactory loggerFactory)
        {
            TaskQueue = taskQueue;
            _logger = loggerFactory.CreateLogger<QueuedHostedService>();
        }
    
        public IBackgroundTaskQueue TaskQueue { get; }
    
        protected async override Task ExecuteAsync(
            CancellationToken cancellationToken)
        {
            _logger.LogInformation("Queued Hosted Service is starting.");
    
            while (!cancellationToken.IsCancellationRequested)
            {
                var workItem = await TaskQueue.DequeueAsync(cancellationToken);
    
                try
                {
                    await workItem(cancellationToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, 
                       $"Error occurred executing {nameof(workItem)}.");
                }
            }
    
            _logger.LogInformation("Queued Hosted Service is stopping.");
        }
    }

    服务注册在Startup.ConfigureService方法中。IHostedService的实现用AddHostedService扩展方法注册:

    services.AddHostedService<QueuedHostedService>();
    services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();

    在Index page model类中:

    • IBackgroundTaskQueue被注入到构造函数并且指定给Queue
    • 一个 IServiceScopeFactory被注入并且指定给_serviceScopeFactory. 这个工厂用来创建IServiceScope实例, IServiceScope实例是用来在scope内创建 services的。一个scope被创建时为了用应用的AppDbContext(a scoped service)来写数据库记录在 IBackgroundTaskQueue 中(a singleton service).
    public class IndexModel : PageModel
    {
        private readonly AppDbContext _db;
        private readonly ILogger _logger;
        private readonly IServiceScopeFactory _serviceScopeFactory;
    
        public IndexModel(AppDbContext db, IBackgroundTaskQueue queue, 
            ILogger<IndexModel> logger, IServiceScopeFactory serviceScopeFactory)
        {
            _db = db;
            _logger = logger;
            Queue = queue;
            _serviceScopeFactory = serviceScopeFactory;
        }
    
        public IBackgroundTaskQueue Queue { get; }

    当 Index page 上的Add Task按钮被选中时,OnPostAddTask方法被执行。QueueBackgroundWorkItem被调用来使work item入队。

    public IActionResult OnPostAddTaskAsync()
    {
        Queue.QueueBackgroundWorkItem(async token =>
        {
            var guid = Guid.NewGuid().ToString();
    
            using (var scope = _serviceScopeFactory.CreateScope())
            {
                var scopedServices = scope.ServiceProvider;
                var db = scopedServices.GetRequiredService<AppDbContext>();
    
                for (int delayLoop = 1; delayLoop < 4; delayLoop++)
                {
                    try
                    {
                        db.Messages.Add(
                            new Message() 
                            { 
                                Text = $"Queued Background Task {guid} has " +
                                    $"written a step. {delayLoop}/3"
                            });
                        await db.SaveChangesAsync();
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, 
                            "An error occurred writing to the " +
                            $"database. Error: {ex.Message}");
                    }
    
                    await Task.Delay(TimeSpan.FromSeconds(5), token);
                }
            }
    
            _logger.LogInformation(
                $"Queued Background Task {guid} is complete. 3/3");
        });
    
        return RedirectToPage();
    }

    四. 总结

    注意上面的方法都有一个共同点:即直接或间接实现 IHostedService 方法

    IHostedService interface

    Hosted servcies实现IHostService接口. 这个接口定义了两个方法,为被主机管理的对象:

    • StartAsync - StartAsync包含启动后台任务的逻辑。
    • StopAsync - 当host 执行关闭时触发。StopAsync包含终止后台任务的逻辑。实现IDisposable 和finalizers 来释放任意unmanaged resources.

    你可以把这种用法的后台任务加到任意应用,例如web api , mvc , 控制台等,因为后台服务在应用启动时,就被加载了。它是被以服务的方式加到了管道上了

    参考网址:

    https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.2&tabs=visual-studio

  • 相关阅读:
    文本特征选择的关键算法总结
    偏置-方差分解(Bias-Variance Decomposition)
    排列木桩
    七夕鹊桥分析
    第五十七课、模型视图设计模式(下)------------------狄泰软件学院
    第五十六课、模型视图设计模式(中)------------------狄泰软件学院
    第五十五课、模型视图设计模式(上)------------------狄泰软件学院
    第八十五课、多线程与界面组件的通信(下)------------------狄泰软件学院
    第八十四课、多线程与界面组件的通信(上)------------------狄泰软件学院
    第八十三课、另一种创建线程的方式------------------狄泰软件学院
  • 原文地址:https://www.cnblogs.com/Vincent-yuan/p/11048748.html
Copyright © 2011-2022 走看看