zoukankan      html  css  js  c++  java
  • .net core 下 Quartz 任务调度的使用和配置 

    Quarzt 是一个开源的作业调度框架;允许程序开发人员根据时间间隔来调度作业;实现了任务和触发器的多对多关系.

    1.Quarzt 核心概念:
    • Job : 表示一个工作,要执行的具体内容; 接口中定义了一个方法 void excute(JobExecutionContext context);
    • JobDetail:  表示一个具体的可执行的调度程序;包含了这个任务调度的方案和策略.
    • Trigger:  一个调度参数的配置,配置调用的时间和周期.框架提供了5种触发器类型(SimpleTrigger、CronTrigger、DateIntervalTrigger、 NthlncludedDayTrigger、Calendar类).
    • Scheduler: 一个调度容器可以注册多个JobDetail和Trigger.调度需要组合JobDetail和Trigger;

    附: 常用的trigger为SimpleTrigger和CronTrigger. 
    SimpleTrigger 执行N次,重复N次;
    CronTrigger: 几秒 几分 几时 哪日 哪月 哪周 哪年执行

    quartz表达式在线网址:  http://cron.qqe2.com/

    1.1 Quarzt 运行环境:

    可以嵌入在应用程序,也可以作为一个独立的程序运行.

    1.2.Quarzt 存储方式:
    • RAMJobStore(内存作业存储类型)和JDBCJobStore(数据库作业存储类型),两种方式对比如下:
    优点 缺点
    RAMJobStore 不要外部数据库,配置容易,运行速度快 因为调度程序信息是存储在被分配给JVM的内存里面, 所以,当应用程序停止运行时,所有调度信息将被丢失。另外因为存储到JVM内存里面,所以可以存储多少个Job和Trigger将会受到限制
    BCJobStor 支持集群,因为所有的任务信息都会保存 运行速度的快慢取决与连接数据库的快慢到数据库中,可以控制事物,还有就是如果应用服务器关闭或者重启,任务信息都不会丢失,并且可以恢复因服务器关闭或者重启而导致执行失败的任务
    2. Quarzt 在.net的实现
    2.1 Quarzt ISchedulerCenter接口定义
    • ISchedulerCenter接口主要定义了以下几个接口

    开启任务调度 StartScheduleAsync()
    停止任务调度 StopScheduleAsync()
    添加一个任务 AddScheduleJobAsync(TasksQz sysSchedule)
    停止一个任务 StopScheduleJobAsync(TasksQz sysSchedule)
    恢复一个任务 ResumeJob(TasksQz sysSchedule)

        /// <summary>
        /// 服务调度接口
        /// </summary>
        public interface ISchedulerCenter
        {
    
            /// <summary>
            /// 开启任务调度
            /// </summary>
            /// <returns></returns>
            Task<MessageModel<string>> StartScheduleAsync();
    
            /// <summary>
            /// 停止任务调度
            /// </summary>
            /// <returns></returns>
            Task<MessageModel<string>> StopScheduleAsync();
    
            /// <summary>
            /// 添加一个任务
            /// </summary>
            /// <returns></returns>
            Task<MessageModel<string>> AddScheduleJobAsync(TasksQz sysSchedule);
    
           
            /// <summary>
            /// 停止一个任务
            /// </summary>
            /// <returns></returns>
            Task<MessageModel<string>> StopScheduleJobAsync(TasksQz sysSchedule);
    
            /// <summary>
            /// 恢复一个任务
            /// </summary>
            /// <returns></returns>
            Task<MessageModel<string>> ResumeJob(TasksQz sysSchedule);
    
        }
    
    
    


    2.2 Quarzt 任务调度服务中心 SchedulerCenterServer 封装
    • 任务调度中心 SchedulerCenterServer 类的实现, 继承实现 ISchedulerCenter 接口;包含任务的开启、任务的关闭方法;以及对指定任务的开启、暂停和恢复方法的封装;
    
        /// <summary>
        /// 任务调度管理中心
        /// </summary>
        public class SchedulerCenterServer : ISchedulerCenter
        {
            private Task<IScheduler> _scheduler;
            private readonly IJobFactory _iocjobFactory;
    
            public SchedulerCenterServer(IJobFactory jobFactory)
            {
                _iocjobFactory = jobFactory;
                _scheduler = GetSchedulerAsync();
            }
    
            private Task<IScheduler> GetSchedulerAsync()
            {
                if (_scheduler != null)
                    return this._scheduler;
                else
                {
                    //从Factory中获取Scheduler实例
                    NameValueCollection collection = new NameValueCollection
                    {
                        {"quartz.serializer.type","binary" }
                    };
                    StdSchedulerFactory factory = new StdSchedulerFactory(collection);
                    return _scheduler = factory.GetScheduler();
                }
            }
    
    
            /// <summary>
            /// 开启任务调度
            /// </summary>
            /// <returns></returns>
            public async Task<MessageModel<string>> StartScheduleAsync()
            {
                var result = new MessageModel<string>();
                try
                {
                    this._scheduler.Result.JobFactory = this._iocjobFactory;
                    if (!this._scheduler.Result.IsStarted)
                    {
                        //等待任务运行完成
                        await this._scheduler.Result.Start();
                        await Console.Out.WriteLineAsync("任务调度开启!");
                        result.success = true;
                        result.msg = $"任务调度开启成功";
                        return result;
                    }
                    else
                    {
                        result.success = false;
                        result.msg = $"任务调度已经开启";
                        return result;
                    }
                }
                catch (Exception ex)
                {
                    throw ex;
                }
                throw new NotImplementedException();
            }
    
            /// <summary>
            /// 停止任务调度
            /// </summary>
            /// <returns></returns>
            public async Task<MessageModel<string>> StopScheduleAsync()
            {
                var result = new MessageModel<string>();
                try
                {
                    if (!this._scheduler.Result.IsShutdown)
                    {
                        //等待任务运行完成
                        await this._scheduler.Result.Shutdown();
                        await Console.Out.WriteLineAsync("任务调度停止! ");
    
                        result.success = true;
                        result.msg = $"任务调度停止成功";
                        return result;
                    }
                    else
                    {
                        result.success = false;
                        result.msg = $"任务调度已经停止";
                        return result;
                    }
                }
                catch (Exception ex)
                {
                    throw ex;
                }
                throw new NotImplementedException();
            }
    
    
    
    
            /// <summary>
            /// 添加一个计划任务(映射程序集指定IJob实现类)
            /// </summary>
            /// <param name="sysSchedule"></param>
            /// <returns></returns>
            public async Task<MessageModel<string>> AddScheduleJobAsync(TasksQz sysSchedule)
            {
                var result = new MessageModel<string>();
                if (sysSchedule != null)
                {
                    try
                    {
                        JobKey jobKey = new JobKey(sysSchedule.Id.ToString());
                        if (await _scheduler.Result.CheckExists(jobKey))
                        {
                            result.success = false;
                            result.msg = $"该任务计划已经在执行:【{sysSchedule.Name}】";
                            return result;
                        }
                        #region 设置开始时间和结束时间
                        if (sysSchedule.BeginTime == null)
                        {
                            sysSchedule.BeginTime = DateTime.Now;
                        }
                        DateTimeOffset startRunTime = DateBuilder.NextGivenSecondDate(sysSchedule.BeginTime, 1);//设置开始时间
                        if (sysSchedule.EndTime == null)
                        {
                            sysSchedule.EndTime = DateTime.MaxValue.AddDays(-1);
                        }
                        DateTimeOffset endRunTime = DateBuilder.NextGivenSecondDate(sysSchedule.EndTime, 1);//设置暂停时间
    
                        #endregion
    
                        #region 通过反射获取程序集类型和类
    
                        Assembly assembly = Assembly.Load(sysSchedule.AssemblyName);
                        Type jobType = assembly.GetType(sysSchedule.AssemblyName + "." + sysSchedule.ClassName);
    
                        #endregion
    
                        //判断任务调度是否开启
                        if (!_scheduler.Result.IsStarted)
                        {
                            await StartScheduleAsync();
                        }
    
                        //传入反射出来的执行程序集
                        IJobDetail job = new JobDetailImpl(sysSchedule.Id.ToString(), sysSchedule.JobGroup, jobType);
                        job.JobDataMap.Add("JobParam", sysSchedule.JobParams);
                        ITrigger trigger;
                        #region 泛型传递
                        //IJobDetail job = JobBuilder.Create<T>()
                        //    .WithIdentity(sysSchedule.Name, sysSchedule.JobGroup)
                        //    .Build();
                        #endregion
                        if (sysSchedule.Cron != null && CronExpression.IsValidExpression(sysSchedule.Cron) && sysSchedule.TriggerType > 0)
                        {
                            trigger = CreateCronTrigger(sysSchedule);
                        }
                        else
                        {
                            trigger = CreateSimpleTrigger(sysSchedule);
                        }
    
                        //告诉Quartz使用我们的触发器来安排作业
                        await _scheduler.Result.ScheduleJob(job, trigger);
    
                        //await Task.Delay(TimeSpan.FromSeconds(120));
                        //await Console.Out.WriteLineAsync("关闭了调度器!");
                        //await _scheduler.Result.Shutdown();
    
                        result.success = true;
                        result.msg = $"启动任务: 【{sysSchedule.Name}】成功";
                        return result;
    
                    }
                    catch (Exception ex)
                    {
                        result.success = false;
                        result.msg = $"任务计划异常:【{ex.Message}】";
                        return result;
                    }
                }
                else
                {
                    result.success = false;
                    result.msg = $"任务计划不存在:【{sysSchedule?.Name}】";
                    return result;
                }
            }
    
            /// <summary>
            /// 暂停一个指定的计划任务
            /// </summary>
            /// <param name="sysSchedule"></param>
            /// <returns></returns>
            public async Task<MessageModel<string>> StopScheduleJobAsync(TasksQz sysSchedule)
            {
                var result = new MessageModel<string>();
                try
                {
                    JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
                    if (!await _scheduler.Result.CheckExists(jobKey))
                    {
                        result.success = false;
                        result.msg = $"未找到要暂停的任务:【{sysSchedule.Name}】";
                        return result;
                    }
                    else
                    {
                        await this._scheduler.Result.PauseJob(jobKey);
                        result.success = true;
                        result.msg = $"暂停任务:【{sysSchedule.Name}】成功";
                        return result;
                    }
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
    
            /// <summary>
            /// 恢复指定的计划任务
            /// </summary>
            /// <param name="sysSchedule"></param>
            /// <returns></returns>
            public async Task<MessageModel<string>> ResumeJob(TasksQz sysSchedule)
            {
                var result = new MessageModel<string>();
                try
                {
                    JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
                    if (!await _scheduler.Result.CheckExists(jobKey))
                    {
                        result.success = false;
                        result.msg = $"未找到要重新运行的任务,【{sysSchedule.Name}】";
                        return result;
                    }
                    //await this._scheduler.Result.ResumeJob(jobKey);
    
                    ITrigger trigger;
                    if (sysSchedule.Cron != null && CronExpression.IsValidExpression(sysSchedule.Cron) && sysSchedule.TriggerType > 0)
                    {
                        trigger = CreateCronTrigger(sysSchedule);
                    }
                    else
                    {
                        trigger = CreateSimpleTrigger(sysSchedule);
                    }
                    TriggerKey triggerKey = new TriggerKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup); ;
                    await _scheduler.Result.RescheduleJob(triggerKey, trigger);
    
                    result.success = true;
                    result.msg = $"恢复计划任务: 【{sysSchedule.Name}】 成功";
                    return result;
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
    
            #region 创建触发器帮助方法
    
            private ITrigger CreateSimpleTrigger(TasksQz sysSchedule)
            {
                if (sysSchedule.RunTimes > 0)
                {
                    ITrigger trigger = TriggerBuilder.Create()
                        .WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
                        .StartAt(sysSchedule.BeginTime.Value)
                        .EndAt(sysSchedule.EndTime.Value)
                        .WithSimpleSchedule(x =>
                        x.WithIntervalInSeconds(sysSchedule.IntervalSecond)
                        .WithRepeatCount(sysSchedule.RunTimes)).ForJob(sysSchedule.Id.ToString(), sysSchedule.JobGroup).Build();
                    return trigger;
                }
                else
                {
                    ITrigger trigger = TriggerBuilder.Create()
                        .WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
                        .StartAt(sysSchedule.BeginTime.Value)
                        .EndAt(sysSchedule.EndTime.Value)
                        .WithSimpleSchedule(x =>
                        x.WithIntervalInSeconds(sysSchedule.IntervalSecond)
                        .RepeatForever()).ForJob(sysSchedule.Id.ToString(), sysSchedule.JobGroup).Build();
                    return trigger;
                }
                //触发作业立即运行,然后每10秒重复一次,无限循环
            }
    
    
            /// <summary>
            /// 创建类型Cron的触发器
            /// </summary>
            /// <param name="sysSchedule"></param>
            /// <returns></returns>
            private ITrigger CreateCronTrigger(TasksQz sysSchedule)
            {
                //作业触发器
                return TriggerBuilder.Create()
                    .WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
                    .StartAt(sysSchedule.BeginTime.Value)
                    .EndAt(sysSchedule.EndTime.Value)
                    .WithCronSchedule(sysSchedule.Cron)//指定cron表达式
                    .ForJob(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
                    .Build();
            }
            #endregion
        }
    
    
    2.3 Job工厂类 JobFactory 实现:
    public class JobFactory : IJobFactory
    {
            private readonly IServiceProvider _serviceProvider;  
    
            public JobFactory(IServiceProvider serviceProvider)
            {
                _serviceProvider = serviceProvider;
            }
    
    
    
            /// <summary>
            /// 实现接口Job
            /// </summary>
            /// <param name="bundle"></param>
            /// <param name="scheduler"></param>
            /// <returns></returns>
            public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
            {
                try
                {
    
                    var serviceScope = _serviceProvider.CreateScope();
    
                    var job = serviceScope.ServiceProvider.GetService(bundle.JobDetail.JobType) as IJob;
    
                    return job;
    
                }
                catch(Exception ex)
                {
                    throw ex;
                }
    
            }
    
    
            public void ReturnJob(IJob job)
            {
    
                var disposable = job as IDisposable;
    
                if(disposable != null)
                {
                    disposable.Dispose();
                }
    
            }
        }
    
    
    2.4 单个任务调度的示例 :
    • Job_Users_Quartz 这里是要执行的服务和方法 比如每隔一段时间对平台用户的总数量进行统计
        public class Job_Users_Quartz : JobBase, IJob
        {
    
            private readonly ITasksQzServices _tasksQzServices;
    
            private readonly IUserServices _userServices;
    
            private readonly ILogger<Job_Users_Quartz> _logger;
    
            public Job_Users_Quartz(ITasksQzServices tasksQzServices, IUserServices userServices, ILogger<Job_Users_Quartz> logger)
            {
    
                _tasksQzServices = tasksQzServices;
                _userServices = userServices;
                _logger = logger;
    
            }
    
    
    
            public async Task Execute(IJobExecutionContext context)
            {
    
                //var param = context.MergedJobDataMap;
                // 可以直接获取 JobDetail 的值
                var jobKey = context.JobDetail.Key;
                var jobId = jobKey.Name;
    
                var executeLog = await ExecuteJob(context, async () => await Run(context, jobId.ObjToInt()));
    
                //通过数据库配置,获取传递过来的参数
                JobDataMap data = context.JobDetail.JobDataMap;
          
            }
            
            public async Task Run(IJobExecutionContext context, int jobid)
            {
    
                var count = await _userServices.QueryCount(x=>x.ID > 0);
                if (jobid > 0)
                {
    
                    var model = await _tasksQzServices.QueryById(jobid);
                    if (model != null)
                    {
    
                        model.RunTimes += 1;
    
                        var separator = "<br>";
    
                        string remark = $"【{DateTime.Now}】执行任务【Id:{context.JobDetail.Key.Name},组别:context.JobDetail.Key.Group}】【执行成功】{separator}";
    
                        model.Remark = remark + string.Join(separator, StringHelper.GetTopDataBySeparator(model.Remark, separator, 3));
    
                        //_logger.LogInformation(remark);
                        await _tasksQzServices.Update(model);
                    }
                }
                //_logger.LogInformation("用户总数量" + count.ToString());
                await Console.Out.WriteLineAsync("用户总数量" + count.ToString());
            }
        }
    
    2.5 Quartz 启动服务中间件
        /// <summary>
        /// Quartz 启动服务
        /// </summary>
        public static class QuartzJobMildd
        {
    
            public static void UseQuartzJobMildd(this IApplicationBuilder app, ITasksQzServices tasksQzServices, ISchedulerCenter schedulerCenter)
            {
    
                if (app == null) throw new 
    ArgumentNullException(nameof(QuartzJobMildd));
    
    
                try
                {
    
                    if (Appsettings.app("Middleware", "QuartzNetJob", "Enabled").ObjToBool())
                    {
    
                        var allQzServices = tasksQzServices.Query().Result;
    
                        foreach (var item in allQzServices)
                        {
    
                            if (item.IsStart)
                            {
    
                                var resuleModel = schedulerCenter.AddScheduleJobAsync(item).Result;
    
                                if (resuleModel.success)
                                {
                                    Console.WriteLine($"QuartzNetJob{item.Name}启动成功! ");
                                }
                                else
                                {
                                    Console.WriteLine($"QuartzNetJob{item.Name}启动失败! 错误信息{resuleModel.msg}");
                                }
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
    
                    Console.WriteLine($"An error was reported when starting the job service.\n{ex.Message}");
                    throw;
                }
            }
        }
    
    
    2.6 任务调度启动服务 JobSetup
        /// <summary>
        /// 任务调度 启动服务
        /// </summary>
        public static class JobSetup
        {
    
            public static void AddJobSetup(this IServiceCollection services)
            {
    
                if (services == null) throw new ArgumentNullException(nameof(services));
    
                //services.AddHostedService<Job1TimedService>();
                //services.AddHostedService<Job2TimedService>();
    
                services.AddSingleton<IJobFactory, JobFactory>();
                services.AddTransient<Job_Users_Quartz>();//Job使用瞬时依赖注入
                services.AddSingleton<ISchedulerCenter, SchedulerCenterServer>();
    
            }
    
        }
    
    
    
    2.7 Quartz 中间件和服务在Startup的的启用
     public void ConfigureServices(IServiceCollection services)
    {
     services.AddJobSetup();
    }
    
    
     
      public void Configure(IApplicationBuilder app, IWebHostEnvironment env, 
    ITasksQzServices tasksQzServices, ISchedulerCenter schedulerCenter)
     {
          
     //// 开启QuartzNetJob调度服务
    app.UseQuartzJobMildd(tasksQzServices, schedulerCenter);
    
    }
    
    
    2.8 Quartz 的 TasksQz 类定义
        /// <summary>
        /// 任务计划表
        /// </summary>
    
        public class TasksQz 
        {
             /// <summary>
            /// ID
            /// </summary>
            [SugarColumn(IsNullable = false,IsPrimaryKey = true,IsIdentity = true)]
            public int Id { get; set; }
    
            /// <summary>
            /// 任务名称
            /// </summary>
            [SugarColumn(ColumnDataType = "nvarchar",Length = 200,IsNullable = true)]
            public string Name { get; set;}
    
            /// <summary>
            /// 任务分组
            /// </summary>
            [SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
            public string JobGroup { get; set; }
    
            /// <summary>
            /// 任务运行时间表达式
            /// </summary>
            [SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
            public string Cron { get; set; }
    
            /// <summary>
            /// 任务所在DLL对应的程序集名称
            /// </summary>
            [SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
            public string AssemblyName { get; set; }
    
            /// <summary>
            /// 任务所在类
            /// </summary>
            [SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
            public string ClassName { get; set; }
    
            /// <summary>
            /// 任务描述
            /// </summary>
            [SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
            public string Remark { get; set; }  
    
    
            /// <summary>
            /// 执行次数
            /// </summary>
            public int RunTimes { get; set; }
    
            /// <summary>
            /// 开始时间
            /// </summary>
            public DateTime? BeginTime { get; set; }
    
            /// <summary>
            /// 结束时间
            /// </summary>
            public DateTime? EndTime { get; set; }
    
            /// <summary>
            ///  触发器类型(0 simple  1 cron)
            /// </summary>
            public int TriggerType { get; set; }
    
    
    
            /// <summary>
            /// 执行间隔时间,秒为单位
            /// </summary>
            public int IntervalSecond { get; set; }
    
            /// <summary>
            /// 是否启动
            /// </summary>
            public bool IsStart { get; set; }
    
    
            /// <summary>
            /// 执行传参
            /// </summary>
            public string JobParams { get; set; }
    
    
            /// <summary>
            /// 是否删除 1 删除 
            /// </summary>
            [SugarColumn(IsNullable = true)]
            public bool? IsDeleted { get; set; }
    
            /// <summary>
            /// 创建时间
            /// </summary>
            [SugarColumn(IsNullable = true)]
            public DateTime CreateTime { get; set; } = DateTime.Now;
    
        }
    

    参考源码:
    https://github.com/anjoy8/Blog.Core

    Quartz管理工具:
    https://github.com/guryanovev/crystalquartz

  • 相关阅读:
    8.使用axios实现登录功能
    7.django配置跨域并开发测试接口
    9.Vue组件
    2.初始化项目结构
    1.Django基础
    团队冲刺——第七天
    团队冲刺——第六天
    十天冲刺——第五天
    十天冲刺——第四天
    十天冲刺——第三天
  • 原文地址:https://www.cnblogs.com/jerque/p/15757761.html
Copyright © 2011-2022 走看看