zoukankan      html  css  js  c++  java
  • Quartz源码——JobStore保存JonDetail和Trigger源码分析(一)

    我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!

    {0} :表的前缀 ,如表qrtz_trigger ,{0}== qrtz_

    {1}:quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName

    要使用定时器,并讲任务持久到数据库,我们一定明白JobDetail和Trigger是如何操作进入数据库,如何注册到Scheduler中!前面我分析了Scheduler的start()和QuartzSchedulerThread的run()!这些都是要在下面讲的这个流程的基础上!

    那么下面就开始分析源码,分析源码前首先加一个小的demo,方面解释!

    		//1.创建Scheduler的工厂
    		SchedulerFactory sf = new StdSchedulerFactory();
    		//2.从工厂中获取调度器实例
    		Scheduler scheduler = sf.getScheduler();
    		//3.创建JobDetail
    		JobDetail jb = JobBuilder.newJob(RAMJob.class)
    				.withDescription("this is a ram job") //job的描述
    				.withIdentity("ramJob", "ramGroup") //job 的name和group
    				.build();
    		
    		//任务运行的时间,SimpleSchedle类型触发器有效
    		long time=  System.currentTimeMillis() + 3*1000L; //3秒后启动任务
    		Date statTime = new Date(time);
    		
    		//4.创建Trigger
    			//使用SimpleScheduleBuilder或者CronScheduleBuilder
    		Trigger t = TriggerBuilder.newTrigger()
    					.withDescription("")
    					.withIdentity("ramTrigger", "ramTriggerGroup")
    					//.withSchedule(SimpleScheduleBuilder.simpleSchedule())
    					.startAt(statTime)  //默认当前时间启动
    					.withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")) //两秒执行一次
    					.build();
    		
    		//5.注册任务和定时器
    		scheduler.scheduleJob(jb, t);//源码分析
    		
    		//6.启动 调度器
    		scheduler.start();
    

    真正的源码分析开始了!!!!
    主要分析下面这段,从这里进去,看看源码里面到底是怎么进行处理的!

    scheduler.scheduleJob(jb, t);//源码分析

    /**
         * <p>
         * Calls the equivalent method on the 'proxied' <code>QuartzScheduler</code>.
         * 调用“代理”<QuartzScheduler>上的等效方法。
         * 
         * 实现 在 StdScheduler.scheduleJob(JobDetail jobDetail, Trigger trigger)
         * </p>
         */
        public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
            throws SchedulerException {
            //这里的sched 是 QuartzScheduler 对象,Quartz和核心类,Quartz调度器
            return sched.scheduleJob(jobDetail, trigger);//保存JobDetail和Trigger
        }
    
    

    ched.scheduleJob(jobDetail, trigger);//保存JobDetail和Trigger

    /**
         * <p>
         * Add the <code>{@link org.quartz.Job}</code> identified by the given
         * <code>{@link org.quartz.JobDetail}</code> to the Scheduler, and
         * associate the given <code>{@link org.quartz.Trigger}</code> with it.
         * </p>
         * 
         * <p>
         * If the given Trigger does not reference any <code>Job</code>, then it
         * will be set to reference the Job passed with it into this method.
         * </p>
         * 
         * @throws SchedulerException
         *           if the Job or Trigger cannot be added to the Scheduler, or
         *           there is an internal Scheduler error.
         * 将给定org.quartz.JobDetail标识的org.quartz.Job添加到Scheduler,
         * 并将给定的org.quartz.Trigger与其关联。
         * 如果给定的触发器不引用任何作业,则它将被设置为引用与其一起传递的作业到此方法中。
         * 
         * 实现在 QuartzScheduler.scheduleJob(JobDetail jobDetail,
         *       Trigger trigger)
         */
        public Date scheduleJob(JobDetail jobDetail,
                Trigger trigger) throws SchedulerException {
            validateState();//验证调度器是否关闭,关闭抛出异常
    
    		//检查 jobDetail和trigger 
            if (jobDetail == null) {
                throw new SchedulerException("JobDetail cannot be null");
            }
            
            if (trigger == null) {
                throw new SchedulerException("Trigger cannot be null");
            }
            
            if (jobDetail.getKey() == null) {
                throw new SchedulerException("Job's key cannot be null");
            }
    
            if (jobDetail.getJobClass() == null) {
                throw new SchedulerException("Job's class cannot be null");
            }
            
            OperableTrigger trig = (OperableTrigger)trigger;
            
    		//getJobKey 获取 getJobName(), getJobGroup() 
            if (trigger.getJobKey() == null) {
                trig.setJobKey(jobDetail.getKey());
            } else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
                throw new SchedulerException(
                    "Trigger does not reference given job!");
            }
    		//验证trigger
            trig.validate();
    
            Calendar cal = null;
            if (trigger.getCalendarName() != null) {
                cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());//检索Calendar
               "SELECT * FROM {0}CALENDARS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?"
            }
            //在触发器首次添加到调度程序时由调度程序调用,以便让触发器基于任何关联的日历计算
            //其第一次触发时间。调用此方法后,getNextFireTime()应返回有效的答案。
            Date ft = trig.computeFirstFireTime(cal);
    
            if (ft == null) {
                throw new SchedulerException(
                        "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
            }
    
    		//存储给定的org.quartz.JobDetail和org.quartz.Trigger。
            resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
            notifySchedulerListenersJobAdded(jobDetail);
            notifySchedulerThread(trigger.getNextFireTime().getTime());
            notifySchedulerListenersSchduled(trigger);
    
            return ft;
        }
    
    

    //存储给定的org.quartz.JobDetail和org.quartz.Trigger
    resources.getJobStore().storeJobAndTrigger(jobDetail, trig);

    public void storeJobAndTrigger(final JobDetail newJob,
                final OperableTrigger newTrigger) 
            throws JobPersistenceException {
            executeInLock(
                (isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
                new VoidTransactionCallback() {
                    public void executeVoid(Connection conn) throws JobPersistenceException {
    	                //(1)保存JobDetail
                        storeJob(conn, newJob, false);
                        //(2)保存Trigger
                        storeTrigger(conn, newTrigger, newJob, false,
                                Constants.STATE_WAITING, false, false);
                    }
                });
        }
    
    

    //(1)保存JobDetail
    storeJob(conn, newJob, false);

     protected void storeJob(Connection conn, 
                JobDetail newJob, boolean replaceExisting)
            throws JobPersistenceException {
    
    		//判断JobDetail是否已经存在,根据name和group
            boolean existingJob = jobExists(conn, newJob.getKey());
            try {
                if (existingJob) {
                    if (!replaceExisting) { 
                        throw new ObjectAlreadyExistsException(newJob); 
                    }
                    //更新JobDetail
                    getDelegate().updateJobDetail(conn, newJob);
                    "UPDATE {0}JOB_DETAILS SET DESCRIPTION = ?, JOB_CLASS_NAME = ?, IS_DURABLE = ?, IS_NONCONCURRENT = ?, IS_UPDATE_DATA = ?, REQUESTS_RECOVERY = ?, JOB_DATA = ?  WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                } else {
    	            //插入JobDetail
                    getDelegate().insertJobDetail(conn, newJob);
                    "INSERT INTO {0}JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP, DESCRIPTION, JOB_CLASS_NAME, IS_DURABLE, IS_NONCONCURRENT, IS_UPDATE_DATA, REQUESTS_RECOVERY, JOB_DATA)  VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
                }
            } catch (IOException e) {
                throw new JobPersistenceException("Couldn't store job: "
                        + e.getMessage(), e);
            } catch (SQLException e) {
                throw new JobPersistenceException("Couldn't store job: "
                        + e.getMessage(), e);
            }
        }
    

    //(2)保存Trigger
    storeTrigger(conn, newTrigger, newJob, false,
    Constants.STATE_WAITING, false, false);

    
    protected void storeTrigger(Connection conn,
                OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
                boolean forceState, boolean recovering)
            throws JobPersistenceException {
    
    		//判断Trigger是否已经存在,根据name和group
            boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
    
            if ((existingTrigger) && (!replaceExisting)) { 
                throw new ObjectAlreadyExistsException(newTrigger); 
            }
            
            try {
    
                boolean shouldBepaused;
    			//进行一些状态的判断
                if (!forceState) {
                    shouldBepaused = getDelegate().isTriggerGroupPaused(
                            conn, newTrigger.getKey().getGroup());
    
                    if(!shouldBepaused) {
                        shouldBepaused = getDelegate().isTriggerGroupPaused(conn,
                                ALL_GROUPS_PAUSED);
    
                        if (shouldBepaused) {
                            getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup());
                        }
                    }
    
                    if (shouldBepaused && (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED))) {
                        state = STATE_PAUSED;
                    }
                }
    			//若job为null,重新获取!
                if(job == null) {
                    job = getDelegate().selectJobDetail(conn, newTrigger.getJobKey(), getClassLoadHelper());
                    
    				"SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                }
                if (job == null) {
                    throw new JobPersistenceException("The job ("
                            + newTrigger.getJobKey()
                            + ") referenced by the trigger does not exist.");
                }
    			//判断是否有DisallowConcurrentExecution注解,recovering恢复
                if (job.isConcurrentExectionDisallowed() && !recovering) { 
                    state = checkBlockedState(conn, job.getKey(), state);
    			"if (jobName != null) {
                    ps = conn.prepareStatement(rtp(SELECT_FIRED_TRIGGERS_OF_JOB));
    		                "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                    ps.setString(1, jobName);
                    ps.setString(2, groupName);
                } else {
                    ps = conn
                            .prepareStatement(rtp(SELECT_FIRED_TRIGGERS_OF_JOB_GROUP));
                            "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND JOB_GROUP = ?"
                    ps.setString(1, groupName);
                }"
                }
                
                if (existingTrigger) {
    	            //更新trigger
                    getDelegate().updateTrigger(conn, newTrigger, state, job);
      "  大概贴出一些,具体想看完整的进入源码看
    	  if(updateJobData) {
    	      ps = conn.prepareStatement(rtp(UPDATE_TRIGGER));
    	      "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
           } else {
               ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_SKIP_DATA));
               "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
           }
    "
                } else {
    				//插入trigger
                    getDelegate().insertTrigger(conn, newTrigger, state, job);
                    "INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY)  VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
                }
            } catch (Exception e) {
                throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '" 
                        + newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
            }
        }
    
    
    

    到这里,保存就结束了!

    欢迎 一起讨论,学习!互相成长!

    附学习博文地址:quartz2.2源码分析4-JobStore


    欢迎访问我的csdn博客,我们一同成长!

    "不管做什么,只要坚持下去就会看到不一样!在路上,不卑不亢!"

    博客首页http://blog.csdn.net/u010648555

  • 相关阅读:
    使用python写天气预告
    beef配合ettercap批量劫持内网的浏览器
    html布局
    python 使用paramiko模块上传本地文件到ssh
    mysql一些函数的记录
    python与ssh交互
    html笔记4
    html笔记3
    html笔记2
    html笔记1
  • 原文地址:https://www.cnblogs.com/aflyun/p/6768964.html
Copyright © 2011-2022 走看看