zoukankan      html  css  js  c++  java
  • 8、quartz.net 支持多点部署的job服务

    github地址

    https://github.com/quartznet/quartznet

    1、概念、作用

    quartz是一个job工具,什么是job,可以理解成windows的计划任务。也可以理解成数据库的作业

    为什么要用,因为需要用,为什么不用数据库作业去跑,请放过数据库吧

    2、安装

    因为我是集成在程序里的,所以直接nuget安装库就就可以了

    Quartz

    Quartz.Serialization.Json(这个必须要,不用问为啥)

    3、思路

    .net core 基本就是ioc的理念,所以接下来用依赖注入的思路来实现一个小功能,每10秒,输出当前时间到日志内,简约而不简单,因为基本上定时任务就是干这个的

    4、开始

    4.1 数据库脚本执行

    quartz依赖数据库做持久化,支持很多种数据库,可以在下面的网址查看

    https://github.com/quartznet/quartznet/tree/master/database/tables

    弄下来直接执行就行了

    mysql数据库安装配置前面说过了

    https://www.cnblogs.com/ares-core/p/12956219.html

    4.2 配置文件

    我们在apollo里,增加一个namespace,然后加入到程序里

    Apollo 安装配置请看

    https://www.cnblogs.com/ares-core/p/12964701.html

    https://www.cnblogs.com/ares-core/p/12975477.html

    配置信息如下

    {
    	"Quartz": {
    	  "Enable": true,
    	  "JobStoreType": "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
    	  "JobStoreTablePrefix": "QRTZ_",
    	  "DriverDelegateType": "Quartz.Impl.AdoJobStore.MySQLDelegate,Quartz",
    	  "JobStoreDataSource": "myDS",
    	  "DataSourceProvider": "MySql",
    	  "ConnectionString": "Server=192.168.137.220; Port=3306; Database=quartznet; Uid=root; Pwd=123456; persistsecurityinfo=True; CharSet=utf8; SslMode=none;",
    	  "Tasks": [{
                  "JobName": "GameApiTest",
                  "JobGroup": "GameApi",
                  "ScheduleTaskType": "GameApi.Web.ScheduleTask.ITest",
                  "Type": 3,
                  "Cron": "0/10 * * * * ?",
                  "Data": "",
                  "Description": "测试"
                }]
    	}
    }
    

     先别管干什么的,一会儿写代码的时候会挨个说

    4.3 创建启动服务

    干啥用?就是程序启动的时候,根据上面的配置文件,创建任务,当然要判断一下,有没有这个任务,有的话就跳过添加,没有的话就添加

    怎么实现?继承 IHostedService 接口即可,IHostedService是啥?去问msdn

    首先我们创建一个job全局类,原因后面说

    using System;
    using Quartz;
    namespace GameApi.Quartz4Net
    {
        public static class JobApplicationContext
        {
            public static IScheduler Scheduler { get; set; }
        }
    }
    

    然后是两个entity类,用来反序列化Apollo里的参数

    using System.Collections.Generic;
    
    namespace GameApi.Quartz4Net
    {
        public class ScheduleTaskParameter
        {
            public int Type { get; set; }
            public string ScheduleTaskType { get; set; }
            public int DelaySeconds { get; set; }
            public int RepeatCount { get; set; }
            public string Data { get; set; }
            public string Cron { get; set; } = string.Empty;
            public string JobName { get; set; } = string.Empty;
            public string JobGroup { get; set; } = string.Empty;
            public string Description { get; set; } = string.Empty;
            public string TriggerName { get; set; } = string.Empty;
            public string TriggerGroup { get; set; } = string.Empty;
        }
    
        public class QuartzOptions
        {
            public bool Enable { get; set; }
            public string JobStoreType { get; set; }
            public string JobStoreTablePrefix { get; set; }
            public string DriverDelegateType { get; set; }
            public string JobStoreDataSource { get; set; }
            public string DataSourceProvider { get; set; }
            public string ConnectionString { get; set; }
            public List<ScheduleTaskParameter> Tasks { get; set; }
        }
    }
    

      

    我们新建一个类 ScheduleTaskHostedService 继承IHostedService 接口 ,实现在程序启动的时候,去初始化任务

    代码

    using GameApi.Quartz4Net.Internals;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Options;
    using Quartz;
    using Quartz.Impl;
    using System;
    using System.Collections.Specialized;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace GameApi.Quartz4Net
    {
        public class ScheduleTaskHostedService : IHostedService
        {
            // 获取注入的参数
            private readonly QuartzOptions _options;
            // 获取注入的serviceProvider
            private readonly IServiceProvider _serviceProvider;
            // 构造函数
            public ScheduleTaskHostedService(IOptions<QuartzOptions> options,IServiceProvider serviceProvider)
            {
                this._options = options.Value;
                this._serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
            }
            // 实现 IHostedService 接口, 在系统启动时会执行
            public async Task StartAsync(CancellationToken cancellationToken)
            {
                // 验证apollo内的enable参数是否开启
                if (!this._options.Enable) return;
                // 初始化一些属性,用来创建任务对象(单例模式)
                var properties = new NameValueCollection
                {
                    // 去Apollo看配置 
                    ["quartz.jobStore.type"] = this._options.JobStoreType,
                    // 去Apollo看配置 
                    ["quartz.jobStore.tablePrefix"] = this._options.JobStoreTablePrefix,
                    // 去Apollo看配置 
                    ["quartz.jobStore.driverDelegateType"] = this._options.DriverDelegateType,
                    // 固定的myDS 
                    ["quartz.jobStore.dataSource"] = this._options.JobStoreDataSource,
                    // 数据库连接字符串
                    ["quartz.dataSource.myDS.connectionString"] = this._options.ConnectionString,
                    // 数据库类型
                    ["quartz.dataSource.myDS.provider"] = this._options.DataSourceProvider,
                    // 序列化方式 有json 和 binary 如果是json 的话,必须nuget引用Quartz.Serialization.Json
                    ["quartz.serializer.type"] = "json",
                    // 是否多点部署 
                    ["quartz.jobStore.clustered"] = "true",
                    ["quartz.scheduler.instanceId"] = "AUTO"
                };
                // 初始化一个schedule对象 因为是单例模式的,所以要用一个全局的静态类来存一下
                JobApplicationContext.Scheduler = await new StdSchedulerFactory(properties).GetScheduler();
                // 设置1s的延时启动 没啥特别的,就是记录一下它可以延时启动
                await JobApplicationContext.Scheduler.StartDelayed(TimeSpan.FromSeconds(1d), cancellationToken);
    
                // 通过生命周期获取 scheduleTaskProvider
                using (var scope = this._serviceProvider.CreateScope())
                {
                    //var scheduleTaskProvider = scope.ServiceProvider.GetRequiredService<IScheduleTaskProvider>();
    
                    // 循环获取apollo中的任务列表
                    foreach (var parameter in this._options.Tasks)
                    {
                        // 获取到任务的Key
                        var jobKey = JobKey.Create(parameter.JobName, parameter.JobGroup);
                        // 验证任务是否已经创建
                        var isExist = await JobApplicationContext.Scheduler.CheckExists(jobKey, cancellationToken);
                        if (isExist) continue;
                        // 没有创建的进行创建
                        var jobName = parameter.JobName;
                        var jobGroup = parameter.JobGroup;
                        // 创建任务明细
                        var jobBuilder = JobBuilder
                            // 这个InnerJob类 是最终执行业务代码的入口类,代码下面给出
                            .Create<InnerJob>()
                            .WithIdentity(jobName, jobGroup)
                            .UsingJobData("type", parameter.Type)
                            .UsingJobData("schedule_task_type", parameter.ScheduleTaskType)
                            .UsingJobData("data", parameter.Data)
                            .UsingJobData("delay_seconds", parameter.DelaySeconds)
                            .UsingJobData("cron", parameter.Cron)
                            .UsingJobData("repeatCount", parameter.RepeatCount)
                            .UsingJobData("repeat_count", parameter.RepeatCount);
                        jobBuilder.WithDescription(parameter.Description ?? string.Empty);
                        var jobDetail = jobBuilder.Build();
    
                        // 创建触发器
                        // cron 表达式 https://cron.qqe2.com/
                        var triggerBuilder = TriggerBuilder.Create().WithIdentity(jobName, jobGroup);
                        switch (parameter.Type)
                        {
                            case 3: // 基于 cron 表达式的周期性任务
                                if (string.IsNullOrWhiteSpace(parameter.Cron))
                                {
                                    throw new ArgumentException("计划任务 cron 表达式不能为空");
                                }
    
                                triggerBuilder = triggerBuilder.WithCronSchedule(parameter.Cron, builder =>
                                {
                                    builder.InTimeZone(TimeZoneInfo.Local);
                                });
                                break;
                            default:
                                throw new ArgumentException("未知任务类型 Type ");
                        }
                        var trigger = triggerBuilder.ForJob(jobDetail.Key).Build();
    
                        // 创建任务
                        await JobApplicationContext.Scheduler.ScheduleJob(jobDetail, trigger);
                         
                    }
                }
            }
    
            public async Task StopAsync(CancellationToken cancellationToken)
            {
                if (!this._options.Enable) return;
    
                await JobApplicationContext.Scheduler.Shutdown(cancellationToken);
            }
        }
    }
    

     

    定义一个接口 用来反射实现执行任务,凡是继承了这个接口的,并且接口名称与配置文件相匹配的,就执行

    using System.Threading.Tasks;
    
    namespace GameApi.Quartz4Net
    {
        public interface IScheduleTask
        {
            // 执行计划任务。
            Task RunAsync(ScheduleTaskContext context);
        }
    }
    

      

    下面是InnerJob类,他继承了IJob,通俗的来说,就是触发器的实现,下面用反射的思路来实现

    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;
    using Quartz;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reflection;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace GameApi.Quartz4Net.Internals
    {
        internal class InnerJob : IJob
        {
            public InnerJob()
            {
    
            }
            public async Task Execute(IJobExecutionContext context)
            {
                using (var childScope = JobApplicationContext.IoC.CreateScope())
                {
                    var logger = childScope.ServiceProvider.GetRequiredService<ILoggerFactory>().CreateLogger<InnerJob>();
                    var scheduleTaskType = context.JobDetail.JobDataMap.GetString("schedule_task_type");
                    var data = context.JobDetail.JobDataMap.GetString("data");
                    var delaySeconds = context.JobDetail.JobDataMap.GetInt("delay_seconds");
    
                    foreach (var kv in context.JobDetail.JobDataMap)
                    {
                        logger.LogInformation("计划任务参数:{0} = {1}", kv.Key, kv.Value);
                    }
    
                    var type = AssemblyHelper.GetType(scheduleTaskType);
                    if (type == null)
                    {
                        logger.LogWarning("未找到指定的任务类型: {0}", scheduleTaskType);
                        return;
                    }
                    var instances = childScope.ServiceProvider.GetServices(type);
                    if (instances == null || !instances.Any()) return;
    
                    foreach (var obj in instances)
                    {
                        var instance = obj as IScheduleTask;
                        if (instance == null)
                        {
                            logger.LogWarning("任务 {0} 必须继承 {1} 接口", scheduleTaskType, nameof(IScheduleTask));
                            continue;
                        }
                        try
                        {
                            var scheduleTaskContext = new ScheduleTaskContext
                            {
                                Data = data,
                                TaskId = context.JobDetail.Key.Name,
                                TaskGroup = context.JobDetail.Key.Group
                            };
                            await instance.RunAsync(scheduleTaskContext);
                        }
                        catch (Exception e)
                        {
                            logger.LogError(e, "调用计划任务异常。");
                        }
                    }
                }
            }
        }
    
        public static class AssemblyHelper
        {
            private static List<Assembly> _applicationAssemblies { get; } =
                AppDomain.CurrentDomain.GetAssemblies()
                .Where(asm => asm.FullName.StartsWith("GameApi.", StringComparison.OrdinalIgnoreCase)).ToList();
    
            /// <summary>
            /// 通过指定的类型名称获取一个类型 <see cref="Type"/> 。
            /// </summary>
            /// <param name="typeFullName"></param>
            /// <returns></returns>
            public static Type GetType(string typeFullName)
            {
                var items = typeFullName.Split(new[] { '.' }, StringSplitOptions.RemoveEmptyEntries);
                var builder = new StringBuilder(typeFullName.Length);
                var index = items.Length;
                Assembly assembly = null;
                while (index > 0)
                {
                    builder.Clear();
                    if (index < 0) return null;
                    for (var i = 0; i < index; i++)
                    {
                        builder.Append(items[i]).Append('.');
                    }
                    builder.Remove(builder.Length - 1, 1);
                    --index;
                    assembly = _applicationAssemblies.FirstOrDefault(_ => _.FullName.StartsWith(builder.ToString(), StringComparison.OrdinalIgnoreCase));
                    if (assembly != null)
                    {
                        break;
                    }
                }
                var type = assembly.GetType(typeFullName, false, true);
                return type;
            }
        }
    }
    

    然后我们把  ScheduleTaskHostedService  这个类,注入到容器内,为了可扩展性,这种东西我们一般都写一个扩展类来实现,我们在新建一个 ServiceCollectionExtensions 类

    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    
    namespace GameApi.Quartz4Net
    {
        public static class ServiceCollectionExtensions
        {
            // 扩展类 添加 Quartz.NET 组件支持。
            public static IServiceCollection AddQuartz(this IServiceCollection services, IConfiguration configuration)
            {
                // 单例注入
                services.AddSingleton<IHostedService, ScheduleTaskHostedService>();
                // 配置
                services.Configure<QuartzOptions>(configuration.GetSection("Quartz"));
                return services;
            }
        }
    }
    

     

    然后在startup里,add一下

     services.AddQuartz(this.Configuration);

    至此,服务端的实现已经完毕,接下来就是怎么用了

    我们回头来解读一下apollo的配置文件,只说重要的部分

    DataSourceProvider 和 ConnectionString 是数据库相关的配置

    Tasks 这个就是一个job集合,可以是多个,下面是task子节点的说明

    JobName 名称,需要全局唯一,不可为空

    JobGroup 分组,不能为空

    ScheduleTaskType 任务类型,这个下面在定义客户端的时候会用到,因为是用反射执行代码,所以这个一定不可以错,错i的话任务创建成功了但是不会执行

    Type 任务执行的方式 目前只实现了基于Cron表达式的方式,其他的可以自己去查文档

    Cron 周期表达式 参考  https://cron.qqe2.com/

    Data 自定义参数

    Description 任务描述(可以用来看日志)

    4.4 执行任务

    上面已经把任务定义好了,那么怎么实现呢,在4..3结尾的地方说到了用反射的方式去执行任务,下面给出具体实现,其实就是实现上面定义的IScheduleTask接口

    先定义一个接口,继承 IScheduleTask 接口(面向对象已经过时了,要面向接口)

    using GameApi.Quartz4Net;
    
    namespace GameApi.Web.ScheduleTask
    {
        public interface ITest : IScheduleTask
        {
        }
    }

    注意,上面Apollo里面的ScheduleTaskType参数,是

    GameApi.Web.ScheduleTask.ITest

    这个接口的名字和namespace,要跟上面的配置文件对应上

    然后实现这个test接口

    using GameApi.Quartz4Net;
    using Microsoft.Extensions.Logging;
    using System;
    using System.Threading.Tasks;
    
    namespace GameApi.Web.ScheduleTask
    {
        public class Test : ITest
        {
            private readonly ILogger<Test> _logger;
            public Test(ILogger<Test> logger)
            {
                this._logger = logger;
            }
            public Task RunAsync(ScheduleTaskContext context)
            {
                this._logger.LogInformation($"当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
    
                return Task.CompletedTask;
            }
        }
    }
    

    同样,注入到service里面

    services.AddTransient<ITest, Test>();

    F5 运行 ,结果如下

    接下来多点部署,看会不会起冲突,其实就是试试

     ["quartz.jobStore.clustered"] = "true",

    这个参数好不好使

    起了8801,8802两个端口,结果如下

     结果证明,多点部署后,只会打到一个节点上,不会有多点消费的情况

  • 相关阅读:
    <转>使用IdentifyTask查询图层属性
    转:Java+blazeds+Flex的例子 .
    转 ArcGIS Runtime 加载SHAPE数据的另一种方式动态图层 .
    序列密码之A5
    哈希函数之MD5
    DjangoRestFramework使用总结
    公钥密码之RSA
    Request Line is too large (xxxx > 4094) 问题处理
    古典密码之仿射密码
    Linux重定向
  • 原文地址:https://www.cnblogs.com/ares-core/p/12989194.html
Copyright © 2011-2022 走看看