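If you watch Quartz's SQL log, you will see a lot of job state transitions. Which states are there?
State in the qrtz_TRIGGERS table
A trigger starts out as WAITING. When it is picked up by a scan inside the time window, the scheduler takes the lock and prepares to run it; once the lock is obtained, the state is updated to ACQUIRED. This table never holds an EXECUTING state:
the state change for the actual execution happens in qrtz_FIRED_TRIGGERS!
When every fire time has been used up, the state becomes COMPLETE. BLOCKED is set while a job that disallows concurrent execution is running (see the triggerFired code further down), which is not the same as simply failing to get the lock in a cluster.
qrtz_SIMPLE_TRIGGERS
This table has no state column.
qrtz_FIRED_TRIGGERS
A row is inserted with state ACQUIRED, updated to EXECUTING when the trigger is actually fired, and deleted once execution finishes.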
The constants below come from org.quartz.impl.jdbcjobstore.Constants:
// MISC CONSTANTS
String DEFAULT_TABLE_PREFIX = "QRTZ_";

// STATES
String STATE_WAITING = "WAITING";
String STATE_ACQUIRED = "ACQUIRED";
String STATE_EXECUTING = "EXECUTING";
String STATE_COMPLETE = "COMPLETE";
String STATE_BLOCKED = "BLOCKED";
String STATE_ERROR = "ERROR";
String STATE_PAUSED = "PAUSED";
String STATE_PAUSED_BLOCKED = "PAUSED_BLOCKED";
String STATE_DELETED = "DELETED";

/**
 * @deprecated Whether a trigger has misfired is no longer a state, but
 * rather now identified dynamically by whether the trigger's next fire
 * time is more than the misfire threshold time in the past.
 */
String STATE_MISFIRED = "MISFIRED";
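How does the state change when a job is scheduled?
To answer that, watch the database tables and the SQL log.
The important method is triggersFired, which calls triggerFired for each trigger. It does a lot of work, and the steps are covered one by one below.
Key code from org.quartz.impl.jdbcjobstore.JobStoreSupport#triggerFired: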
if (job.isConcurrentExectionDisallowed()) {
    state = STATE_BLOCKED;
    force = false;
    try {
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                STATE_BLOCKED, STATE_WAITING);
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                STATE_BLOCKED, STATE_ACQUIRED);
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                STATE_PAUSED_BLOCKED, STATE_PAUSED);
    } catch (SQLException e) {
        throw new JobPersistenceException(
                "Couldn't update states of blocked triggers: "
                        + e.getMessage(), e);
    }
}

if (trigger.getNextFireTime() == null) {
    state = STATE_COMPLETE;
    force = true;
}

storeTrigger(conn, trigger, job, true, state, force, false);

job.getJobDataMap().clearDirtyFlag();

return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
        .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
        .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
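To make the two if-blocks in the excerpt easier to follow, here is a minimal sketch of how the final state passed to storeTrigger could be derived. The class and method names are hypothetical, and it assumes that state starts out as WAITING before the first if-block, which the excerpt does not show.

```java
import java.util.Date;

final class FiredStateSketch {
    static final String STATE_WAITING = "WAITING";
    static final String STATE_BLOCKED = "BLOCKED";
    static final String STATE_COMPLETE = "COMPLETE";

    // Hypothetical helper that mirrors the two if-blocks of the excerpt.
    // Assumption: the state is WAITING before the first check.
    static String stateAfterFiring(boolean concurrentExecutionDisallowed, Date nextFireTime) {
        String state = STATE_WAITING;
        if (concurrentExecutionDisallowed) {
            state = STATE_BLOCKED;      // other triggers of the same job must wait
        }
        if (nextFireTime == null) {
            state = STATE_COMPLETE;     // no more fire times: this check wins over BLOCKED
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(stateAfterFiring(false, new Date()));
        System.out.println(stateAfterFiring(true, new Date()));
        System.out.println(stateAfterFiring(true, null));
    }
}
```

Note that the COMPLETE check comes second, so a trigger with no further fire time ends up COMPLETE even when its job disallows concurrent execution.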
First, selectTriggerState is called, which runs this SQL:
String SELECT_TRIGGER_STATE = "SELECT "
+ COL_TRIGGER_STATE + " FROM " + TABLE_PREFIX_SUBST
+ TABLE_TRIGGERS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_NAME + " = ? AND "
+ COL_TRIGGER_GROUP + " = ?";
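The SQL constants contain two placeholders, TABLE_PREFIX_SUBST and SCHED_NAME_SUBST, that are filled in before the statement is prepared. The sketch below shows one way such a template could be expanded. It assumes the placeholders are the MessageFormat arguments {0} and {1}, and it spells out the column names as literals; both are assumptions for illustration, and the class name is hypothetical.

```java
import java.text.MessageFormat;

final class SqlTemplateSketch {
    // Assumption: the placeholders are MessageFormat arguments.
    static final String TABLE_PREFIX_SUBST = "{0}";
    static final String SCHED_NAME_SUBST = "{1}";

    // Same shape as SELECT_TRIGGER_STATE, with assumed literal column names.
    static final String SELECT_TRIGGER_STATE = "SELECT TRIGGER_STATE FROM "
            + TABLE_PREFIX_SUBST + "TRIGGERS WHERE SCHED_NAME = " + SCHED_NAME_SUBST
            + " AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?";

    // Replaces {0} with the table prefix and {1} with the quoted scheduler name.
    static String expand(String template, String tablePrefix, String schedulerName) {
        return MessageFormat.format(template, tablePrefix, "'" + schedulerName + "'");
    }

    public static void main(String[] args) {
        System.out.println(expand(SELECT_TRIGGER_STATE, "QRTZ_", "MyScheduler"));
    }
}
```

The remaining ? markers are ordinary JDBC parameters, bound later to the trigger name and group.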
The state returned by the query must be STATE_ACQUIRED; otherwise triggerFired stops here:
if (!state.equals(STATE_ACQUIRED)) {
return null;
}
Next comes retrieveJob,
which is simply return getDelegate().selectJobDetail(conn, key,
getClassLoadHelper());
Then, if trigger.getCalendarName() != null, retrieveCalendar is called:
String SELECT_CALENDAR = "SELECT *" + " FROM "
+ TABLE_PREFIX_SUBST + TABLE_CALENDARS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_CALENDAR_NAME + " = ?";
Then comes getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
String UPDATE_FIRED_TRIGGER = "UPDATE "
+ TABLE_PREFIX_SUBST + TABLE_FIRED_TRIGGERS + " SET "
+ COL_INSTANCE_NAME + " = ?, "
+ COL_FIRED_TIME + " = ?, " + COL_SCHED_TIME + " = ?, " + COL_ENTRY_STATE + " = ?, " + COL_JOB_NAME
+ " = ?, " + COL_JOB_GROUP + " = ?, " + COL_IS_NONCONCURRENT + " = ?, "
+ COL_REQUESTS_RECOVERY + " = ? WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_ENTRY_ID + " = ?";
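The key point is that the state is set to STATE_EXECUTING.
Next:
// call triggered - to update the trigger's next-fire-time state...
trigger.triggered(cal);
trigger.triggered mainly sets fields on the trigger instance, chiefly previousFireTime and nextFireTime.
How they are computed depends on the concrete trigger type.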
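As an illustration of what a triggered call does to previousFireTime and nextFireTime, here is a minimal sketch for a fixed-interval trigger with a limited number of fires. It is not Quartz's implementation; the class, its fields, and the simple "add the interval" rule are assumptions chosen to keep the idea visible.

```java
import java.util.Date;

final class IntervalTriggerSketch {
    private final long intervalMillis;
    private int remainingFires;       // fires still to come, including the pending one
    private Date previousFireTime;
    private Date nextFireTime;

    IntervalTriggerSketch(Date firstFireTime, long intervalMillis, int totalFires) {
        this.nextFireTime = firstFireTime;
        this.intervalMillis = intervalMillis;
        this.remainingFires = totalFires;
    }

    // Called when the trigger fires: shift next -> previous and compute the new next.
    void triggered() {
        previousFireTime = nextFireTime;
        remainingFires--;
        nextFireTime = remainingFires > 0
                ? new Date(previousFireTime.getTime() + intervalMillis)
                : null;               // null means the trigger will never fire again
    }

    Date getPreviousFireTime() { return previousFireTime; }

    Date getNextFireTime() { return nextFireTime; }
}
```

A null nextFireTime is what the triggerFired code later checks to decide that the trigger should be stored as COMPLETE.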
After that, triggerFired checks whether the job allows concurrent execution.
If firing fails, the acquired trigger is released: releaseAcquiredTrigger -> updateTriggerStateFromOtherState
protected void releaseAcquiredTrigger(Connection conn,
        OperableTrigger trigger)
    throws JobPersistenceException {
    try {
        getDelegate().updateTriggerStateFromOtherState(conn,
                trigger.getKey(), STATE_WAITING, STATE_ACQUIRED);
        getDelegate().deleteFiredTrigger(conn, trigger.getFireInstanceId());
    } catch (SQLException e) {
        throw new JobPersistenceException(
                "Couldn't release acquired trigger: " + e.getMessage(), e);
    }
}
The update statement:
String UPDATE_TRIGGER_STATE_FROM_STATE = "UPDATE "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " SET " + COL_TRIGGER_STATE
+ " = ?" + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_NAME + " = ? AND "
+ COL_TRIGGER_GROUP + " = ? AND " + COL_TRIGGER_STATE + " = ?";
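Then the qrtz_FIRED_TRIGGERS row is deleted (deleteFiredTrigger).
Only after the whole series of database operations above has succeeded does the job itself run,
through JobRunShell.
It first calls initialize, which mainly does:
JobDetail jobDetail = firedTriggerBundle.getJobDetail();
try {
    job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
    // exception handling omitted in this excerpt
and then creates the execution context:
this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
The org.quartz.core.JobRunShell#run method is worth reading: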

public void run() {
    qs.addInternalSchedulerListener(this);

    try {
        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
        JobDetail jobDetail = jec.getJobDetail();

        do {

            JobExecutionException jobExEx = null;
            Job job = jec.getJobInstance();

            try {
                begin();
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't begin execution.", se);
                break;
            }

            // notify job & trigger listeners...
            try {
                if (!notifyListenersBeginning(jec)) {
                    break;
                }
            } catch(VetoedException ve) {
                try {
                    CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
                    qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);

                    // QTZ-205
                    // Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
                    if (jec.getTrigger().getNextFireTime() == null) {
                        qs.notifySchedulerListenersFinalized(jec.getTrigger());
                    }

                    complete(true);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error during veto of Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                }
                break;
            }

            long startTime = System.currentTimeMillis();
            long endTime = startTime;

            // execute the job
            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                job.execute(jec);
                endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                endTime = System.currentTimeMillis();
                jobExEx = jee;
                getLog().info("Job " + jobDetail.getKey() +
                        " threw a JobExecutionException: ", jobExEx);
            } catch (Throwable e) {
                endTime = System.currentTimeMillis();
                getLog().error("Job " + jobDetail.getKey() +
                        " threw an unhandled Exception: ", e);
                SchedulerException se = new SchedulerException(
                        "Job threw an unhandled exception.", e);
                qs.notifySchedulerListenersError("Job ("
                        + jec.getJobDetail().getKey()
                        + " threw an exception.", se);
                jobExEx = new JobExecutionException(se, false);
            }

            jec.setJobRunTime(endTime - startTime);

            // notify all job listeners
            if (!notifyJobListenersComplete(jec, jobExEx)) {
                break;
            }

            CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;

            // update the trigger
            try {
                instCode = trigger.executionComplete(jec, jobExEx);
            } catch (Exception e) {
                // If this happens, there's a bug in the trigger...
                SchedulerException se = new SchedulerException(
                        "Trigger threw an unhandled exception.", e);
                qs.notifySchedulerListenersError(
                        "Please report this error to the Quartz developers.",
                        se);
            }

            // notify all trigger listeners
            if (!notifyTriggerListenersComplete(jec, instCode)) {
                break;
            }

            // update job/trigger or re-execute job
            if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                jec.incrementRefireCount();
                try {
                    complete(false);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error executing Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                }
                continue;
            }

            try {
                complete(true);
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't finalize execution.", se);
                continue;
            }

            qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
            break;
        } while (true);

    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}
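The do/while loop in run is easier to see once the listener and error handling are stripped away: the job is executed again for as long as the completion instruction is RE_EXECUTE_JOB, and the refire count goes up each time. The sketch below keeps only that control flow. The enum, the IntFunction-based job, and the return value are simplifications invented for this example.

```java
import java.util.function.IntFunction;

final class RunLoopSketch {
    enum Instruction { NOOP, RE_EXECUTE_JOB }

    // Runs the job until it stops asking for re-execution.
    // The job receives the current refire count and returns an instruction.
    // Returns the total number of executions.
    static int runUntilDone(IntFunction<Instruction> job) {
        int refireCount = 0;
        while (true) {
            Instruction instCode = job.apply(refireCount);
            if (instCode == Instruction.RE_EXECUTE_JOB) {
                refireCount++;        // corresponds to jec.incrementRefireCount()
                continue;             // go around the loop and execute again
            }
            return refireCount + 1;   // normal completion ends the loop
        }
    }

    public static void main(String[] args) {
        // A job that asks to be re-executed twice before finishing.
        int executions = runUntilDone(
                count -> count < 2 ? Instruction.RE_EXECUTE_JOB : Instruction.NOOP);
        System.out.println(executions);
    }
}
```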
Key step: acquireNextTriggers
acquireNextTriggers -> selectTriggerToAcquire
String SELECT_NEXT_TRIGGER_TO_ACQUIRE = "SELECT "
+ COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", "
+ COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? "
+ "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" +COL_MISFIRE_INSTRUCTION+ " != -1 AND "+ COL_NEXT_FIRE_TIME + " >= ?)) "
+ "ORDER BY "+ COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";
ps.setString(1, STATE_WAITING);
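The WHERE and ORDER BY clauses of this query can be read as a filter plus a sort. The sketch below applies the same rules to an in-memory list, purely to illustrate the selection logic. The Row class and the parameter names noLaterThan and noEarlierThan are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class AcquireQuerySketch {
    static final class Row {
        final String name;
        final String state;
        final long nextFireTime;
        final int priority;
        final int misfireInstruction;

        Row(String name, String state, long nextFireTime, int priority, int misfireInstruction) {
            this.name = name;
            this.state = state;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
            this.misfireInstruction = misfireInstruction;
        }
    }

    // Returns the names of the triggers the query would select, in query order.
    static List<String> select(List<Row> rows, long noLaterThan, long noEarlierThan) {
        List<Row> hits = new ArrayList<>();
        for (Row r : rows) {
            boolean waiting = "WAITING".equals(r.state);
            boolean due = r.nextFireTime <= noLaterThan;
            // Triggers with misfire instruction -1 are taken no matter how late they are.
            boolean notTooOld = r.misfireInstruction == -1 || r.nextFireTime >= noEarlierThan;
            if (waiting && due && notTooOld) {
                hits.add(r);
            }
        }
        // ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
        hits.sort(Comparator.comparingLong((Row r) -> r.nextFireTime)
                .thenComparing(Comparator.comparingInt((Row r) -> r.priority).reversed()));
        List<String> names = new ArrayList<>();
        for (Row r : hits) {
            names.add(r.name);
        }
        return names;
    }
}
```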
retrieveTrigger -> selectTrigger :
String SELECT_TRIGGER = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_NAME + " = ? AND " + COL_TRIGGER_GROUP + " = ?";
selectJobDetail
String SELECT_JOB_DETAIL = "SELECT *" + " FROM "
+ TABLE_PREFIX_SUBST + TABLE_JOB_DETAILS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_JOB_NAME
+ " = ? AND " + COL_JOB_GROUP + " = ?";
updateTriggerStateFromOtherState
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
String UPDATE_TRIGGER_STATE_FROM_STATE = "UPDATE "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " SET " + COL_TRIGGER_STATE
+ " = ?" + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_NAME + " = ? AND "
+ COL_TRIGGER_GROUP + " = ? AND " + COL_TRIGGER_STATE + " = ?";
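Because the UPDATE includes the old state in its WHERE clause, it behaves like a compare-and-set: it changes a row only if the trigger is still in the expected state, and rowsUpdated tells the caller whether it won. The sketch below imitates that with an in-memory map instead of a database. The class is hypothetical; only the argument order (new state first, old state second) follows the call shown above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class StateCasSketch {
    private final ConcurrentMap<String, String> stateByTrigger = new ConcurrentHashMap<>();

    void put(String triggerKey, String state) {
        stateByTrigger.put(triggerKey, state);
    }

    String stateOf(String triggerKey) {
        return stateByTrigger.get(triggerKey);
    }

    // Returns 1 if the trigger was in oldState and is now in newState, else 0,
    // the in-memory analogue of the rowsUpdated value returned by the SQL UPDATE.
    int updateTriggerStateFromOtherState(String triggerKey, String newState, String oldState) {
        return stateByTrigger.replace(triggerKey, oldState, newState) ? 1 : 0;
    }

    public static void main(String[] args) {
        StateCasSketch store = new StateCasSketch();
        store.put("DEFAULT.myTrigger", "WAITING");
        // The first caller wins; the second sees 0 rows updated and must skip the trigger.
        System.out.println(store.updateTriggerStateFromOtherState("DEFAULT.myTrigger", "ACQUIRED", "WAITING"));
        System.out.println(store.updateTriggerStateFromOtherState("DEFAULT.myTrigger", "ACQUIRED", "WAITING"));
    }
}
```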
Then insertFiredTrigger. The fire instance ID is set first, because it becomes the ENTRY_ID of the inserted row:
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
String INSERT_FIRED_TRIGGER = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_FIRED_TRIGGERS + " (" + COL_SCHEDULER_NAME + ", " + COL_ENTRY_ID
+ ", " + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", "
+ COL_INSTANCE_NAME + ", "
+ COL_FIRED_TIME + ", " + COL_SCHED_TIME + ", " + COL_ENTRY_STATE + ", " + COL_JOB_NAME
+ ", " + COL_JOB_GROUP + ", " + COL_IS_NONCONCURRENT + ", "
+ COL_REQUESTS_RECOVERY + ", " + COL_PRIORITY
+ ") VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
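What does getFiredTriggerRecordId return? It is the value stored in COL_ENTRY_ID of the fired-triggers table, in a format such as NON_CLUSTERED1627210623661.
The prefix is the scheduler's instanceId, which defaults to NON_CLUSTERED.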
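The example value looks like an instance identifier followed by a number in the range of an epoch-millisecond timestamp. One plausible way to produce such IDs is a counter seeded from the clock and appended to the identifier, as sketched below. This is an assumption drawn from the format, not a copy of the Quartz code; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class FiredTriggerIdSketch {
    private final String instanceId;
    private final AtomicLong counter;

    // Assumption: the counter starts at a clock value, so IDs stay unique across restarts.
    FiredTriggerIdSketch(String instanceId, long seed) {
        this.instanceId = instanceId;
        this.counter = new AtomicLong(seed);
    }

    // Each call returns the identifier followed by the next counter value.
    String nextRecordId() {
        return instanceId + counter.getAndIncrement();
    }

    public static void main(String[] args) {
        FiredTriggerIdSketch ids =
                new FiredTriggerIdSketch("NON_CLUSTERED", System.currentTimeMillis());
        System.out.println(ids.nextRecordId());
        System.out.println(ids.nextRecordId());
    }
}
```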
Validation: the validator returns true as long as the database contains the ENTRY_ID inserted above, and false otherwise. The txValidator only comes into play when acquireNextTrigger throws a JobPersistenceException.
List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
String SELECT_INSTANCES_FIRED_TRIGGERS = "SELECT * FROM "
+ TABLE_PREFIX_SUBST
+ TABLE_FIRED_TRIGGERS
+ " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_INSTANCE_NAME + " = ?";
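The validation step comes down to a set-membership test: load the fired-trigger rows that belong to this instance and check whether any of the entry IDs just assigned is among them. A minimal sketch of that check follows, using plain strings in place of FiredTriggerRecord objects; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class AcquireValidatorSketch {
    // Returns true if at least one of the entry IDs we tried to insert
    // is present among the rows read back for this instance.
    static boolean insertWasCommitted(List<String> entryIdsInTable, List<String> entryIdsWeInserted) {
        Set<String> present = new HashSet<>(entryIdsInTable);
        for (String id : entryIdsWeInserted) {
            if (present.contains(id)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> inTable = Arrays.asList("NON_CLUSTERED1627210623661");
        System.out.println(insertWasCommitted(inTable, Arrays.asList("NON_CLUSTERED1627210623661")));
        System.out.println(insertWasCommitted(inTable, Arrays.asList("NON_CLUSTERED1627210623999")));
    }
}
```

A true result means the earlier transaction did commit despite the exception, so the acquired triggers can still be used instead of being acquired a second time.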