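  • Demystifying the Quartz Job Scheduling Framework

    I have written or reposted several articles about Quartz before, and many readers have messaged me with Quartz questions. The earlier posts are:

    Quartz error: java.lang.ClassNotFoundException

    Quartz source code analysis: understanding the relationships among job, scheduler, calendar, trigger, and listener

    Analysis of missed executions when multiple triggers run in the Quartz framework (repost)

    Investigation and source code analysis of the Quartz cluster scheduling mechanism (repost)

    Technology selection for distributed scheduled-task systems (repost)

    With things quiet at the end of the year, I collected those Quartz questions, read through the source code along the way, and wrote up this summary, in the hope that it helps some readers or at least shortens their investigation time.

    Note: the versions used here are Quartz 2.2.3 and Spring Boot 2.1.3.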

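    1. Core components of Quartz

    1.1 Job component

    1.1.1 Job

    A Job holds the logic of the task. All of that logic lives in the execute() method, and the data needed for execution is available from the JobExecutionContext.

    Job example: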

    @PersistJobDataAfterExecution
    @DisallowConcurrentExecution
    public class ColorJob implements Job {
    
        private static Logger _log = LoggerFactory.getLogger(ColorJob.class);
        
        // parameter names specific to this job
        public static final String FAVORITE_COLOR = "favorite color";
        public static final String EXECUTION_COUNT = "count";
        
        // Since Quartz will re-instantiate a class every time it
        // gets executed, non-static member variables cannot
        // be used to maintain state!
        private int _counter = 1;
    
        /**
         * <p>
         * Empty constructor for job initialization
         * </p>
         * <p>
         * Quartz requires a public empty constructor so that the
         * scheduler can instantiate the class whenever it needs.
         * </p>
         */
        public ColorJob() {
        }
    
        /**
         * <p>
         * Called by the <code>{@link org.quartz.Scheduler}</code> when a
         * <code>{@link org.quartz.Trigger}</code> fires that is associated with
         * the <code>Job</code>.
         * </p>
         * 
         * @throws JobExecutionException
         *             if there is an exception while executing the job.
         */
        public void execute(JobExecutionContext context)
            throws JobExecutionException {
    
            // This job simply prints out its job name and the
            // date and time that it is running
            JobKey jobKey = context.getJobDetail().getKey();
            
            // Grab and print passed parameters
            JobDataMap data = context.getJobDetail().getJobDataMap();
            String favoriteColor = data.getString(FAVORITE_COLOR);
            int count = data.getInt(EXECUTION_COUNT);
            _log.info("ColorJob: " + jobKey + " executing at " + new Date() + "\n" +
                "  favorite color is " + favoriteColor + "\n" +
                "  execution count (from job map) is " + count + "\n" +
                "  execution count (from job member variable) is " + _counter);
            
            // increment the count and store it back into the 
            // job map so that job state can be properly maintained
            count++;
            data.put(EXECUTION_COUNT, count);
            
            // Increment the local member variable 
            // This serves no real purpose since job state can not 
            // be maintained via member variables!
            _counter++;
        }
    
    }
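
    The comments in ColorJob make a point that is easy to miss: a new job instance is created for every execution, so instance fields start over each time, while values written back to the JobDataMap survive (ColorJob relies on @PersistJobDataAfterExecution for that). Below is a minimal, stdlib-only simulation of that behaviour. It does not use Quartz at all, and the names JobStateSketch, CountingJob and runOnce are hypothetical, chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only simulation; this is not Quartz code. All names are hypothetical.
class JobStateSketch {

    // Stand-in for a Job class: a fresh instance is created for every run.
    static class CountingJob {
        private int counter = 1; // instance field: starts at 1 in every new instance

        // Returns {count read from the data map, count read from the member variable}.
        int[] execute(Map<String, Object> dataMap) {
            int count = (Integer) dataMap.get("count");
            int[] seen = {count, counter};
            dataMap.put("count", count + 1); // survives: the map is kept between runs
            counter++;                        // lost: this instance is discarded after the run
            return seen;
        }
    }

    // Mimics one firing: instantiate the job, then execute it.
    static int[] runOnce(Map<String, Object> persistedDataMap) {
        return new CountingJob().execute(persistedDataMap);
    }

    public static void main(String[] args) {
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("count", 1);
        for (int i = 0; i < 3; i++) {
            int[] seen = runOnce(dataMap);
            System.out.println("map count = " + seen[0] + ", member count = " + seen[1]);
        }
    }
}
```

    Across the three runs the map count goes 1, 2, 3 while the member count stays at 1, which matches what the ColorJob log output is meant to demonstrate.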

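    1.1.2 JobDetail stores information about the Job

    It is mainly responsible for:

    1. Specifying the Job class to execute and its unique identity (job name and group name)

    2. Storing the JobDataMap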

        // job1 will only run 5 times (at start time, plus 4 repeats), every 10 seconds
        JobDetail job1 = newJob(ColorJob.class).withIdentity("job1", "group1").build();
    
        // pass initialization parameters into the job
        job1.getJobDataMap().put(ColorJob.FAVORITE_COLOR, "Green");
        job1.getJobDataMap().put(ColorJob.EXECUTION_COUNT, 1);

    [Figure: the job as stored in the database]

    1.1.3 Quartz JobBuilder provides a fluent API for creating a JobDetail

    @Bean
    public JobDetail jobDetail() {
        return JobBuilder.newJob().ofType(SampleJob.class)
          .storeDurably()
          .withIdentity("Qrtz_Job_Detail")  
          .withDescription("Invoke Sample Job service...")
          .build();
    }

    1.1.4 Spring JobDetailFactoryBean

    A factory bean provided by Spring as another way to create a JobDetail

    @Bean
    public JobDetailFactoryBean jobDetail() {
        JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
        jobDetailFactory.setJobClass(SampleJob.class);
        jobDetailFactory.setDescription("Invoke Sample Job service...");
        jobDetailFactory.setDurability(true);
        return jobDetailFactory;
    }

    1.2 Trigger component

    A trigger can be in one of several states:
    
        // STATES
        String STATE_WAITING = "WAITING";
    
        String STATE_ACQUIRED = "ACQUIRED";
    
        String STATE_EXECUTING = "EXECUTING";
    
        String STATE_COMPLETE = "COMPLETE";
    
        String STATE_BLOCKED = "BLOCKED";
    
        String STATE_ERROR = "ERROR";
    
        String STATE_PAUSED = "PAUSED";
    
        String STATE_PAUSED_BLOCKED = "PAUSED_BLOCKED";
    
        String STATE_DELETED = "DELETED";
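
    Triggers move between these states through conditional updates. For instance, the acquisition code shown in section 2.4.1 calls updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING), which only succeeds if the trigger is still WAITING. Here is a small in-memory sketch of that "update only if the old state matches" idea. Quartz does this with a SQL UPDATE against the triggers table; the class TriggerStateSketch and the use of ConcurrentMap are my own stand-ins.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory illustration only; the real job store performs this check in SQL.
class TriggerStateSketch {
    private final ConcurrentMap<String, String> states = new ConcurrentHashMap<>();

    void put(String triggerKey, String state) {
        states.put(triggerKey, state);
    }

    String get(String triggerKey) {
        return states.get(triggerKey);
    }

    // Moves the trigger to newState only if it is currently in oldState.
    // Returns 1 if the state was changed and 0 otherwise, like an update row count.
    int updateStateFromOtherState(String triggerKey, String newState, String oldState) {
        return states.replace(triggerKey, oldState, newState) ? 1 : 0;
    }

    public static void main(String[] args) {
        TriggerStateSketch store = new TriggerStateSketch();
        store.put("group1.trigger1", "WAITING");
        // The first caller wins the trigger; a second attempt finds it already ACQUIRED.
        System.out.println(store.updateStateFromOtherState("group1.trigger1", "ACQUIRED", "WAITING"));
        System.out.println(store.updateStateFromOtherState("group1.trigger1", "ACQUIRED", "WAITING"));
    }
}
```

    Returning a count rather than a boolean mirrors the rowsUpdated variable in the Quartz code, where a count of 0 means some other thread or node got to the trigger first.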

    [Figure: table structure that stores the trigger state]

    Trigger types:

        // TRIGGER TYPES
        /** Simple Trigger type. */
        String TTYPE_SIMPLE = "SIMPLE";
    
        /** Cron Trigger type. */
        String TTYPE_CRON = "CRON";
    
        /** Calendar Interval Trigger type. */
        String TTYPE_CAL_INT = "CAL_INT";
    
        /** Daily Time Interval Trigger type. */
        String TTYPE_DAILY_TIME_INT = "DAILY_I";
    
        /** A general blob Trigger type. */
        String TTYPE_BLOB = "BLOB";

    [Figure: the corresponding table structure]

    1.2.1 Trigger example

        SimpleTrigger trigger1 = newTrigger().withIdentity("trigger1", "group1").startAt(startTime)
            .withSchedule(simpleSchedule().withIntervalInSeconds(10).withRepeatCount(4)).build();
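
    As the comment on job1 says, a repeat count of 4 means 5 executions in total: one at the start time plus 4 repeats. The arithmetic can be sketched as below. This is plain Java rather than Quartz, it ignores misfires and end times, and fireTimes is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.List;

// Plain arithmetic sketch; not part of Quartz.
class SimpleScheduleSketch {

    // Fire times (epoch millis) for a trigger that fires at startMillis
    // and then repeatCount more times, intervalMillis apart.
    static List<Long> fireTimes(long startMillis, long intervalMillis, int repeatCount) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i <= repeatCount; i++) {
            times.add(startMillis + i * intervalMillis);
        }
        return times;
    }

    public static void main(String[] args) {
        // Start at t = 0, every 10 seconds, repeat count 4.
        System.out.println(fireTimes(0L, 10_000L, 4)); // prints [0, 10000, 20000, 30000, 40000]
    }
}
```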

    [Figure: the trigger as stored in MySQL]

    1.2.2 Quartz TriggerBuilder

    TriggerBuilder provides a fluent API for creating a Trigger

    @Bean
    public Trigger trigger(JobDetail job) {
        return TriggerBuilder.newTrigger().forJob(job)
          .withIdentity("Qrtz_Trigger")
          .withDescription("Sample trigger")
          .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
          .build();
    }

    1.2.3 Spring SimpleTriggerFactoryBean

    A factory bean provided by Spring for creating a SimpleTrigger

    @Bean
    public SimpleTriggerFactoryBean trigger(JobDetail job) {
        SimpleTriggerFactoryBean trigger = new SimpleTriggerFactoryBean();
        trigger.setJobDetail(job);
        trigger.setRepeatInterval(3600000);
        trigger.setRepeatCount(SimpleTrigger.REPEAT_INDEFINITELY);
        return trigger;
    }

    1.3 Scheduler component

    1.3.1 Factory class provided by Quartz

    @Bean
    public Scheduler scheduler(Trigger trigger, JobDetail job) throws SchedulerException, IOException {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(new ClassPathResource("quartz.properties").getInputStream());
     
        Scheduler scheduler = factory.getScheduler();
        scheduler.setJobFactory(springBeanJobFactory());
        scheduler.scheduleJob(job, trigger);
     
        scheduler.start();
        return scheduler;
    }

    1.3.2 Factory bean provided by Spring

    @Bean
    public SchedulerFactoryBean scheduler(Trigger trigger, JobDetail job) {
        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));
     
        schedulerFactory.setJobFactory(springBeanJobFactory());
        schedulerFactory.setJobDetails(job);
        schedulerFactory.setTriggers(trigger);
        return schedulerFactory;
    }

    2. How it works

    2.1 The core class QuartzScheduler

    StdScheduler, the Scheduler implementation class, wraps the core worker class QuartzScheduler.

        /**
         * <p>
         * Construct a <code>StdScheduler</code> instance to proxy the given
         * <code>QuartzScheduler</code> instance, and with the given <code>SchedulingContext</code>.
         * </p>
         */
        public StdScheduler(QuartzScheduler sched) {
            this.sched = sched;
        }

    2.2 Storing and retrieving JobDetail

        public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) throws SchedulerException {
            validateState();
    
            if (!storeNonDurableWhileAwaitingScheduling && !jobDetail.isDurable()) {
                throw new SchedulerException(
                        "Jobs added with no trigger must be durable.");
            }
    
            resources.getJobStore().storeJob(jobDetail, replace);
            notifySchedulerThread(0L);
            notifySchedulerListenersJobAdded(jobDetail);
        }

    2.2.1 Storing JobDetail (using the MySQL JDBC job store as an example)

        /**
         * <p>
         * Insert or update a job.
         * </p>
         */
        protected void storeJob(Connection conn, 
                JobDetail newJob, boolean replaceExisting)
            throws JobPersistenceException {
    
            boolean existingJob = jobExists(conn, newJob.getKey());
            try {
                if (existingJob) {
                    if (!replaceExisting) { 
                        throw new ObjectAlreadyExistsException(newJob); 
                    }
                    getDelegate().updateJobDetail(conn, newJob);
                } else {
                    getDelegate().insertJobDetail(conn, newJob);
                }
            } catch (IOException e) {
                throw new JobPersistenceException("Couldn't store job: "
                        + e.getMessage(), e);
            } catch (SQLException e) {
                throw new JobPersistenceException("Couldn't store job: "
                        + e.getMessage(), e);
            }
        }
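
    The storeJob logic above is an insert-or-update with one guard: an existing job is only overwritten when replaceExisting is true. A minimal in-memory sketch of that rule follows. It uses a HashMap instead of a database and IllegalStateException instead of Quartz's ObjectAlreadyExistsException; JobStoreSketch and jobClassOf are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the insert-or-update rule; not the Quartz implementation.
class JobStoreSketch {
    private final Map<String, String> jobs = new HashMap<>(); // job key -> job class name

    // Inserts a new job, or updates an existing one only when replaceExisting is true.
    void storeJob(String jobKey, String jobClassName, boolean replaceExisting) {
        boolean existingJob = jobs.containsKey(jobKey);
        if (existingJob && !replaceExisting) {
            throw new IllegalStateException("Job already exists: " + jobKey);
        }
        jobs.put(jobKey, jobClassName); // covers both the insert and the update branch
    }

    String jobClassOf(String jobKey) {
        return jobs.get(jobKey);
    }

    public static void main(String[] args) {
        JobStoreSketch store = new JobStoreSketch();
        store.storeJob("group1.job1", "ColorJob", false);
        store.storeJob("group1.job1", "SampleJob", true); // allowed: replace requested
        System.out.println(store.jobClassOf("group1.job1"));
    }
}
```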

    This calls the StdJDBCDelegate implementation:

        /**
         * <p>
         * Insert the job detail record.
         * </p>
         * 
         * @param conn
         *          the DB Connection
         * @param job
         *          the job to insert
         * @return number of rows inserted
         * @throws IOException
         *           if there were problems serializing the JobDataMap
         */
        public int insertJobDetail(Connection conn, JobDetail job)
            throws IOException, SQLException {
            ByteArrayOutputStream baos = serializeJobData(job.getJobDataMap());
    
            PreparedStatement ps = null;
    
            int insertResult = 0;
    
            try {
                ps = conn.prepareStatement(rtp(INSERT_JOB_DETAIL));
                ps.setString(1, job.getKey().getName());
                ps.setString(2, job.getKey().getGroup());
                ps.setString(3, job.getDescription());
                ps.setString(4, job.getJobClass().getName());
                setBoolean(ps, 5, job.isDurable());
                setBoolean(ps, 6, job.isConcurrentExectionDisallowed());
                setBoolean(ps, 7, job.isPersistJobDataAfterExecution());
                setBoolean(ps, 8, job.requestsRecovery());
                setBytes(ps, 9, baos);
    
                insertResult = ps.executeUpdate();
            } finally {
                closeStatement(ps);
            }
    
            return insertResult;
        }

    Note: the JobDataMap is serialized and stored in the database as a BLOB.
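
    To make "serialized and stored as a BLOB" concrete, here is a rough sketch of the round trip using standard Java serialization on a plain HashMap. I am assuming the job data behaves like an ordinary serializable map; BlobSketch, serialize and deserialize are illustrative names, not Quartz methods.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Sketch of writing a map to bytes and reading it back; names are hypothetical.
class BlobSketch {

    // Serializes the map into the byte array that would be written to the BLOB column.
    static byte[] serialize(Map<String, Object> data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Restores the map; fails if a class stored in the bytes is not on the classpath.
    @SuppressWarnings("unchecked")
    static Map<String, Object> deserialize(byte[] blob) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(blob))) {
            return (Map<String, Object>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class missing during deserialization", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<>();
        data.put("favorite color", "Green");
        data.put("count", 1);
        byte[] blob = serialize(data);
        System.out.println(blob.length + " bytes, restored: " + deserialize(blob));
    }
}
```

    The ClassNotFoundException branch is worth noticing: anything put into the map must be loadable again wherever the job is read back, which is one way a ClassNotFoundException can surface when working with a JDBC job store.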

    The SQL executed, defined in StdJDBCConstants, is:

        String INSERT_JOB_DETAIL = "INSERT INTO "
                + TABLE_PREFIX_SUBST + TABLE_JOB_DETAILS + " (" 
                + COL_SCHEDULER_NAME + ", " + COL_JOB_NAME
                + ", " + COL_JOB_GROUP + ", " + COL_DESCRIPTION + ", "
                + COL_JOB_CLASS + ", " + COL_IS_DURABLE + ", " 
                + COL_IS_NONCONCURRENT +  ", " + COL_IS_UPDATE_DATA + ", " 
                + COL_REQUESTS_RECOVERY + ", "
                + COL_JOB_DATAMAP + ") " + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?)";
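
    The rtp(...) call seen in insertJobDetail is what turns the placeholders in this constant into real SQL. The sketch below shows the idea with java.text.MessageFormat. The placeholder values "{0}" and "{1}", the column names, and the QRTZ_ prefix are written from my recollection of the Quartz constants and default schema, so treat them as assumptions and check them against your own version; RtpSketch is a hypothetical class.

```java
import java.text.MessageFormat;

// Sketch of placeholder substitution; constant values below are assumptions.
class RtpSketch {
    static final String TABLE_PREFIX_SUBST = "{0}"; // assumed value
    static final String SCHED_NAME_SUBST = "{1}";   // assumed value

    // Same shape as INSERT_JOB_DETAIL, with column names written out (assumed names).
    static final String INSERT_JOB_DETAIL = "INSERT INTO "
            + TABLE_PREFIX_SUBST + "JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP, DESCRIPTION, "
            + "JOB_CLASS_NAME, IS_DURABLE, IS_NONCONCURRENT, IS_UPDATE_DATA, "
            + "REQUESTS_RECOVERY, JOB_DATA) VALUES(" + SCHED_NAME_SUBST
            + ", ?, ?, ?, ?, ?, ?, ?, ?, ?)";

    // Replaces {0} with the table prefix and {1} with the quoted scheduler name.
    static String rtp(String query, String tablePrefix, String schedulerName) {
        return MessageFormat.format(query, tablePrefix, "'" + schedulerName + "'");
    }

    public static void main(String[] args) {
        System.out.println(rtp(INSERT_JOB_DETAIL, "QRTZ_", "MyScheduler"));
    }
}
```

    With the prefix QRTZ_ and a scheduler named MyScheduler, the statement starts with INSERT INTO QRTZ_JOB_DETAILS and its VALUES list starts with 'MyScheduler', followed by the nine ? parameters that insertJobDetail binds.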

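    2.2.2 Querying JobDetail

    Note that the JobDataMap inside a JobDetail is stored in the database as a BLOB (the useProperties setting can switch this to string storage; it defaults to false, meaning BLOB storage), so reading it back needs special handling. From StdJDBCDelegate.java: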

        /**
         * <p>
         * Select the JobDetail object for a given job name / group name.
         * </p>
         * 
         * @param conn
         *          the DB Connection
         * @return the populated JobDetail object
         * @throws ClassNotFoundException
         *           if a class found during deserialization cannot be found or if
         *           the job class could not be found
         * @throws IOException
         *           if deserialization causes an error
         */
        public JobDetail selectJobDetail(Connection conn, JobKey jobKey,
                ClassLoadHelper loadHelper)
            throws ClassNotFoundException, IOException, SQLException {
            PreparedStatement ps = null;
            ResultSet rs = null;
    
            try {
                ps = conn.prepareStatement(rtp(SELECT_JOB_DETAIL));
                ps.setString(1, jobKey.getName());
                ps.setString(2, jobKey.getGroup());
                rs = ps.executeQuery();
    
                JobDetailImpl job = null;
    
                if (rs.next()) {
                    job = new JobDetailImpl();
    
                    job.setName(rs.getString(COL_JOB_NAME));
                    job.setGroup(rs.getString(COL_JOB_GROUP));
                    job.setDescription(rs.getString(COL_DESCRIPTION));
                    job.setJobClass( loadHelper.loadClass(rs.getString(COL_JOB_CLASS), Job.class));
                    job.setDurability(getBoolean(rs, COL_IS_DURABLE));
                    job.setRequestsRecovery(getBoolean(rs, COL_REQUESTS_RECOVERY));
    
                    Map<?, ?> map = null;
                    if (canUseProperties()) {
                        map = getMapFromProperties(rs);
                    } else {
                        map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
                    }
    
                    if (null != map) {
                        job.setJobDataMap(new JobDataMap(map));
                    }
                }
    
                return job;
            } finally {
                closeResultSet(rs);
                closeStatement(ps);
            }
        }
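
    The canUseProperties() branch above corresponds to the useProperties setting: instead of a serialized Java object, the job data is kept as name/value string pairs, so every value has to be a String. A rough stdlib sketch of that alternative encoding, using java.util.Properties, is shown below. PropertiesBlobSketch and its two methods are hypothetical names and this is not the Quartz implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Sketch of storing job data as string properties instead of a serialized object.
class PropertiesBlobSketch {

    // Encodes string-only job data in java.util.Properties text format.
    static byte[] toPropertiesBytes(Map<String, String> data) {
        Properties props = new Properties();
        props.putAll(data);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            props.store(baos, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Decodes the bytes back into a plain map of strings.
    static Map<String, String> fromPropertiesBytes(byte[] bytes) {
        Properties props = new Properties();
        try {
            props.load(new ByteArrayInputStream(bytes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> map = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            map.put(name, props.getProperty(name));
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("favorite color", "Green");
        data.put("count", "1"); // numbers must be stored as strings in this mode
        System.out.println(fromPropertiesBytes(toPropertiesBytes(data)));
    }
}
```

    The trade-off is readability and no class-version problems on one side, against losing typed values on the other: the job has to parse "1" back into a number itself.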

    2.3 Querying triggers

        /**
         * <p>
         * Retrieve the given <code>{@link org.quartz.Trigger}</code>.
         * </p>
         * 
         * @return The desired <code>Trigger</code>, or null if there is no
         *         match.
         */
        public OperableTrigger retrieveTrigger(final TriggerKey triggerKey) throws JobPersistenceException {
            return (OperableTrigger)executeWithoutLock( // no locks necessary for read...
                new TransactionCallback() {
                    public Object execute(Connection conn) throws JobPersistenceException {
                        return retrieveTrigger(conn, triggerKey);
                    }
                });
        }
        
        protected OperableTrigger retrieveTrigger(Connection conn, TriggerKey key)
            throws JobPersistenceException {
            try {
    
                return getDelegate().selectTrigger(conn, key);
            } catch (Exception e) {
                throw new JobPersistenceException("Couldn't retrieve trigger: "
                        + e.getMessage(), e);
            }
        }

    StdJDBCDelegate.java

        /**
         * <p>
         * Select a trigger.
         * </p>
         * 
         * @param conn
         *          the DB Connection
         * @return the <code>{@link org.quartz.Trigger}</code> object
         * @throws JobPersistenceException 
         */
        public OperableTrigger selectTrigger(Connection conn, TriggerKey triggerKey) throws SQLException, ClassNotFoundException,
                IOException, JobPersistenceException {
            PreparedStatement ps = null;
            ResultSet rs = null;
    
            try {
                OperableTrigger trigger = null;
    
                ps = conn.prepareStatement(rtp(SELECT_TRIGGER));
                ps.setString(1, triggerKey.getName());
                ps.setString(2, triggerKey.getGroup());
                rs = ps.executeQuery();
    
                if (rs.next()) {
                    String jobName = rs.getString(COL_JOB_NAME);
                    String jobGroup = rs.getString(COL_JOB_GROUP);
                    String description = rs.getString(COL_DESCRIPTION);
                    long nextFireTime = rs.getLong(COL_NEXT_FIRE_TIME);
                    long prevFireTime = rs.getLong(COL_PREV_FIRE_TIME);
                    String triggerType = rs.getString(COL_TRIGGER_TYPE);
                    long startTime = rs.getLong(COL_START_TIME);
                    long endTime = rs.getLong(COL_END_TIME);
                    String calendarName = rs.getString(COL_CALENDAR_NAME);
                    int misFireInstr = rs.getInt(COL_MISFIRE_INSTRUCTION);
                    int priority = rs.getInt(COL_PRIORITY);
    
                    Map<?, ?> map = null;
                    if (canUseProperties()) {
                        map = getMapFromProperties(rs);
                    } else {
                        map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
                    }
                    
                    Date nft = null;
                    if (nextFireTime > 0) {
                        nft = new Date(nextFireTime);
                    }
    
                    Date pft = null;
                    if (prevFireTime > 0) {
                        pft = new Date(prevFireTime);
                    }
                    Date startTimeD = new Date(startTime);
                    Date endTimeD = null;
                    if (endTime > 0) {
                        endTimeD = new Date(endTime);
                    }
    
                    if (triggerType.equals(TTYPE_BLOB)) {
                        rs.close(); rs = null;
                        ps.close(); ps = null;
    
                        ps = conn.prepareStatement(rtp(SELECT_BLOB_TRIGGER));
                        ps.setString(1, triggerKey.getName());
                        ps.setString(2, triggerKey.getGroup());
                        rs = ps.executeQuery();
    
                        if (rs.next()) {
                            trigger = (OperableTrigger) getObjectFromBlob(rs, COL_BLOB);
                        }
                    }
                    else {
                        TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(triggerType);
                        
                        if(tDel == null)
                            throw new JobPersistenceException("No TriggerPersistenceDelegate for trigger discriminator type: " + triggerType);
    
                        TriggerPropertyBundle triggerProps = null;
                        try {
                            triggerProps = tDel.loadExtendedTriggerProperties(conn, triggerKey);
                        } catch (IllegalStateException isex) {
                            if (isTriggerStillPresent(ps)) {
                                throw isex;
                            } else {
                                // QTZ-386 Trigger has been deleted
                                return null;
                            }
                        }
    
                        TriggerBuilder<?> tb = newTrigger()
                            .withDescription(description)
                            .withPriority(priority)
                            .startAt(startTimeD)
                            .endAt(endTimeD)
                            .withIdentity(triggerKey)
                            .modifiedByCalendar(calendarName)
                            .withSchedule(triggerProps.getScheduleBuilder())
                            .forJob(jobKey(jobName, jobGroup));
        
                        if (null != map) {
                            tb.usingJobData(new JobDataMap(map));
                        }
        
                        trigger = (OperableTrigger) tb.build();
                        
                        trigger.setMisfireInstruction(misFireInstr);
                        trigger.setNextFireTime(nft);
                        trigger.setPreviousFireTime(pft);
                        
                        setTriggerStateProperties(trigger, triggerProps);
                    }                
                }
    
                return trigger;
            } finally {
                closeResultSet(rs);
                closeStatement(ps);
            }
        }

    The SQL executed:

        String SELECT_TRIGGER = "SELECT * FROM "
                + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
                + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
                + " AND " + COL_TRIGGER_NAME + " = ? AND " + COL_TRIGGER_GROUP + " = ?";

    As with JobDetail, the BLOB issue applies here too, so I will not repeat it.

    2.4 The scheduling thread: QuartzSchedulerThread

        /**
         * <p>
         * The main processing loop of the <code>QuartzSchedulerThread</code>.
         * </p>
         */
        @Override
        public void run() {
            boolean lastAcquireFailed = false;
    
            while (!halted.get()) {
                try {
                    // check if we're supposed to pause...
                    synchronized (sigLock) {
                        while (paused && !halted.get()) {
                            try {
                                // wait until togglePause(false) is called...
                                sigLock.wait(1000L);
                            } catch (InterruptedException ignore) {
                            }
                        }
    
                        if (halted.get()) {
                            break;
                        }
                    }
    
                    int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                    if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
    
                        List<OperableTrigger> triggers = null;
    
                        long now = System.currentTimeMillis();
    
                        clearSignaledSchedulingChange();
                        try {
                            triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                    now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); //1.
                            lastAcquireFailed = false;
                            if (log.isDebugEnabled()) 
                                log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                        } catch (JobPersistenceException jpe) {
                            if(!lastAcquireFailed) {
                                qs.notifySchedulerListenersError(
                                    "An error occurred while scanning for the next triggers to fire.",
                                    jpe);
                            }
                            lastAcquireFailed = true;
                            continue;
                        } catch (RuntimeException e) {
                            if(!lastAcquireFailed) {
                                getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                        +e.getMessage(), e);
                            }
                            lastAcquireFailed = true;
                            continue;
                        }
    
                        if (triggers != null && !triggers.isEmpty()) {
    
                            now = System.currentTimeMillis();
                            long triggerTime = triggers.get(0).getNextFireTime().getTime();
                            long timeUntilTrigger = triggerTime - now;
                            while(timeUntilTrigger > 2) {
                                synchronized (sigLock) {
                                    if (halted.get()) {
                                        break;
                                    }
                                    if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                        try {
                                            // we could have blocked a long while
                                            // on 'synchronize', so we must recompute
                                            now = System.currentTimeMillis();
                                            timeUntilTrigger = triggerTime - now;
                                            if(timeUntilTrigger >= 1)
                                                sigLock.wait(timeUntilTrigger);
                                        } catch (InterruptedException ignore) {
                                        }
                                    }
                                }
                                if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                    break;
                                }
                                now = System.currentTimeMillis();
                                timeUntilTrigger = triggerTime - now;
                            }
    
                            // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                            if(triggers.isEmpty())
                                continue;
    
                            // set triggers to 'executing'
                            List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
    
                            boolean goAhead = true;
                            synchronized(sigLock) {
                                goAhead = !halted.get();
                            }
                            if(goAhead) {
                                try {
                                    List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); //2
                                    if(res != null)
                                        bndles = res;
                                } catch (SchedulerException se) {
                                    qs.notifySchedulerListenersError(
                                            "An error occurred while firing triggers '"
                                                    + triggers + "'", se);
                                    //QTZ-179 : a problem occurred interacting with the triggers from the db
                                    //we release them and loop again
                                    for (int i = 0; i < triggers.size(); i++) {
                                        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                    }
                                    continue;
                                }
    
                            }
    
                            for (int i = 0; i < bndles.size(); i++) {
                                TriggerFiredResult result =  bndles.get(i);
                                TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                                Exception exception = result.getException();
    
                                if (exception instanceof RuntimeException) {
                                    getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                    continue;
                                }
    
                                // it's possible to get 'null' if the triggers was paused,
                                // blocked, or other similar occurrences that prevent it being
                                // fired at this time...  or if the scheduler was shutdown (halted)
                                if (bndle == null) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                    continue;
                                }
    
                                JobRunShell shell = null;
                                try {
                                    shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                    shell.initialize(qs);
                                } catch (SchedulerException se) {
                                    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                    continue;
                                }
    
                                if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                    // this case should never happen, as it is indicative of the
                                    // scheduler being shutdown or a bug in the thread pool or
                                    // a thread pool being used concurrently - which the docs
                                    // say not to do...
                                    getLog().error("ThreadPool.runInThread() return false!");
                                    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                }
    
                            }
    
                            continue; // while (!halted)
                        }
                    } else { // if(availThreadCount > 0)
                        // should never happen, if threadPool.blockForAvailableThreads() follows contract
                        continue; // while (!halted)
                    }
    
                    long now = System.currentTimeMillis();
                    long waitTime = now + getRandomizedIdleWaitTime();
                    long timeUntilContinue = waitTime - now;
                    synchronized(sigLock) {
                        try {
                          if(!halted.get()) {
                            // QTZ-336 A job might have been completed in the mean time and we might have
                            // missed the scheduled changed signal by not waiting for the notify() yet
                            // Check that before waiting for too long in case this very job needs to be
                            // scheduled very soon
                            if (!isScheduleChanged()) {
                              sigLock.wait(timeUntilContinue);
                            }
                          }
                        } catch (InterruptedException ignore) {
                        }
                    }
    
                } catch(RuntimeException re) {
                    getLog().error("Runtime error occurred in main trigger firing loop.", re);
                }
            } // while (!halted)
    
            // drop references to scheduler stuff to aid garbage collection...
            qs = null;
            qsRsrcs = null;
        }
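
    Stripped of error handling, the loop above does three things: wait for a free worker thread, ask the job store for triggers due no later than now + idleWaitTime (at most min(available threads, max batch size) of them), then wait until the first one is due and fire the batch. The sketch below covers only the selection step, on an in-memory list. It leaves out the batch time window, locking, misfire handling and the waiting itself, and AcquireSketch and its Trigger class are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified, in-memory view of the "acquire next triggers" step; not Quartz code.
class AcquireSketch {

    static final class Trigger {
        final String key;
        final long nextFireTime; // epoch millis

        Trigger(String key, long nextFireTime) {
            this.key = key;
            this.nextFireTime = nextFireTime;
        }
    }

    // Returns up to maxCount triggers due no later than noLaterThan, earliest first.
    static List<Trigger> acquireNextTriggers(List<Trigger> waiting, long noLaterThan, int maxCount) {
        List<Trigger> sorted = new ArrayList<>(waiting);
        sorted.sort(Comparator.comparingLong((Trigger t) -> t.nextFireTime));
        List<Trigger> acquired = new ArrayList<>();
        for (Trigger t : sorted) {
            if (acquired.size() >= maxCount || t.nextFireTime > noLaterThan) {
                break; // batch is full, or the remaining triggers are too far in the future
            }
            acquired.add(t);
        }
        return acquired;
    }

    public static void main(String[] args) {
        long now = 0L;
        long idleWaitTime = 30_000L; // look-ahead window (illustrative value)
        int availThreadCount = 2;    // free worker threads (illustrative value)
        List<Trigger> waiting = new ArrayList<>();
        waiting.add(new Trigger("t3", 20_000L));
        waiting.add(new Trigger("t1", 5_000L));
        waiting.add(new Trigger("t4", 60_000L));
        waiting.add(new Trigger("t2", 10_000L));
        for (Trigger t : acquireNextTriggers(waiting, now + idleWaitTime, availThreadCount)) {
            System.out.println(t.key + " due at " + t.nextFireTime);
        }
    }
}
```

    In this run only t1 and t2 are acquired: t3 is due within the window but the batch is capped at two free threads, and t4 lies beyond now + idleWaitTime regardless.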

    2.4.1 Acquiring triggers (the line marked //1. in the loop above)

        protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
            throws JobPersistenceException {
            if (timeWindow < 0) {
              throw new IllegalArgumentException();
            }
            
            List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
            Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
            final int MAX_DO_LOOP_RETRY = 3;
            int currentLoopCount = 0;
            do {
                currentLoopCount ++;
                try {
                    List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
                    
                    // No trigger is ready to fire yet.
                    if (keys == null || keys.size() == 0)
                        return acquiredTriggers;
    
                    long batchEnd = noLaterThan;
    
                    for(TriggerKey triggerKey: keys) {
                        // If our trigger is no longer available, try a new one.
                        OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
                        if(nextTrigger == null) {
                            continue; // next trigger
                        }
                        
                        // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
                        // put it back into the timeTriggers set and continue to search for next trigger.
                        JobKey jobKey = nextTrigger.getJobKey();
                        JobDetail job;
                        try {
                            job = retrieveJob(conn, jobKey);
                        } catch (JobPersistenceException jpe) {
                            try {
                                getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                                getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
                            } catch (SQLException sqle) {
                                getLog().error("Unable to set trigger state to ERROR.", sqle);
                            }
                            continue;
                        }
                        
                        if (job.isConcurrentExectionDisallowed()) {
                            if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
                                continue; // next trigger
                            } else {
                                acquiredJobKeysForNoConcurrentExec.add(jobKey);
                            }
                        }
                        
                        if (nextTrigger.getNextFireTime().getTime() > batchEnd) {
                          break;
                        }
                        // We now have an acquired trigger; add it to the return list.
                        // If our trigger was no longer in the expected state, try a new one.
                        int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                        if (rowsUpdated <= 0) {
                            continue; // next trigger
                        }
                        nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                        getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
    
                        if(acquiredTriggers.isEmpty()) {
                            batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
                        }
                        acquiredTriggers.add(nextTrigger);
                    }
    
                    // if we didn't end up with any trigger to fire from that first
                    // batch, try again for another batch. We allow with a max retry count.
                    if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                        continue;
                    }
                    
                    // We are done with the while loop.
                    break;
                } catch (Exception e) {
                    throw new JobPersistenceException(
                              "Couldn't acquire next trigger: " + e.getMessage(), e);
                }
            } while (true);
            
            // Return the acquired trigger list
            return acquiredTriggers;
        }
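
    Two details of acquireNextTrigger are easy to miss in the listing above: a trigger is only taken if its state can be moved from WAITING to ACQUIRED (another node may have claimed it first), and a job that disallows concurrent execution contributes at most one trigger per batch. The following in-memory sketch isolates those two rules. AcquireSketch, Candidate and the map standing in for the trigger table are all hypothetical; the real code does this through SQL updates.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory illustration of the acquire step; names are hypothetical, not Quartz API. */
class AcquireSketch {
    /** Minimal trigger record: key, owning job, and whether that job forbids concurrent runs. */
    static final class Candidate {
        final String triggerKey;
        final String jobKey;
        final boolean nonConcurrent;

        Candidate(String triggerKey, String jobKey, boolean nonConcurrent) {
            this.triggerKey = triggerKey;
            this.jobKey = jobKey;
            this.nonConcurrent = nonConcurrent;
        }
    }

    // Stand-in for the trigger state column in the database.
    private final Map<String, String> states = new ConcurrentHashMap<>();

    void store(String triggerKey, String state) {
        states.put(triggerKey, state);
    }

    String stateOf(String triggerKey) {
        return states.get(triggerKey);
    }

    /** Plays the role of "UPDATE ... SET state = next WHERE key = ? AND state = expected". */
    private boolean compareAndSetState(String key, String expected, String next) {
        return states.replace(key, expected, next);
    }

    List<String> acquire(List<Candidate> candidates) {
        List<String> acquired = new ArrayList<>();
        Set<String> nonConcurrentJobsSeen = new HashSet<>();
        for (Candidate c : candidates) {
            // At most one trigger per non-concurrent job in a single batch.
            if (c.nonConcurrent && !nonConcurrentJobsSeen.add(c.jobKey)) {
                continue;
            }
            // Skip triggers that are no longer WAITING (zero rows updated in the real code).
            if (!compareAndSetState(c.triggerKey, "WAITING", "ACQUIRED")) {
                continue;
            }
            acquired.add(c.triggerKey);
        }
        return acquired;
    }
}
```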

    2.4.2 Firing the trigger (red marker 2)

    protected TriggerFiredBundle triggerFired(Connection conn,
                OperableTrigger trigger)
            throws JobPersistenceException {
            JobDetail job;
            Calendar cal = null;
    
            // Make sure trigger wasn't deleted, paused, or completed...
            try { // if trigger was deleted, state will be STATE_DELETED
                String state = getDelegate().selectTriggerState(conn,
                        trigger.getKey());
                if (!state.equals(STATE_ACQUIRED)) {
                    return null;
                }
            } catch (SQLException e) {
                throw new JobPersistenceException("Couldn't select trigger state: "
                        + e.getMessage(), e);
            }
    
            try {
                job = retrieveJob(conn, trigger.getJobKey());
                if (job == null) { return null; }
            } catch (JobPersistenceException jpe) {
                try {
                    getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                    getDelegate().updateTriggerState(conn, trigger.getKey(),
                            STATE_ERROR);
                } catch (SQLException sqle) {
                    getLog().error("Unable to set trigger state to ERROR.", sqle);
                }
                throw jpe;
            }
    
            if (trigger.getCalendarName() != null) {
                cal = retrieveCalendar(conn, trigger.getCalendarName());
                if (cal == null) { return null; }
            }
    
            try {
                getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
            } catch (SQLException e) {
                throw new JobPersistenceException("Couldn't insert fired trigger: "
                        + e.getMessage(), e);
            }
    
            Date prevFireTime = trigger.getPreviousFireTime();
    
            // call triggered - to update the trigger's next-fire-time state...
            trigger.triggered(cal);
    
            String state = STATE_WAITING;
            boolean force = true;
            
            if (job.isConcurrentExectionDisallowed()) {
                state = STATE_BLOCKED;
                force = false;
                try {
                    getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                            STATE_BLOCKED, STATE_WAITING);
                    getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                            STATE_BLOCKED, STATE_ACQUIRED);
                    getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                            STATE_PAUSED_BLOCKED, STATE_PAUSED);
                } catch (SQLException e) {
                    throw new JobPersistenceException(
                            "Couldn't update states of blocked triggers: "
                                    + e.getMessage(), e);
                }
            } 
                
            if (trigger.getNextFireTime() == null) {
                state = STATE_COMPLETE;
                force = true;
            }
    
            storeTrigger(conn, trigger, job, true, state, force, false);
    
            job.getJobDataMap().clearDirtyFlag();
    
            return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
                    .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
                    .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
        }
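
    The state that triggerFired stores back after firing follows a small decision rule: WAITING by default, BLOCKED if the job disallows concurrent execution, and COMPLETE if the trigger has no next fire time (this last check wins). A minimal sketch of just that rule is shown below; PostFireState is a hypothetical helper and the string values are illustrative stand-ins for the STATE_* constants.

```java
/** Hypothetical helper isolating the state decision made at the end of triggerFired. */
class PostFireState {
    static String nextState(boolean concurrentExecutionDisallowed, boolean hasNextFireTime) {
        String state = "WAITING";
        if (concurrentExecutionDisallowed) {
            // Sibling triggers of the same job are blocked until this run finishes.
            state = "BLOCKED";
        }
        if (!hasNextFireTime) {
            // No further fire time: the trigger is finished, regardless of blocking.
            state = "COMPLETE";
        }
        return state;
    }
}
```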

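    2.4.3 Database locks

    StdRowLockSemaphore targets databases that support SELECT ... FOR UPDATE, such as MySQL.

    UpdateLockRowSemaphore targets databases that do not support SELECT ... FOR UPDATE, such as MS SQL Server.

    StdRowLockSemaphore is built on the following SQL: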

        public static final String SELECT_FOR_LOCK = "SELECT * FROM "
                + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
                + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
    
        public static final String INSERT_LOCK = "INSERT INTO "
            + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" 
            + SCHED_NAME_SUBST + ", ?)"; 
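
    To see what SELECT_FOR_LOCK looks like once the placeholders are filled in, here is a small sketch. It assumes that TABLE_PREFIX_SUBST and SCHED_NAME_SUBST are MessageFormat-style placeholders ({0} and {1}) and that the table and column constants have the values shown; none of these values appear in the listing above, so treat them as assumptions. LockSqlSketch and expand are hypothetical names.

```java
import java.text.MessageFormat;

/** Sketch of placeholder expansion for the lock query; constant values are assumed. */
class LockSqlSketch {
    // Assumed values; the article does not show these constants.
    static final String TABLE_PREFIX_SUBST = "{0}";
    static final String SCHED_NAME_SUBST = "{1}";
    static final String TABLE_LOCKS = "LOCKS";
    static final String COL_SCHEDULER_NAME = "SCHED_NAME";
    static final String COL_LOCK_NAME = "LOCK_NAME";

    static final String SELECT_FOR_LOCK = "SELECT * FROM "
            + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
            + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

    /** Fills in the table prefix and the quoted scheduler name; the lock name stays a JDBC parameter. */
    static String expand(String template, String tablePrefix, String schedName) {
        return MessageFormat.format(template, tablePrefix, "'" + schedName + "'");
    }
}
```

    Under these assumptions, expand(SELECT_FOR_LOCK, "QRTZ_", "MyScheduler") yields SELECT * FROM QRTZ_LOCKS WHERE SCHED_NAME = 'MyScheduler' AND LOCK_NAME = ? FOR UPDATE. Because FOR UPDATE holds the row lock until the surrounding transaction commits or rolls back, a second scheduler instance running the same query for the same lock name waits until the first one finishes.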

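    Summary:

    1. Quartz has three core components: Job, Trigger, and Scheduler. A Job holds the business logic, a Trigger decides when it runs, and the Scheduler coordinates Jobs and Triggers so that execution actually happens.

    2. With MySQL as the store, Quartz talks to the database through StdJDBCDelegate; the SQL statements it uses are defined in StdJDBCConstants.

    3. QuartzScheduler is the core class and Scheduler acts as its proxy; the actual firing work is done by QuartzSchedulerThread.

    4. JobStore controls persistence. JobStoreSupport has two implementations: JobStoreCMT relies on container-managed transactions, so it does not call commit or rollback itself; JobStoreTX is for standalone environments without container-managed transactions and handles commit and rollback itself.

    5. Database locking uses pessimistic locks via SELECT ... FOR UPDATE, modeled as a Semaphore.

    6. qrtz_scheduler_state stores the cluster check-in interval.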
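
    Several of the points above correspond to standard Quartz configuration keys. The sketch below collects them in a java.util.Properties object. QuartzClusterConfig and jdbcClusterProperties are hypothetical names, the values are examples rather than recommendations, and data source settings are deliberately left out.

```java
import java.util.Properties;

/** Hypothetical helper that assembles JDBC job store settings for a clustered scheduler. */
class QuartzClusterConfig {
    static Properties jdbcClusterProperties(long checkinIntervalMillis) {
        Properties p = new Properties();
        // JobStoreTX: Quartz manages commit and rollback itself.
        p.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        // StdJDBCDelegate: the delegate that issues the SQL against the database.
        p.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        p.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
        // Clustering: each instance checks in periodically.
        p.setProperty("org.quartz.jobStore.isClustered", "true");
        p.setProperty("org.quartz.jobStore.clusterCheckinInterval", Long.toString(checkinIntervalMillis));
        p.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        return p;
    }
}
```

    A Properties object like this can be passed to new StdSchedulerFactory(props) once the data source keys have been added.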


  • Original article: https://www.cnblogs.com/davidwang456/p/10329616.html