zoukankan      html  css  js  c++  java
  • Quartz Scheduler调度流程分析

    date: 2019-08-31

    Demo

    QuartzSample.java

    public class QuartzSample {
        public static void main(String[] args) throws SchedulerException {
            StdSchedulerFactory sf = new StdSchedulerFactory();
            Scheduler scheduler = sf.getScheduler();
            scheduler.start();
    
            JobDetail job = JobBuilder
                    .newJob(SampleJob.class)
                    .withIdentity("job01", "group01")
                    .build();
    
            String cron = "0 00 10 * * ?";
            CronTrigger cronTrigger = TriggerBuilder
                    .newTrigger()
                    .withIdentity("cronTrigger")
                    .forJob("job01", "group01")
                    .withSchedule(CronScheduleBuilder.cronSchedule(cron))
                    .startNow()
                    .build();
    
            scheduler.scheduleJob(job, cronTrigger);
        }
    }
    

    SampleJob.java

    public class SampleJob implements Job {
    
        public void execute(JobExecutionContext jobExecutionContext) {
            System.out.println("hello quartz!");
        }
    }
    

    其中SampleJob是用户自定义的Job类,用于处理业务逻辑。
    整个调度过程都是围绕Scheduler类进行的,关键的三个语句分别如下:

    • 1.Scheduler scheduler = sf.getScheduler();
    • 2.scheduler.scheduleJob(job, cronTrigger);
    • 3.scheduler.start();

    时序图如下:
    时序图

    创建调度器

    StdSchedulerFactory.getScheduler()源码:

    public Scheduler getScheduler() throws SchedulerException {
        // 读取quartz配置文件,未指定则顺序遍历各个path下的quartz.properties文件
        // 解析出quartz配置内容和环境变量,存入PropertiesParser对象
        // PropertiesParser组合了Properties(继承Hashtable),定义了一系列对Properties的操作方法,比如getPropertyGroup()批量获取相同前缀的配置。配置内容和环境变量存放在Properties成员变量中
        if (cfg == null) {
            initialize();
        }
    
         // 获取调度器池,采用了单例模式
        // 其实,调度器池的核心变量就是一个hashmap,每个元素key是scheduler名,value是scheduler实例
        // getInstance()用synchronized防止并发创建
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
    
        // 从调度器池中取出当前配置所用的调度器
        Scheduler sched = schedRep.lookup(getSchedulerName());
    
        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }
    
        // 如果调度器池中没有当前配置的调度器,则实例化一个调度器,主要动作包括:
        // 1)初始化threadPool(线程池):开发者可以通过org.quartz.threadPool.class配置指定使用哪个线程池类,比如SimpleThreadPool。先class load线程池类,接着动态生成线程池实例bean,然后通过反射,使用setXXX()方法将以org.quartz.threadPool开头的配置内容赋值给bean成员变量;
        // 2)初始化jobStore(任务存储方式):开发者可以通过org.quartz.jobStore.class配置指定使用哪个任务存储类,比如RAMJobStore。先class load任务存储类,接着动态生成实例bean,然后通过反射,使用setXXX()方法将以org.quartz.jobStore开头的配置内容赋值给bean成员变量;
        // 3)初始化dataSource(数据源):开发者可以通过org.quartz.dataSource配置指定数据源详情,比如哪个数据库、账号、密码等。jobStore要指定为JDBCJobStore,dataSource才会有效;
        // 4)初始化其他配置:包括SchedulerPlugins、JobListeners、TriggerListeners等;
        // 5)初始化threadExecutor(线程执行器):默认为DefaultThreadExecutor;
        // 6)创建工作线程:根据配置创建N个工作thread,执行start()启动thread,并将N个thread顺序add进threadPool实例的空闲线程列表availWorkers中;
        // 7)创建调度器线程:创建QuartzSchedulerThread实例,并通过threadExecutor.execute(实例)启动调度器线程;
        // 8)创建调度器:创建StdScheduler实例,将上面所有配置和引用组合进实例中,并将实例存入调度器池中
        sched = instantiate();
    
        return sched;
    }
    

    下面从instantiate方法中抽出一些关键代码进行分析:

    ThreadPool tp = null;
    QuartzScheduler qs = null;
    tp.initialize();
    qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
    Scheduler scheduler = instantiate(rsrcs, qs);
    return scheduler;
    
    • 1.创建工作线程
      SimpleThreadPool.initialize()关键源码:
    ...
    // create the worker threads and start them
    Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
    while(workerThreads.hasNext()) {
    	WorkerThread wt = workerThreads.next();
    	wt.start();
    	availWorkers.add(wt);
    }
    

    创建若干个Worker线程,并且调用线程的start方法启动各个线程。

    • 2.创建调度器线程
      QuartzScheduler.QuartzScheduler(...):
    this.schedThread = new QuartzSchedulerThread(this, resources);
    

    创建QuartzSchedulerThread实例。

    QuartzSchedulerThread.QuartzSchedulerThread(...):

    ...
    // start the underlying thread, but put this object into the 'paused'
    // state
    // so processing doesn't start yet...
    paused = true;
    halted = new AtomicBoolean(false);
    

    初始化paused和halted变量,用于控制该线程运行(run方法)。

    • 3.创建调度器
    StdSchedulerFactory.instantiate():
    Scheduler scheduler = new StdScheduler(qs);
    return scheduler;
    

    创建StdScheduler实例并返回。

    调度器绑定任务和触发器

    QuartzScheduler.scheduleJob(JobDetail jobDetail,Trigger trigger):

    public Date scheduleJob(JobDetail jobDetail,
    		Trigger trigger) throws SchedulerException {
    	// 检查调度器是否开启,如果关闭则throw异常到上层
    	validateState();
    	...
    	// 获取trigger首次触发job的时间,以此时间为起点,每隔一段指定的时间触发job
    	Date ft = trig.computeFirstFireTime(cal);
    	...
    	// 把job和trigger注册进调度器的jobStore
    	resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
    	// 通知job监听者
    	notifySchedulerListenersJobAdded(jobDetail);
    	// 通知调度器线程
    	notifySchedulerThread(trigger.getNextFireTime().getTime());
    	// 通知trigger监听者
    	notifySchedulerListenersSchduled(trigger);
    
    	return ft;
    }
    

    调度器开始调度任务

    QuartzScheduler.start():

    public void start() throws SchedulerException {
    	...
    	// 这句最关键,通过变量使调度器线程跳出一个无限循环,开始轮询所有trigger触发job
    	schedThread.togglePause(false);
    	...
    }
    

    QuartzSchedulerThread.togglePause(boolean pause):

    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
    
            if (paused) {
                signalSchedulingChange(0);
            } else {
                sigLock.notifyAll();
            }
        }
    }
    

    QuartzSchedulerThread.run():

    // 调度器线程一旦启动,将一直运行此方法
    @Override
    public void run() {
    	...
        // while()无限循环,每次循环取出时间将到的trigger,触发对应的job,直到调度器线程被关闭
        // halted是一个AtomicBoolean类变量,有个volatile int变量value,其get()方法仅仅简单的一句return value != 0,get()返回结果表示调度器线程是否开关
        // volatile修饰的变量,存取必须走内存,不能通过cpu缓存,这样一来get总能获得set的最新真实值,因此volatile变量适合用来存放简单的状态信息
        // 顾名思义,AtomicBoolean要解决原子性问题,但volatile并不能保证原子性,详见http://blog.csdn.net/wxwzy738/article/details/43238089
        while (!halted.get()) {
    		// check if we're supposed to pause...
    		// sigLock是个Object对象,被用于加锁同步
    		// 需要用到wait(),必须加到synchronized块内
    		synchronized (sigLock) {
    			while (paused && !halted.get()) {
    				try {
    					// wait until togglePause(false) is called...
    					// 这里会不断循环等待,直到QuartzScheduler.start()调用了togglePause(false)
    					// 调用wait(),调度器线程进入休眠状态,同时sigLock锁被释放
    					// togglePause(false)获得sigLock锁,将paused置为false,使调度器线程能够退出此循环,同时执行sigLock.notifyAll()唤醒调度器线程
    					sigLock.wait(1000L);
    				} catch (InterruptedException ignore) {
    				}
    
    				// reset failure counter when paused, so that we don't
    				// wait again after unpausing
    				acquiresFailed = 0;
    			}
    
    			if (halted.get()) {
    				break;
    			}
    		}
    
    		...
    		int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
    		if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
    
    			...
    			// 获取马上到时间的trigger
    			// 允许取出的trigger个数不能超过一个阀值,这个阀值是线程池个数与org.quartz.scheduler.batchTriggerAcquisitionMaxCount配置值间的最小者
    
    			// 调度器在trigger队列中寻找30秒内一定数目的trigger(需要保证集群节点的系统时间一致)
    			triggers = qsRsrcs.getJobStore().acquireNextTriggers(
    					now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
    			...
    
    			if (triggers != null && !triggers.isEmpty()) {
    				...
    
    				// 触发trigger
    				List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
    				if(res != null)
    					bndles = res;
    
    				for (int i = 0; i < bndles.size(); i++) {
    					...
    					JobRunShell shell = null;
    					shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
    					shell.initialize(qs);
    					...
    
    					// 执行与trigger绑定的job
    					// shell是JobRunShell对象,实现了Runnable接口
    					// SimpleThreadPool.runInThread(Runnable)从线程池空闲列表中取出一个工作线程
    					// 工作线程执行WorkerThread.run(Runnable),详见下方WorkerThread的讲解
    					if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
    						...
    					}
    				}
    
    				continue; // while (!halted)
    			}
    		} else { continue;}
    		...
    
        } // while (!halted)
    	...
    }
    

    SimpleThreadPool.runInThread(Runnable runnable):

    if (!isShutdown) {
        WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
        busyWorkers.add(wt);
        wt.run(runnable);
    } else {
        ...
    }
    

    从availWorkers中取出一个工作线程执行其run(Runnable newRunnable)方法。

    WorkerThread.run(Runnable newRunnable):

    public void run(Runnable newRunnable) {
        synchronized(lock) {
            if(runnable != null) {
                throw new IllegalStateException("Already running a Runnable!");
            }
    
            runnable = newRunnable;
            lock.notifyAll();
        }
    }
    

    WorkerThread.run():

    @Override
    public void run() {
    	boolean ran = false;
    	
    	while (run.get()) {
    		try {
    			synchronized(lock) {
    				while (runnable == null && run.get()) {
    					lock.wait(500);
    				}
    
    				if (runnable != null) {
    					ran = true;
    					runnable.run();
    				}
    			}
    		} 
    		...
    	}
    }
    

    此时执行的是JobRunShell中的run方法。

    JobRunShell.run():

    public void run() {
    	Job job = jec.getJobInstance();
    	...
    	job.execute(jec);
    	...
    }
    

    至此,已经形成了一个完整的执行过程。

    总的来说,核心代码就是在QuartzSchedulerThread.run()方法while循环中调用sigLock.wait(),等待可以跳出while循环的条件成立,当条件成立时,立马调度sigLock.notifyAll()使线程跳出while。通过这样的代码,可以实现调度器线程等待启动、工作线程等待job等功能。


    参考:

  • 相关阅读:
    2018-8-18 训练神经网络笔记
    ffmpeg解码视频为图片和将图片合成一个MP4视频
    minikube start error
    按顺序将目录下的所有文件的绝对路径写入文件中
    ssh远程免密登录
    Ubuntu默认的awk一直报语法错误
    ffmpeg常用操作
    ssh免密登录server
    cv::namedWindow是非线程安全的
    lingcrypt源码安装undefined reference to ...
  • 原文地址:https://www.cnblogs.com/cloudflow/p/13894300.html
Copyright © 2011-2022 走看看