正如标题所示,文章主要是围绕Quartz.Net作业调度框架话题展开的,内容出自博主学习官方Examples的学习心得与体会,文中难免会有错误之处,还请不吝指正。
一:触发的优先级
给一个作业添加多个触发器之后,如果其中几个触发器恰好在同一个时间点触发,那么会先执行哪个触发器呢?
先触发谁由触发器自己说了算:通过设置触发器的Priority(优先级)来解决先触发谁的问题。
可以通过以下代码设置触发的优先级(仅供参考):
// Sample only: a simple trigger with an explicitly assigned priority.
// `startTime` and `job` are expected to be defined by the surrounding code.
ITrigger trigger1 = TriggerBuilder.Create()
    .WithIdentity("Priority1Trigger5SecondRepeat")
    .StartAt(startTime)
    // Simple schedule: repeat count 1, 5-second interval.
    .WithSimpleSchedule(schedule => schedule.WithRepeatCount(1).WithIntervalInSeconds(5))
    // The larger the number, the higher the firing priority.
    .WithPriority(1)
    .ForJob(job)
    .Build();
设置的数值越大,相应的触发优先级就越高。
二:服务端与客户端
Quartz.Net可以在客户端定义轮询作业,在服务端进行调度。
首先定义服务端,如下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Quartz.Impl;
using Common.Logging;
using System.Collections.Specialized;

namespace Quartz.Examples
{
    /// <summary>
    /// Server half of the remoting example: starts a scheduler that is exported
    /// over a remoting channel so that clients can schedule jobs on it.
    /// </summary>
    /// <author>Bill Kratzer</author>
    /// <author>Marko Lahma (.NET)</author>
    public class RemoteServerExample : IExample
    {
        /// <summary>
        /// Name of this example (the runtime type name).
        /// </summary>
        public string Name
        {
            get { return GetType().Name; }
        }

