zoukankan      html  css  js  c++  java
  • dotnet core系列之Background tasks with hosted services (后台任务)

     这篇简单讲asp.net core 中的后台任务

    用到的包:

    Microsoft.AspNetCore.App metapackage 

    或者加入

    Microsoft.Extensions.Hosting

    一. Timed background tasks(定时后台任务)

    使用到System.Threading.Timer类。定时器触发任务的DoWork方法。定时器在StopAsync上停止,并且释放是在Dispose上

    internal class TimedHostedService : IHostedService, IDisposable
    {
        private readonly ILogger _logger;
        private Timer _timer;
    
        public TimedHostedService(ILogger<TimedHostedService> logger)
        {
            _logger = logger;
        }
    
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Timed Background Service is starting.");
    
            _timer = new Timer(DoWork, null, TimeSpan.Zero, 
                TimeSpan.FromSeconds(5));
    
            return Task.CompletedTask;
        }
    
        private void DoWork(object state)
        {
            _logger.LogInformation("Timed Background Service is working.");
        }
    
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Timed Background Service is stopping.");
    
            _timer?.Change(Timeout.Infinite, 0);
    
            return Task.CompletedTask;
        }
    
        public void Dispose()
        {
            _timer?.Dispose();
        }
    }

    服务是在Startup.ConfigureServices上使用AddHostedService扩展方法注册:

    services.AddHostedService<TimedHostedService>();

    二. Consuming a scoped service in a background task 在后台任务中运行scoped service

    使用IHostService中的scoped services, 创建一个scope. 对于一个hosted service默认没有scope被创建。

    这个scoped 后台任务服务包含后台任务逻辑。下面的例子中,一个ILogger被注入到了service中:

    internal interface IScopedProcessingService
    {
        void DoWork();
    }
    
    internal class ScopedProcessingService : IScopedProcessingService
    {
        private readonly ILogger _logger;
        
        public ScopedProcessingService(ILogger<ScopedProcessingService> logger)
        {
            _logger = logger;
        }
    
        public void DoWork()
        {
            _logger.LogInformation("Scoped Processing Service is working.");
        }
    }

    这个hosted service 创建了一个scope解析了scoped后台任务服务来调用它的DoWork方法:

    internal class ConsumeScopedServiceHostedService : IHostedService
    {
        private readonly ILogger _logger;
    
        public ConsumeScopedServiceHostedService(IServiceProvider services, 
            ILogger<ConsumeScopedServiceHostedService> logger)
        {
            Services = services;
            _logger = logger;
        }
    
        public IServiceProvider Services { get; }
    
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation(
                "Consume Scoped Service Hosted Service is starting.");
    
            DoWork();
    
            return Task.CompletedTask;
        }
    
        private void DoWork()
        {
            _logger.LogInformation(
                "Consume Scoped Service Hosted Service is working.");
    
            using (var scope = Services.CreateScope())
            {
                var scopedProcessingService = 
                    scope.ServiceProvider
                        .GetRequiredService<IScopedProcessingService>();
    
                scopedProcessingService.DoWork();
            }
        }
    
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation(
                "Consume Scoped Service Hosted Service is stopping.");
    
            return Task.CompletedTask;
        }
    }

    服务注册在Startup.ConfigureServices中。IHostedService的实现用AddHostedService扩展方法注册:

    services.AddHostedService<ConsumeScopedServiceHostedService>();
    services.AddScoped<IScopedProcessingService, ScopedProcessingService>();

    三. Queued background tasks 排队的后台任务

    public interface IBackgroundTaskQueue
    {
        void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
    
        Task<Func<CancellationToken, Task>> DequeueAsync(
            CancellationToken cancellationToken);
    }
    
    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        private ConcurrentQueue<Func<CancellationToken, Task>> _workItems = 
            new ConcurrentQueue<Func<CancellationToken, Task>>();
        private SemaphoreSlim _signal = new SemaphoreSlim(0);
    
        public void QueueBackgroundWorkItem(
            Func<CancellationToken, Task> workItem)
        {
            if (workItem == null)
            {
                throw new ArgumentNullException(nameof(workItem));
            }
    
            _workItems.Enqueue(workItem);
            _signal.Release();
        }
    
        public async Task<Func<CancellationToken, Task>> DequeueAsync(
            CancellationToken cancellationToken)
        {
            await _signal.WaitAsync(cancellationToken);
            _workItems.TryDequeue(out var workItem);
    
            return workItem;
        }
    }

    在 QueueHostedService中,队列中的后台任务出队列并且作为BackroundService执行。BackgroundService是一个实现了IHostedService接口的类。

    public class QueuedHostedService : BackgroundService
    {
        private readonly ILogger _logger;
    
        public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
            ILoggerFactory loggerFactory)
        {
            TaskQueue = taskQueue;
            _logger = loggerFactory.CreateLogger<QueuedHostedService>();
        }
    
        public IBackgroundTaskQueue TaskQueue { get; }
    
        protected async override Task ExecuteAsync(
            CancellationToken cancellationToken)
        {
            _logger.LogInformation("Queued Hosted Service is starting.");
    
            while (!cancellationToken.IsCancellationRequested)
            {
                var workItem = await TaskQueue.DequeueAsync(cancellationToken);
    
                try
                {
                    await workItem(cancellationToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, 
                       $"Error occurred executing {nameof(workItem)}.");
                }
            }
    
            _logger.LogInformation("Queued Hosted Service is stopping.");
        }
    }

    服务注册在Startup.ConfigureService方法中。IHostedService的实现用AddHostedService扩展方法注册:

    services.AddHostedService<QueuedHostedService>();
    services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();

    在Index page model类中:

    • IBackgroundTaskQueue被注入到构造函数并且指定给Queue
    • 一个 IServiceScopeFactory被注入并且指定给_serviceScopeFactory. 这个工厂用来创建IServiceScope实例, IServiceScope实例是用来在scope内创建 services的。一个scope被创建时为了用应用的AppDbContext(a scoped service)来写数据库记录在 IBackgroundTaskQueue 中(a singleton service).
    public class IndexModel : PageModel
    {
        private readonly AppDbContext _db;
        private readonly ILogger _logger;
        private readonly IServiceScopeFactory _serviceScopeFactory;
    
        public IndexModel(AppDbContext db, IBackgroundTaskQueue queue, 
            ILogger<IndexModel> logger, IServiceScopeFactory serviceScopeFactory)
        {
            _db = db;
            _logger = logger;
            Queue = queue;
            _serviceScopeFactory = serviceScopeFactory;
        }
    
        public IBackgroundTaskQueue Queue { get; }

    当 Index page 上的Add Task按钮被选中时,OnPostAddTask方法被执行。QueueBackgroundWorkItem被调用来使work item入队。

    public IActionResult OnPostAddTaskAsync()
    {
        Queue.QueueBackgroundWorkItem(async token =>
        {
            var guid = Guid.NewGuid().ToString();
    
            using (var scope = _serviceScopeFactory.CreateScope())
            {
                var scopedServices = scope.ServiceProvider;
                var db = scopedServices.GetRequiredService<AppDbContext>();
    
                for (int delayLoop = 1; delayLoop < 4; delayLoop++)
                {
                    try
                    {
                        db.Messages.Add(
                            new Message() 
                            { 
                                Text = $"Queued Background Task {guid} has " +
                                    $"written a step. {delayLoop}/3"
                            });
                        await db.SaveChangesAsync();
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, 
                            "An error occurred writing to the " +
                            $"database. Error: {ex.Message}");
                    }
    
                    await Task.Delay(TimeSpan.FromSeconds(5), token);
                }
            }
    
            _logger.LogInformation(
                $"Queued Background Task {guid} is complete. 3/3");
        });
    
        return RedirectToPage();
    }

    四. 总结

    注意上面的方法都有一个共同点:即直接或间接实现 IHostedService 方法

    IHostedService interface

    Hosted servcies实现IHostService接口. 这个接口定义了两个方法,为被主机管理的对象:

    • StartAsync - StartAsync包含启动后台任务的逻辑。
    • StopAsync - 当host 执行关闭时触发。StopAsync包含终止后台任务的逻辑。实现IDisposable 和finalizers 来释放任意unmanaged resources.

    你可以把这种用法的后台任务加到任意应用,例如web api , mvc , 控制台等,因为后台服务在应用启动时,就被加载了。它是被以服务的方式加到了管道上了

    参考网址:

    https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.2&tabs=visual-studio

  • 相关阅读:
    Apache 虚拟主机 VirtualHost 配置
    EAX、ECX、EDX、EBX寄存器的作用
    Python中文文档 目录(转载)
    八度
    POJ 3268 Silver Cow Party (最短路)
    POJ 2253 Frogger (求每条路径中最大值的最小值,Dijkstra变形)
    2013金山西山居创意游戏程序挑战赛——复赛(1) HDU 4557 非诚勿扰 HDU 4558 剑侠情缘 HDU 4559 涂色游戏 HDU 4560 我是歌手
    HDU 4549 M斐波那契数列(矩阵快速幂+欧拉定理)
    UVA 11624 Fire! (简单图论基础)
    HDU 3534 Tree (树形DP)
  • 原文地址:https://www.cnblogs.com/Vincent-yuan/p/11048748.html
Copyright © 2011-2022 走看看