zoukankan      html  css  js  c++  java
  • 通过源码分析Java开源任务调度框架Quartz的主要流程

    通过源码分析Java开源任务调度框架Quartz的主要流程

    从使用效果、调用链路跟踪、E-R图、循环调度逻辑几个方面分析Quartz。

    github项目地址: https://github.com/tanliwei/spring-quartz-cluster-sample , 补充了SQL输出

    系统说明:

    IDE: IntelliJ

    JDK:1.8

    Quartz:2.2.1

    使用效果

    1.从github项目https://github.com/tanliwei/spring-quartz-cluster-sample中,拉取项目到本地,导入IDEA。

        相信读者都有一定工作经验,这些细节不赘述。

    2.本文采用Mysql数据库。

        请执行 resources/scripts/tables_mysql_innodb.sql

    3.修改jdbc.properties中数据库配置

    4.通过IDEA, Edit Configurations -> Add Tomcat Server, 部署到Tomcat

    暴露的Restful 接口 /say-hello.do 以及添加好任务后的调用效果:


     

    添加任务

    在tomcat启动成功后,在首页点击“添加任务”,添加如下任务:


     

    代码执行逻辑在SyncJobFactory类中,从Output中可以看到执行的输出信息,

    调用链跟踪的最后会回到这个类来。

    现在开始跟踪调用链路。 

    IDEA 快捷键:
    进入方法:  Ctrl + 鼠标左键
    光标前进/后退: Ctrl + Shirt + 右方向键/左方向键
     
     
    一、 调用链路跟踪

    从配置文件applicationContext.xml配置中找到任务调度核心类SchedulerFactoryBean

     resources/applicationContext.xml

    <bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    ...
    </bean>

    使用IDEA快捷键,点击进入SchedulerFactoryBean类,它实现了InitializingBean接口,

    在Spring中凡是实现了InitializingBean接口的Bean,都会在Bean属性都设置完成后调用afterPropertiesSet()方法.

     SchedulerFactoryBean.java

    //---------------------------------------------------------------------
    // Implementation of InitializingBean interface
    // 实现 InitializingBean 接口
    //---------------------------------------------------------------------
    public void afterPropertiesSet() throws Exception {
        //...
        // Create SchedulerFactory instance.
        // 创建 SchedulerFactory 调度器工厂实例
        SchedulerFactory schedulerFactory = (SchedulerFactory)
                BeanUtils.instantiateClass(this.schedulerFactoryClass);
        initSchedulerFactory(schedulerFactory);
        //...
        // Get Scheduler instance from SchedulerFactory.
        // 通过调度器工厂 获取 调度器实例
        try {
            this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
        //...
    }

     SchedulerFactoryBean.java

    /**
     * Create the Scheduler instance for the given factory and scheduler name.
     * 通过制定工厂和调度器名称创建调度器实例
     * Called by {@link #afterPropertiesSet}.
     * <p>The default implementation invokes SchedulerFactory's <code>getScheduler</code>
     * method. Can be overridden for custom Scheduler creation.
     */
    protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
            throws SchedulerException {
        //...
        try {
            SchedulerRepository repository = SchedulerRepository.getInstance();
            synchronized (repository) {
                Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
                Scheduler newScheduler = schedulerFactory.getScheduler();
                if (newScheduler == existingScheduler) {
                    throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
                            "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
                }
                //...
    }

     这个项目走的逻辑是 StdSchedulerFactory.getScheduler()方法,可自行debug。

     StdSchedulerFactory.java

    /**
     * Returns a handle to the Scheduler produced by this factory.
     * 返回该工厂创造的调度器的句柄
     */
    public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }
    
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
    
        Scheduler sched = schedRep.lookup(getSchedulerName());
        //...
        sched = instantiate();
        return sched;
    }

    StdSchedulerFactory.java

    private Scheduler instantiate() throws SchedulerException {
        //...
        //大量的配置初始化、实例化代码
        //...
        //第1298行代码
        qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
        //...
    }

    QuartzScheduler.java

    /**
     * Create a <code>QuartzScheduler</code> with the given configuration
     * 根据给定的配置 创建Quartz调度器
     */
    public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
            throws SchedulerException {
            this.resources = resources;
            if (resources.getJobStore() instanceof JobListener) {
                addInternalJobListener((JobListener)resources.getJobStore());
            }
            //private QuartzSchedulerThread schedThread;
            this.schedThread = new QuartzSchedulerThread(this, resources);
            ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
            //通过线程池执行 Quartz调度器线程
            schedThreadExecutor.execute(this.schedThread);
            //...
    }

     QuartzSchedulerThread.java

    /**
     * <p>
     * The main processing loop of the <code>QuartzSchedulerThread</code>.
     * Quartz调度器线程的主循环逻辑
     * </p>
     */
    @Override
    public void run() {
        //while循环执行,只要调度器为被暂停
        while(!halted.get()){
                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            }
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false){}
    
        }
    }

     JobRunShell.java

    public void run() {
            //...
            Job job = jec.getJobInstance();
            //...
            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                //执行
                job.execute(jec);
                endTime = System.currentTimeMillis();
            }
            //...
            //更新Trigger触发器状态,删除FIRED_TRIGGERS触发记录
            instCode = trigger.executionComplete(jec, jobExEx);
            //...
    }

    QuartzJobBean.java

    /**
     * This implementation applies the passed-in job data map as bean property
     * values, and delegates to <code>executeInternal</code> afterwards.
     * 这个实现 把传入的map数据作为bean属性值,然后委托给 executeInternal 方法
     */
    public final void execute(JobExecutionContext context) throws JobExecutionException {
        try {
        //执行
        executeInternal(context);
    }

      SyncJobFactory.java

    //回到了我们的业务类SyncJobFactory的executeInternal方法,
    //里面执行我们的业务代码
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            LOG.info("SyncJobFactory execute" + IPAddressKowalski.getIpAddressAndPort() + " port:"+IPAddressKowalski.getTomcatPort());
        }
        //...
        System.out.println("jobName:" + scheduleJob.getJobName() + "  " + scheduleJob);
        //...
    }

     二、E-R图

    梳理6张主要的Quartz表:

     

     
    QRTZ_TRIGGERS 触发器表

        SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_JOB_DETAILS表SCHED_NAME外键

        JOB_NAME,任务名。自定义值。 联合主键,QRTZ_JOB_DETAILS表JOB_NAME外键

        JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_JOB_DETAILS表JOB_GROUP外键

        TRIGGER_STATE,触发器状态: WAITING , ACQUIRED, BLOCKING

        NEXT_FIRE_TIME, 下次触发时间:

        MISFIRE_INSTR,执行失败后的指令,

            非失败策略 MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1; 

            失败策略 MISFIRE_INSTRUCTION_SMART_POLICY = 0;

        TRIGGER_TYPE, 触发器类型,例如CRON,cron表达式类型的触发器

        PRIORITY,优先级

    QRTZ_CRON_TRIGGERS cron类型触发器表

        SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_TRIGGERS表SCHED_NAME外键

        JOB_NAME,任务名。自定义值。 联合主键,QRTZ_TRIGGERS表JOB_NAME外键

        JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_TRIGGERS表JOB_GROUP外键

        CRON_EXPRESSION, cron表达式, 例如每30秒执行一次, 0/30 * * * * ?

    QRTZ_JOB_DETAILS 任务详细表

        SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

        JOB_NAME,任务名。自定义值。 联合主键

        JOB_GROUP,任务组。 自定义值。联合主键

        JOB_DATA,blob类型,任务参数

    QRTZ_FIRED_TRIGGERS 任务触发表

        SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

        ENTRY_ID,entry id,联合主键

        JOB_NAME,任务名。自定义值。 

        JOB_GROUP,任务组。 自定义值。

        FIRED_TIME, 任务触发时间

        STATE,状态

        INSTANCE_NAME, 服务器实例名

        PRIORITY,优先级

    QRTZ_SCHEDULER_STATE 

        SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

        INSTANCE_NAME,服务器实例名。联合主键

        LAST_CHECKIN_TIME,上次检查时间

        CHECKIN_INTERVAL,检查间隔

    QRTZ_LOCKS 全局锁

        SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

        LOCK_NAME,锁名称,例如,TRIGGER_ACCESS。联合主键

       

    三、循环调度逻辑

        主要流程如下:

        源码如下:

    QuartzSchedulerThread.java

     public void run() {
            //...
            while (!halted.get()) {
                try {
                    //合理休眠
                    //...
                            //获取接下来的触发器
                            //1.状态为WAITING
                            //2.触发时间在30秒内
                            //3.不是错过执行的或者错过了但是时间不超过两分钟
                            triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                    now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                           
                                    //... 
                                    //触发任务
                                    List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                    //...
                                JobRunShell shell = null;
                                //...
                                //执行代码
                                if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                //...
            } // while (!halted)
            //..
        } 

     JobRunShell.java

        protected QuartzScheduler qs = null;
        
        public void run() {
            qs.addInternalSchedulerListener(this);
            try {
                //...
                do {
                    Job job = jec.getJobInstance();
                    // execute the job
                    try {
                        //执行任务代码
                        job.execute(jec);
                    //更新触发器,删除触发记录
                    qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
                    break;
                } while (true);
            } 
        //...
        }

    四、扩展

    除了对主线程 QuartzSchedulerThread 的分析

    继续分析JobStoreSupport类的两个线程 ClusterManager 和 MisfireHandler 的分析, 它们维护触发器的MISFIRE_INSTR状态,和调度器状态QRTZ_SCHEDULER_STATE。

  • 相关阅读:
    FLASK报错,TypeError,需要valid response
    pycharm push failed
    pycharm环境艰难安装requirements.txt库文件
    git如何忽略部分不想备份的文件
    redis.exceptions.ConnectionError,目标计算机积极拒绝
    Mysql无法启动
    pymysql
    mysql数据类型
    MySQL命令(SQL语句)
    MySQL安装(windows10)
  • 原文地址:https://www.cnblogs.com/tanliwei/p/10020787.html
Copyright © 2011-2022 走看看