        /// <summary>
        /// This example will start a server that will allow clients to remotely schedule jobs.
        /// The server schedules nothing itself; it starts the scheduler, waits five minutes
        /// and then shuts the scheduler down.
        /// </summary>
        /// <author>James House, Bill Kratzer</author>
        public virtual void Run()
        {
            ILog log = LogManager.GetLogger(typeof(RemoteServerExample));

            NameValueCollection properties = new NameValueCollection();
            properties["quartz.scheduler.instanceName"] = "RemoteServer";

            // set thread pool info
            properties["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz";
            properties["quartz.threadPool.threadCount"] = "5";
            properties["quartz.threadPool.threadPriority"] = "Normal";

            // set remoting exporter: port, bind name and channel type are what
            // the client has to use in its proxy address
            properties["quartz.scheduler.exporter.type"] = "Quartz.Simpl.RemotingSchedulerExporter, Quartz";
            properties["quartz.scheduler.exporter.port"] = "555";
            properties["quartz.scheduler.exporter.bindName"] = "QuartzScheduler";
            properties["quartz.scheduler.exporter.channelType"] = "tcp";
            properties["quartz.scheduler.exporter.channelName"] = "httpQuartz";

            // reject non-local requests
            properties["quartz.scheduler.exporter.rejectRemoteRequests"] = "true";

            ISchedulerFactory sf = new StdSchedulerFactory(properties);
            IScheduler sched = sf.GetScheduler();

            log.Info("------- Initialization Complete -----------");
            log.Info("------- Not scheduling any Jobs - relying on a remote client to schedule jobs --");
            log.Info("------- Starting Scheduler ----------------");

            // start the schedule
            sched.Start();

            log.Info("------- Started Scheduler -----------------");
            log.Info("------- Waiting 5 minutes... ------------");

            // wait to give our jobs a chance to run
            try
            {
                Thread.Sleep(TimeSpan.FromMinutes(5));
            }
            catch (ThreadInterruptedException)
            {
                // Deliberately ignored: an interrupt just ends the wait early
                // and we proceed to shut down.
            }

            // shut down the scheduler
            log.Info("------- Shutting Down ---------------------");
            sched.Shutdown(true);
            log.Info("------- Shutdown Complete -----------------");

            SchedulerMetaData metaData = sched.GetMetaData();
            log.Info("Executed " + metaData.NumberOfJobsExecuted + " jobs.");
        }
    }
}
服务端通过绑定的名称(bindName)、端口(port)以及协议(channelType)对外暴露调度器,等待客户端的接入。
客户端:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Quartz.Impl;
using System.Collections.Specialized;
using Common.Logging;

namespace Quartz.Examples
{
    /// <summary>
    /// Client half of the remoting example: obtains a proxy to the scheduler
    /// exported by the server and schedules a job on it.
    ///
    /// The job is defined on the client and scheduled on the server:
    /// 1. start the server first so that it is listening;
    /// 2. then start the client, which sends the job to the server for scheduling.
    /// </summary>
    /// <author>James House</author>
    /// <author>Bill Kratzer</author>
    /// <author>Marko Lahma (.NET)</author>
    public class RemoteClientExample : IExample
    {
        /// <summary>
        /// Builds a job and a cron trigger and schedules them through the scheduler proxy.
        /// </summary>
        public virtual void Run()
        {
            ILog log = LogManager.GetLogger(typeof(RemoteClientExample));

            NameValueCollection properties = new NameValueCollection();
            properties["quartz.scheduler.instanceName"] = "RemoteClient";

            // set thread pool info
            properties["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz";
            properties["quartz.threadPool.threadCount"] = "5";
            properties["quartz.threadPool.threadPriority"] = "Normal";

            // set remoting proxy: the address must match the channel type, port
            // and bind name configured on the server
            properties["quartz.scheduler.proxy"] = "true";
            properties["quartz.scheduler.proxy.address"] = "tcp://127.0.0.1:555/QuartzScheduler";

            // First we must get a reference to a scheduler
            ISchedulerFactory sf = new StdSchedulerFactory(properties);
            IScheduler sched = sf.GetScheduler();

            // define the job and ask it to run
            IJobDetail job = JobBuilder.Create<SimpleJob>()
                .WithIdentity("remotelyAddedJob", "default")
                .Build();

            JobDataMap map = job.JobDataMap;
            map.Put("msg", "Your remotely added job has executed!");

            ITrigger trigger = TriggerBuilder.Create()
                .WithIdentity("remotelyAddedTrigger", "default")
                .ForJob(job.Key)
                .WithCronSchedule("/5 * * ? * *")
                .Build();

            // schedule the job
            sched.ScheduleJob(job, trigger);

            log.Info("Remote job scheduled.");
        }

        /// <summary>
        /// Name of this example (the runtime type name), matching
        /// <c>RemoteServerExample.Name</c> instead of returning null.
        /// </summary>
        public string Name
        {
            get { return GetType().Name; }
        }
    }
}
客户端按照Server配置信息中指定的协议、端口和绑定名称(即tcp://127.0.0.1:555/QuartzScheduler)连接到服务端,作业的实际调度在服务端的调度器实例上进行。
注意:先开启Server端,再启动客户端。
三:数据库支持
在调度作业很多之后,我们需要把这些调度数据管理起来,日积月累之后通过人工的方式明显不够明智,所以,由数据库来保存这些调度数据是更好的选择。
官方提供的各种数据库脚本下载地址:
https://github.com/quartznet/quartznet/tree/master/database/tables
在代码中的操作需要留意几个地方:
// Prefix of the database table names.
properties["quartz.jobStore.tablePrefix"] = "QRTZ_";

// Whether the job store is clustered.
properties["quartz.jobStore.clustered"] = "false";

// properties["quartz.jobStore.lockHandler.type"] = "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz";

// Select the SQL Server driver delegate.
properties["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz";

// The settings below all refer to the data source named "quartz";
// adjust the name to match your own database.
properties["quartz.jobStore.dataSource"] = "quartz"; // my database is called "quartz"; put your own name here

// Connection string of the database.
properties["quartz.dataSource.quartz.connectionString"] = "Server=(local);Database=quartz;Trusted_Connection=True;";

// Database provider.
properties["quartz.dataSource.quartz.provider"] = "SqlServer-20";

// Maximum number of database connections.
properties["quartz.dataSource.quartz.maxConnections"] = "5";
假如你的数据库名称叫做“YouSelfDb”,那么配置信息应该写成这样:
// The data source is named "YouSelfDb" here, so every
// "quartz.dataSource.<name>.*" key below uses that same name.
properties["quartz.jobStore.dataSource"] = "YouSelfDb";

// Connection string: the Database value is what actually selects the
// database, so it has to be "YouSelfDb" as well.
properties["quartz.dataSource.YouSelfDb.connectionString"] = "Server=(local);Database=YouSelfDb;Trusted_Connection=True;";

// Database provider.
properties["quartz.dataSource.YouSelfDb.provider"] = "SqlServer-20";

// Maximum number of database connections.
properties["quartz.dataSource.YouSelfDb.maxConnections"] = "5";
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Quartz.Impl;
using System.Collections.Specialized;
using Common.Logging;

namespace Quartz.Examples
{
    /// <summary>
    /// Schedules a handful of recoverable jobs on a scheduler whose job store is
    /// backed by a SQL Server database (AdoJobStore), so that the scheduling data
    /// is persisted in the QRTZ_ tables.
    /// </summary>
    public class ClusterExample : IExample
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(ClusterExample));

        /// <summary>
        /// Configures the database-backed scheduler, optionally clears and
        /// (re)schedules the sample jobs, then runs the scheduler for one hour.
        /// </summary>
        /// <param name="inClearJobs">When true, existing jobs/triggers are deleted first.</param>
        /// <param name="inScheduleJobs">When true, the five sample jobs are scheduled.</param>
        public virtual void Run(bool inClearJobs, bool inScheduleJobs)
        {
            NameValueCollection properties = new NameValueCollection();

            properties["quartz.scheduler.instanceName"] = "TestScheduler";
            properties["quartz.scheduler.instanceId"] = "instance_one";
            properties["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz";
            properties["quartz.threadPool.threadCount"] = "5";
            properties["quartz.threadPool.threadPriority"] = "Normal";
            properties["quartz.jobStore.misfireThreshold"] = "60000";
            properties["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz";
            properties["quartz.jobStore.useProperties"] = "false";
            properties["quartz.jobStore.tablePrefix"] = "QRTZ_";
            properties["quartz.jobStore.clustered"] = "false";
            // if running SQLite we need this
            // properties["quartz.jobStore.lockHandler.type"] = "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz";
            properties["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz";

            // The settings below all refer to the data source named "quartz";
            // adjust the name to match your own database.
            properties["quartz.jobStore.dataSource"] = "quartz";
            properties["quartz.dataSource.quartz.connectionString"] = "Server=(local);Database=quartz;Trusted_Connection=True;";
            properties["quartz.dataSource.quartz.provider"] = "SqlServer-20";
            properties["quartz.dataSource.quartz.maxConnections"] = "5"; // maximum number of connections

            // First we must get a reference to a scheduler
            ISchedulerFactory sf = new StdSchedulerFactory(properties);
            IScheduler sched = sf.GetScheduler();

            if (inClearJobs)
            {
                log.Warn("***** Deleting existing jobs/triggers *****");
                sched.Clear();
            }

            log.Info("------- Initialization Complete -----------");

            if (inScheduleJobs)
            {
                log.Info("------- Scheduling Jobs ------------------");

                string schedId = sched.SchedulerInstanceId;

                int count = 1;

                IJobDetail job = JobBuilder.Create<SimpleRecoveryJob>()
                    .WithIdentity("job_" + count, schedId) // put triggers in group named after the cluster node instance just to distinguish (in logging) what was scheduled from where
                    .RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
                    .Build();

                ISimpleTrigger trigger = (ISimpleTrigger)TriggerBuilder.Create()
                    .WithIdentity("triger_" + count, schedId)
                    .StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
                    .WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromSeconds(5)))
                    .Build();

                log.InfoFormat("{0} will run at: {1} and repeat: {2} times, every {3} seconds", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval.TotalSeconds);
                // Without this call the first job is only built and logged, never scheduled.
                sched.ScheduleJob(job, trigger);

                count++;

                job = JobBuilder.Create<SimpleRecoveryJob>()
                    .WithIdentity("job_" + count, schedId) // put triggers in group named after the cluster node instance just to distinguish (in logging) what was scheduled from where
                    .RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
                    .Build();

                trigger = (ISimpleTrigger)TriggerBuilder.Create()
                    .WithIdentity("triger_" + count, schedId)
                    .StartAt(DateBuilder.FutureDate(2, IntervalUnit.Second))
                    .WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromSeconds(5)))
                    .Build();

                log.Info(string.Format("{0} will run at: {1} and repeat: {2} times, every {3} seconds", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval.TotalSeconds));
                sched.ScheduleJob(job, trigger);

                count++;

                job = JobBuilder.Create<SimpleRecoveryStatefulJob>()
                    .WithIdentity("job_" + count, schedId) // put triggers in group named after the cluster node instance just to distinguish (in logging) what was scheduled from where
                    .RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
                    .Build();

                trigger = (ISimpleTrigger)TriggerBuilder.Create()
                    .WithIdentity("triger_" + count, schedId)
                    .StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
                    .WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromSeconds(3)))
                    .Build();

                log.Info(string.Format("{0} will run at: {1} and repeat: {2} times, every {3} seconds", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval.TotalSeconds));
                sched.ScheduleJob(job, trigger);

                count++;

                job = JobBuilder.Create<SimpleRecoveryJob>()
                    .WithIdentity("job_" + count, schedId) // put triggers in group named after the cluster node instance just to distinguish (in logging) what was scheduled from where
                    .RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
                    .Build();

                trigger = (ISimpleTrigger)TriggerBuilder.Create()
                    .WithIdentity("triger_" + count, schedId)
                    .StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
                    .WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromSeconds(4)))
                    .Build();

