zoukankan      html  css  js  c++  java
  • .net core 实现基于 cron 表达式的任务调度

    .net core 实现基于 cron 表达式的任务调度

    Intro

    上次我们实现了一个简单的基于 Timer 的定时任务,详细信息可以看这篇文章

    但是使用过程中慢慢发现这种方式可能并不太合适,有些任务可能只希望在某个时间段内执行,只使用 timer 就显得不是那么灵活了,希望可以像 quartz 那样指定一个 cron 表达式来指定任务的执行时间。

    cron 表达式介绍

    cron 常见于Unix类Unix操作系统之中,用于设置周期性被执行的指令。该命令从标准输入设备读取指令,并将其存放于“crontab”文件中,以供之后读取和执行。该词来源于希腊语 chronos(χρόνος),原意是时间。

    通常,crontab储存的指令被守护进程激活,crond 常常在后台运行,每一分钟检查是否有预定的作业需要执行。这类作业一般称为cron jobs

    cron 可以比较准确的描述周期性执行任务的执行时间,标准的 cron 表达式是五位:

    30 4 * * ? 五个位置上的值分别对应 分钟/小时/日期/月份/周(day of week)

    现在有一些扩展,有6位的,也有7位的,6位的表达式第一个对应的是秒,7个的第一个对应是秒,最后一个对应的是年份

    0 0 12 * * ? 每天中午12点
    0 15 10 ? * * 每天 10:15
    0 15 10 * * ? 每天 10:15
    30 15 10 * * ? * 每天 10:15:30
    0 15 10 * * ? 2005 2005年每天 10:15

    详细信息可以参考:http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html

    .NET Core CRON service

    CRON 解析库 使用的是 https://github.com/HangfireIO/Cronos
    ,支持五位/六位,暂不支持年份的解析(7位)

    基于 BackgroundService 的 CRON 定时服务,实现如下:

    public abstract class CronScheduleServiceBase : BackgroundService
    {
            /// <summary>
            /// job cron trigger expression
            /// refer to: http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html
            /// </summary>
            public abstract string CronExpression { get; }
    
            protected abstract bool ConcurrentAllowed { get; }
    
            protected readonly ILogger Logger;
    
            private readonly string JobClientsCache = "JobClientsHash";
    
            protected CronScheduleServiceBase(ILogger logger)
            {
                Logger = logger;
            }
    
            protected abstract Task ProcessAsync(CancellationToken cancellationToken);
    
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                {
                    var next = CronHelper.GetNextOccurrence(CronExpression);
                    while (!stoppingToken.IsCancellationRequested && next.HasValue)
                    {
                        var now = DateTimeOffset.UtcNow;
    
                        if (now >= next)
                        {
                            if (ConcurrentAllowed)
                            {
                                _ = ProcessAsync(stoppingToken);
                                next = CronHelper.GetNextOccurrence(CronExpression);
                                if (next.HasValue)
                                {
                                    Logger.LogInformation("Next at {next}", next);
                                }
                            }
                            else
                            {
                                var machineName = RedisManager.HashClient.GetOrSet(JobClientsCache, GetType().FullName, () => Environment.MachineName); // try get job master
                                if (machineName == Environment.MachineName) // IsMaster
                                {
                                    using (var locker = RedisManager.GetRedLockClient($"{GetType().FullName}_cronService"))
                                    {
                                        // redis 互斥锁
                                        if (await locker.TryLockAsync())
                                        {
                                            // 执行 job
                                            await ProcessAsync(stoppingToken);
    
                                            next = CronHelper.GetNextOccurrence(CronExpression);
                                            if (next.HasValue)
                                            {
                                                Logger.LogInformation("Next at {next}", next);
                                                await Task.Delay(next.Value - DateTimeOffset.UtcNow, stoppingToken);
                                            }
                                        }
                                        else
                                        {
                                            Logger.LogInformation($"failed to acquire lock");
                                        }
                                    }
                                }
                            }
                        }
                        else
                        {
                            // needed for graceful shutdown for some reason.
                            // 1000ms so it doesn't affect calculating the next
                            // cron occurence (lowest possible: every second)
                            await Task.Delay(1000, stoppingToken);
                        }
                    }
                }
            }
    
            public override Task StopAsync(CancellationToken cancellationToken)
            {
                RedisManager.HashClient.Remove(JobClientsCache, GetType().FullName); // unregister from jobClients
                return base.StopAsync(cancellationToken);
            }
        }
    

    因为网站部署在多台机器上,所以为了防止并发执行,使用 redis 做了一些事情,Job执行的时候尝试获取 redis 中 job 对应的 master 的 hostname,没有的话就设置为当前机器的 hostname,在 job 停止的时候也就是应用停止的时候,删除 redis 中当前 job 对应的 master,job执行的时候判断是否是 master 节点,是 master 才执行job,不是 master 则不执行。完整实现代码:https://github.com/WeihanLi/ActivityReservation/blob/dev/ActivityReservation.Helper/Services/CronScheduleServiceBase.cs#L11

    定时 Job 示例:

    public class RemoveOverdueReservationService : CronScheduleServiceBase
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly IConfiguration _configuration;
    
        public RemoveOverdueReservationService(ILogger<RemoveOverdueReservationService> logger,
            IServiceProvider serviceProvider, IConfiguration configuration) : base(logger)
        {
            _serviceProvider = serviceProvider;
            _configuration = configuration;
        }
    
        public override string CronExpression => _configuration.GetAppSetting("RemoveOverdueReservationCron") ?? "0 0 18 * * ?";
    
        protected override bool ConcurrentAllowed => false;
    
        protected override async Task ProcessAsync(CancellationToken cancellationToken)
        {
            using (var scope = _serviceProvider.CreateScope())
            {
                var reservationRepo = scope.ServiceProvider.GetRequiredService<IEFRepository<ReservationDbContext, Reservation>>();
                await reservationRepo.DeleteAsync(reservation => reservation.ReservationStatus == 0 && (reservation.ReservationForDate < DateTime.Today.AddDays(-3)));
            }
        }
    }
    

    完整实现代码:https://github.com/WeihanLi/ActivityReservation/blob/dev/ActivityReservation.Helper/Services/RemoveOverdueReservationService.cs

    Memo

    使用 redis 这种方式来决定 master 并不是特别可靠,正常结束的没有什么问题,最好还是用比较成熟的服务注册发现框架比较好

    Reference

  • 相关阅读:
    至最近写的微博记录(一)
    对古人“一命二运三风水,四积德五读书”的人生命运总结的理解
    福建省获得央行颁发的非金融机构支付业务许可牌照的公司(至20120801)
    替信息系统运维工作正名
    在信息系统运维开发中,对MVC框架认识上的一种变通
    Elasticsearch 从入门到学会之六(索引API特殊管理)
    Python的Web项目实现 Flask
    python的方法整理
    Shell脚本
    各种测试场景case整理
  • 原文地址:https://www.cnblogs.com/weihanli/p/implement-job-schedule-via-cron-for-dotnetcore.html
Copyright © 2011-2022 走看看