zoukankan      html  css  js  c++  java
  • Spring Batch(4): Job详解

     

    Spring Batch(4): Job详解

     分类:
     

    目录(?)[+]

     

    第四章 配置作业Job

    4.1 基本配置

    Job的配置有3个必须的属性,name,jobRepository,steps。一个简单的Job配置如下:

    <job id="footballJob">
        <step id="playerload"          parent="s1" next="gameLoad"/>
        <step id="gameLoad"            parent="s2" next="playerSummarization"/>
        <step id="playerSummarization" parent="s3"/>
    </job>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 1
    • 2
    • 3
    • 4
    • 5

    jobRepository默认引用名称为jobRepository的bean,当然也可以显式地配置:

    <job id="footballJob" job-repository="specialRepository">
        <step id="playerload"          parent="s1" next="gameLoad"/>
        <step id="gameLoad"            parent="s3" next="playerSummarization"/>
        <step id="playerSummarization" parent="s3"/>
    </job>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 1
    • 2
    • 3
    • 4
    • 5
    4.1.1 Restartable属性

    该属性定义Job是否可以被重启,默认为true,在JobExecution执行失败后,可以创建另一个JobExecution来继续上次的执行。但是如果该属性设为false,重新执行该JobInstance将抛出异常。

    <job id="footballJob" restartable="false">
        ...
    </job>
    • 1
    • 2
    • 3
    • 1
    • 2
    • 3
    4.1.2 拦截Job执行

    spring Batch在Job的生命周期中提供了一些钩子方法,可这些钩子方法通过Listener的形式提供。JobListener的接口定义如下:

    public interface JobExecutionListener {
    
        void beforeJob(JobExecution jobExecution);
    
        void afterJob(JobExecution jobExecution);
    
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    通过实现JobExecutionListener接口并配置给Job,可以在Job执行前后执行特定的逻辑。例如在执行结束之后,如果失败,发送邮件通知管理人员等。

    <job id="footballJob">
        <step id="playerload"          parent="s1" next="gameLoad"/>
        <step id="gameLoad"            parent="s2" next="playerSummarization"/>
        <step id="playerSummarization" parent="s3"/>
        <listeners>
            <listener ref="sampleListener"/>
        </listeners>
    </job>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    需要注意的是,无论Job是否成功执行,afterJob方法都会执行,Job是否执行成功,可以从JobExecution中获取。

    public void afterJob(JobExecution jobExecution){
        if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
            //job success
        }
        else if(jobExecution.getStatus() == BatchStatus.FAILED){
            //job failure
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    Listener的执行顺序: 
    beforeJob与配置的顺序一样,afterJob与配置的顺序相反。

    Listener异常: 
    Listener的执行过程中如果抛出异常,将导致Job无法继续完成,最终状态为FAILED.因此要合理控制Listener异常对业务的影响。

    注解支持: 
    如果不想使用侵入性强的Listener接口,可以使用@BeforeJob和@AfterJob两个注解声明。

    4.1.3 Job抽象与继承

    通用的Job配置可以抽取出来,作为抽象的Job存在,抽象的Job不允许被实例化:

    <job id="baseJob" abstract="true">
        <listeners>
            <listener ref="listenerOne"/>
        <listeners>
    </job>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 1
    • 2
    • 3
    • 4
    • 5

    子Job可以通过继续共用这些配置(当然,也可以继承非抽象的Job)。

    <job id="job1" parent="baseJob">
        <step id="step1" parent="standaloneStep"/>
    
        <listeners merge="true">
            <listener ref="listenerTwo"/>
        <listeners>
    </job>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    其中的merge=”true”表示合并父job和子job的配置,也就是两个Listener都生效,同常规的Spring配置。

    4.1.4 Job参数验证

    JobParameterValidator组件用于验证JobParameter。通过以下配置为job配置验证器:

    <job id="job1" parent="baseJob3">
        <step id="step1" parent="standaloneStep"/>
        <validator ref="paremetersValidator"/>
    </job>
    • 1
    • 2
    • 3
    • 4
    • 1
    • 2
    • 3
    • 4
    4.1.4 属性的Late Binding

    在Spring中,可以把Bean配置用到的属性值通过PropertiesPlaceHolderConfiguer把属性从配置中分离出来独立管理,理论上来说,在配置Job的时候也可以使用相同的方式。但是Spring Batch提供了在运行时配置参数值的能力:

    <bean:property name="filePath" value="#{jobParameters['filePath']}" />
    • 1
    • 1

    在启动Job时:

        launcher.executeJob("job.xml" , "footjob",
            new JobParametersBuilder().addDate("day", new Date()))
                                      .addString("filePath", "/opt/data/test.xml"));
    • 1
    • 2
    • 3
    • 1
    • 2
    • 3

    4.2 配置JobRepository

    JobRepository为任务框架中的各个组件对象提供CRUD操作,例如JobExecution,StepExecution。 
    一个配置例子如下:

    <job-repository id="jobRepository"
        data-source="dataSource"
        transaction-manager="transactionManager"
        isolation-level-for-create="SERIALIZABLE"
        table-prefix="BATCH_"
        max-varchar-length="1000"/>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    4.2.1 事务配置

    JobRepository的操作需要事务来保证其完整性以及正确性,这些元数据的完整性对框架来说非常重要。如果没有事务支持,框架的行为将无法正确定义。 
    create*方法的事务隔离级别单独定义,为了保证同一个JobInstance不会被同时执行两次,默认的隔离级别为SERIALIZABLE,可以被修改:

    <job-repository id="jobRepository"
                    isolation-level-for-create="REPEATABLE_READ" />
    • 1
    • 2
    • 1
    • 2

    如果没有使用Batch命名空间或者没有使用Factory Bean,则需要显示配置事务AOP:

    <aop:config>
        <aop:advisor
               pointcut="execution(* org.springframework.batch.core..*Repository+.*(..))"/>
        <advice-ref="txAdvice" />
    </aop:config>
    
    <tx:advice id="txAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="*" />
        </tx:attributes>
    </tx:advice>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    4.2.2 表名前缀

    默认情况下,Spring Batch需要的表以BATCH作为前缀,不过可以自定义:

    <job-repository id="jobRepository"
                    table-prefix="e_batch" />
    • 1
    • 2
    • 1
    • 2

    表前缀可以修改,但是表名和表的列不能被修改。

    4.2.3 特殊的Repository

    测试环境中,内存级别的数据库十分方便:

    <bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>
    • 1
    • 2
    • 3
    • 4
    • 1
    • 2
    • 3
    • 4

    如果使用的数据库类型不在SpringBatch的支持中,可以通过JobRepositoryFactoryBean自定义。

    4.3 配置JobLauncher

    默认提供了一个简单的Launcher:

    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
    • 1
    • 2
    • 3
    • 4
    • 1
    • 2
    • 3
    • 4

    JobLauncher的时序图如下:

    这里写图片描述

    如果启动的请求来自HTTP,那么等待整个Job完成再返回不是一个好方法,此时需要异步启动Job,时序图如下:

    这里写图片描述

    相应的Launcher配置如下:

    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="taskExecutor">
            <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
        </property>
    </bean>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    4.4 运行Job

    有多种方式可以启动一个Job,但是核心都是通过JobLauncher来实现。 
    1. 命令行运行 
    主要通过CommandLineJobRunner类完成 


    2. 从Web容器中运行 
    通过Http请求启动任务很常见,时序图如下: 
    这里写图片描述

    Controller可以是常规的Spring MVC Controller:

    @Controller
    public class JobLauncherController {
    
        @Autowired
        JobLauncher jobLauncher;
    
        @Autowired
        Job job;
    
        @RequestMapping("/jobLauncher.html")
        public void handle() throws Exception{
            jobLauncher.run(job, new JobParameters());
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14



    3. 使用调度框架运行 
    可以与其他调度框架一起使用,例如使用Spring的轻量级调用框架Spring Scheduler或者Quartz

    4.5 元数据的高级用法

    除了通过JobRepository对元数据进行CRUD操作外,Spring batch还提供另外的接口用于访问元数据。 
    包括: JobExplorer JobOperator。整体结构如下: 
    这里写图片描述

    4.5.1 JobExplorer

    该组件提供了只读的查询操作,是JobRepository的只读版本,接口定义如下:

    public interface JobExplorer {
    
        List<JobInstance> getJobInstances(String jobName, int start, int count);
    
        JobExecution getJobExecution(Long executionId);
    
        StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
    
        JobInstance getJobInstance(Long instanceId);
    
        List<JobExecution> getJobExecutions(JobInstance jobInstance);
    
        Set<JobExecution> findRunningJobExecutions(String jobName);
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    配置一个Bean如下:

    <bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
          p:dataSource-ref="dataSource" />
    • 1
    • 2
    • 1
    • 2

    如果需要制定表名前缀:

    <bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
          p:dataSource-ref="dataSource" p:tablePrefix="BATCH_" />
    • 1
    • 2
    • 1
    • 2
    4.5.2 JobOperator

    JobOperator集成了很多接口定义,提供了综合的操作方法。定义如下:

    public interface JobOperator {
    
        List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
    
        List<Long> getJobInstances(String jobName, int start, int count)
              throws NoSuchJobException;
    
        Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
    
        String getParameters(long executionId) throws NoSuchJobExecutionException;
    
        Long start(String jobName, String parameters)
              throws NoSuchJobException, JobInstanceAlreadyExistsException;
    
        Long restart(long executionId)
              throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                      NoSuchJobException, JobRestartException;
    
        Long startNextInstance(String jobName)
              throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                     JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
    
        boolean stop(long executionId)
              throws NoSuchJobExecutionException, JobExecutionNotRunningException;
    
        String getSummary(long executionId) throws NoSuchJobExecutionException;
    
        Map<Long, String> getStepExecutionSummaries(long executionId)
              throws NoSuchJobExecutionException;
    
        Set<String> getJobNames();
    
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    配置:

    <bean id="jobOperator" class="org.spr...SimpleJobOperator">
        <property name="jobExplorer">
            <bean class="org.spr...JobExplorerFactoryBean">
                <property name="dataSource" ref="dataSource" />
            </bean>
        </property>
        <property name="jobRepository" ref="jobRepository" />
        <property name="jobRegistry" ref="jobRegistry" />
        <property name="jobLauncher" ref="jobLauncher" />
    </bean>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    其中的startNextInstance方法将使用当前Job的JobParameter,经过JobParametersIncrementer处理之后的参数启动一个JobInstance。

    public interface JobParametersIncrementer {
    
        JobParameters getNext(JobParameters parameters);
    
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 1
    • 2
    • 3
    • 4
    • 5

    下面是一个简单实现:

    public class SampleIncrementer implements JobParametersIncrementer {
    
        public JobParameters getNext(JobParameters parameters) {
            if (parameters==null || parameters.isEmpty()) {
                return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
            }
            long id = parameters.getLong("run.id",1L) + 1;
            return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    为job配置incrementer:

    <job id="footballJob" incrementer="sampleIncrementer">
        ...
    </job>
    • 1
    • 2
    • 3
    • 1
    • 2
    • 3

    在每天处理一次的批处理中,Incrementer的实现可能是按日期递增。

     
    1
     
    0
  • 相关阅读:
    js大文件上传(切片)
    前端大文件上传(切片)
    vue大文件上传(切片)
    网页大文件上传(切片)
    web大文件上传(切片)
    FCKEditor 实现ctrl+v粘贴图片并上传、word粘贴带图片
    umeditor 实现ctrl+v粘贴图片并上传、word粘贴带图片
    百度web编辑器 实现ctrl+v粘贴图片并上传、word粘贴带图片
    百度编辑器 实现ctrl+v粘贴图片并上传、word粘贴带图片
    百度ueditor 实现ctrl+v粘贴图片并上传、word粘贴带图片
  • 原文地址:https://www.cnblogs.com/developer-ios/p/5830064.html
Copyright © 2011-2022 走看看