                log.Info(string.Format("{0} will run at: {1} & repeat: {2}/{3}", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval));
                sched.ScheduleJob(job, trigger);

                count++;

                job = JobBuilder.Create<SimpleRecoveryJob>()
                    .WithIdentity("job_" + count, schedId) // put triggers in group named after the cluster node instance just to distinguish (in logging) what was scheduled from where
                    .RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
                    .Build();

                trigger = (ISimpleTrigger)TriggerBuilder.Create()
                    .WithIdentity("triger_" + count, schedId)
                    .StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
                    .WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromMilliseconds(4500)))
                    .Build();

                log.Info(string.Format("{0} will run at: {1} & repeat: {2}/{3}", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval));
                sched.ScheduleJob(job, trigger);
            }

            // jobs don't start firing until start() has been called...
            log.Info("------- Starting Scheduler ---------------");
            sched.Start();
            log.Info("------- Started Scheduler ----------------");

            log.Info("------- Waiting for one hour... ----------");

            Thread.Sleep(TimeSpan.FromHours(1));

            log.Info("------- Shutting Down --------------------");
            sched.Shutdown();
            log.Info("------- Shutdown Complete ----------------");
        }

        /// <summary>
        /// Name of this example (the runtime type name), matching the other
        /// examples instead of throwing.
        /// </summary>
        public string Name
        {
            get { return GetType().Name; }
        }

        /// <summary>
        /// Runs the example with both "clear existing jobs" and "schedule jobs" enabled.
        /// </summary>
        public void Run()
        {
            bool clearJobs = true;
            bool scheduleJobs = true;
            /* TODO
            for (int i = 0; i < args.Length; i++)
            {
                if (args[i].ToUpper().Equals("clearJobs".ToUpper()))
                {
                    clearJobs = true;
                }
                else if (args[i].ToUpper().Equals("dontScheduleJobs".ToUpper()))
                {
                    scheduleJobs = false;
                }
            }
            */
            ClusterExample example = new ClusterExample();
            example.Run(clearJobs, scheduleJobs);
        }
    }
}
打开你的数据库YouSelfDb,这个库下面有相应的表存储相应的调度数据:比如
QRTZ_TRIGGERS表将会保存具体的作业调度信息,包括作业,触发器等信息。
QRTZ_JOB_DETAILS表保存作业数据。
QRTZ_SIMPLE_TRIGGERS表保存SimpleTrigger类的触发器数据。
QRTZ_CALENDARS表保存日历(Calendar)数据。
QRTZ_CRON_TRIGGERS表保存Cron表达式的trigger触发器数据。
等等。。。。。(不一一列举了,你们自己看吧)
四:集群Cluster
在我看来所谓的集群就是一堆机器一起工作。
一堆作业一起调度,但作业与作业之间不会相互干扰,一个作业的停止运行不会影响到另外一个作业的正常工作。
通过properties["quartz.jobStore.clustered"] = "true";启用集群。注意集群中每个节点的quartz.scheduler.instanceId必须互不相同。
而集群的代码例子和上面“数据库支持”一节的例子(ClusterExample)是一样的,这里不再重复贴出。