zoukankan      html  css  js  c++  java
  • spring batch(二):核心部分(1):配置Spring batch

    chapter 3、Batch configuration

    1、spring batch 的命名空间

    spring xml中指定batch的前缀作为命名空间。

    示例:

    Xml代码  收藏代码
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans"  
    3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    4.     xmlns:batch="http://www.springframework.org/schema/batch"  
    5.     xsi:schemaLocation="http://www.springframework.org/schema/beans  
    6.     http://www.springframework.org/schema/beans/spring-beans.xsd  
    7.     http://www.springframework.org/schema/batch  
    8.     http://www.springframework.org/schema/batch/spring-batch.xsd">  
    9.       
    10.     <batch:job id="importProductsJob">  
    11.     (...)  
    12.     </batch:job>  
    13. </beans>  

    当指定batch命名空间之后,bean中的声明都需要加上batch的前缀,例如:<batch:job id="importProductsJob">

    spring的命名空间的前缀可以指定任意的名称,这里采用batch作为前缀,为了方便理解。

    2、spring batch XML的主要标签有:job、step、tasklet、chunk、job-repository

    3、Job配置。job元素是整个配置的顶级元素,它的属性有:

    a、id

    b、restartable

    c、incrementer

    d、abstract

    e、parent

    f、job-repository

    restartable 属性如果是false,则程序不允许重启。如果发生重启,会抛出JobRestartException异常。

    Java代码  收藏代码
    1. <batch:job id="importProductsJob" restartable="false">  
    2.     (...)  
    3. </batch:job>  

     job除了这些属性外,还可以配置验证器<batch:validator ref="parameterValidator" />,用来校验工作参数(job parameters),可以实现JobParametersValidator接口。

    如果无法通过验证,会抛出JobParametersInvalidException异常。spring batch提供了一个默认的实现类DefaultJobParametersValidator,完成绝大部分的工作。如果还是无法满足需求,可以自己编码实现接口。

    实例:

    Java代码  收藏代码
    1. <batch:job id="importProductsJob">  
    2.     (...)  
    3.     <batch:validator ref="validator"/>  
    4. </batch:job>  
    5. <bean id="validator" class="org.springframework.batch.core.job.DefaultJobParametersValidator">  
    6.     <property name="requiredKeys">  
    7.         <set>  
    8.             <value>date</value>  
    9.         </set>  
    10.     </property>  
    11.     <property name="optionalKeys">  
    12.         <set>  
    13.             <value>productId</value>  
    14.         </set>  
    15.     </property>  
    16. </bean>  

    4、step步骤的配置。

    step的属性:

    a、next

    b、parent

    c、abstract

    示例:

    Java代码  收藏代码
    1. <job id="importProductsJob">  
    2.     <step id="decompress" next="readWrite">  
    3.         (...)  
    4.     </step>  
    5.     <step id="readWrite">  
    6.         (...)  
    7.     </step>  
    8. </job>  

    5、tasklet和chunk的配置。

    tasklet和chunk是用来指定处理过程的。

    一个tasklet对应于一个事务性的、潜在可重复的过程中发生的一个步骤。

    你可以自己实现Tasklet 接口,定义自己的tasklet。这个特性很有用,比如:用于解压文件,执行系统命令,执行存储过程等待。

    你也可以使用系统tasklet的属性有:

    a、ref指定应用的bean

    b、transaction-manager事物管理器

    c、start-limittasklet重试retry的次数

    d、allow-start-if-complete如果tasklet成功完成之后,是否可以进行重试retry操作。

    示例:

    Java代码  收藏代码
    1. <batch:job id="importProductsJob">  
    2.     (...)  
    3.     <batch:step id="readWriteStep">  
    4.         <batch:tasklet  
    5.             transaction-manager="transactionManager"  
    6.             start-limit="3"  
    7.             allow-start-if-complete="true">  
    8.             (...)  
    9.         </batch:tasklet>  
    10.     </batch:step>  
    11. </batch:job>  
    12. <bean id="transactionManager" class="(...)">  
    13.     (...)  
    14. </bean>  

    chunk ,ChunkOrientedTasklet 类实现类“块处理(chunk processing)”。

    配置tasklet很简单,但是配置“块处理”就会复杂一点,因为它涉及的内容更多。

    chunk的属性有:

    a、reader

    b、processor

    c、writer

    d、commit-interval事物提交一次处理的items的数量。也是chunk的大小。

    e、skip-limit跳跃的次数

    f、skip-policy跳跃的策略:要实现SkipPolicy接口

    g、retry-policy重试的策略:要实现RetryPolicy接口

    h、retry-limit最大的重试次数

    i、cache-capacity重试的缓存策略

    j、reader-transactional-queue从一个拥有事务的JMS的queue读取item数据

    k、processor-transactional处理器是否包含事务处理

    l、chunk-completion-policychunk的完成策略

    示例:

    Java代码  收藏代码
    1. <batch:job id="importProductsJob">  
    2.     (...)  
    3.     <batch:step id="readWrite">  
    4.         <batch:tasklet>  
    5.             <batch:chunk  
    6.                 reader="productItemReader"  
    7.                 processor="productItemProcessor"  
    8.                 writer="productItemWriter"  
    9.                 commit-interval="100"   
    10.                 skip-limit="20"  
    11.                 retry-limit="3"  
    12.                 cache-capacity="100"  
    13.                 chunk-completion-policy="timeoutCompletionPolicy"/>  
    14.         </batch:tasklet>  
    15.     </batch:step>  
    16. </batch:job>  
    17. <bean id="productItemReader" class="(...)">  
    18. (...)  
    19. </bean>  
    20. <bean id="productItemProcessor" class="(...)">  
    21. (...)  
    22. </bean>  
    23. <bean id="productItemWriter" class="(...)">  
    24. (...)  
    25. </bean>  
    26. <bean id="timeoutCompletionPolicy"  
    27.     class="org.springframework.batch.repeat.policy.TimeoutTerminationPolicy">  
    28.         <constructor-arg value="60"/>  
    29. </bean>  

    chunk还有几个子标签,包括:reader、processor、writer、skip-policy、retry-policy

    示例:

    Java代码  收藏代码
    1. <batch:job id="importProductsJob">  
    2.     (...)  
    3.     <batch:step id="readWrite">  
    4.         <batch:tasklet>  
    5.             <batch:chunk commit-interval="100">  
    6.                 <batch:reader>  
    7.                     <bean class="(...)">  
    8.                         (...)  
    9.                     </bean>  
    10.                 </batch:reader>  
    11.                 <batch:processor>  
    12.                     <bean class="(...)">  
    13.                         (...)  
    14.                     </bean>  
    15.                 </batch:processor>  
    16.                 <batch:writer>  
    17.                     <bean class="(...)">  
    18.                         (...)  
    19.                     </bean>  
    20.                 </batch:writer>  
    21.             </batch:chunk>  
    22.         </batch:tasklet>  
    23.     </batch:step>  
    24. </batch:job>  

    chunk的一些其他额外的子标签:retry-listeners、skippable-exception-classes、retryable-exception-classes、streams

    示例:

    Xml代码  收藏代码
    1. <batch:job id="importProductsJob">  
    2.     (...)  
    3.     <batch:step id="readWrite">  
    4.         <batch:tasklet>  
    5.             <batch:chunk commit-interval="100"  
    6.                 skip-limit="10">  
    7.                 <skippable-exception-classes>  
    8.                     <include class="org.springframework.batch.item.file.FlatFileParseException"/>  
    9.                     <exclude class="java.io.FileNotFoundException"/>  
    10.                 </skippable-exception-classes>  
    11.             </batch:chunk>  
    12.         </batch:tasklet>  
    13.     </batch:step>  
    14. </batch:job>  

    在chunk里面配置streams

    示例:

    Xml代码  收藏代码
    1. <batch:job id="importProductsJob">  
    2.     (...)  
    3.     <batch:step id="readWrite">  
    4.             <batch:tasklet>  
    5.                 <batch:chunk reader="productItemReader" writer="compositeWriter"/>  
    6.                 <streams>  
    7.                     <stream ref="productItemWriter1"/>  
    8.                     <stream ref="productItemWriter2"/>  
    9.                 </streams>  
    10.             </batch:tasklet>  
    11.     </batch:step>  
    12. </batch:job>  
    13.   
    14. <bean id="compositeWriter"  
    15.     class="org.springframework.batch.item.support.CompositeItemWriter">  
    16.     <property name="delegates">  
    17.         <list>  
    18.             <ref bean="productItemWriter1"/>  
    19.             <ref bean="productItemWriter2"/>  
    20.         </list>  
    21.     </property>  
    22. </bean>  

    使用一个复合的stream形式,如上例的itemWriter,内部是writer1和writer2是stream的,需要在chunk里面的streams元素里面注册。

    6、事务配置。

        事务是spring batch的一个重要主题。主要作用于batch处理程序的健壮性,chunk处理的合并来完成它的工作。因为事务调用的对象类型不同,你可以在不同的层次配置事务。

    示例:

    Xml代码  收藏代码
    1. <batch:job id="importProductsJob">  
    2.     (...)  
    3.     <batch:step id="readWrite">  
    4.         <batch:tasklet transaction-manager="transactionManager" (...)>  
    5.             (...)  
    6.         </batch:tasklet>  
    7.     </batch:step>  
    8. </batch:job>  

     事务,有几个定义的属性:行为,隔离 isolation,超时 timeout。

    spring batch提供transaction-attributes标签来描述这些事务的属性。

     示例:

    Xml代码  收藏代码
    1. <batch:tasklet>  
    2.     <batch:chunk reader="productItemReader"  
    3.         writer="productItemReader"  
    4.         commit-interval="100"/>  
    5.         <batch:transaction-attributes isolation="DEFAULT"  
    6.         propagation="REQUIRED"  
    7.         timeout="30"/>  
    8.     </batch:chunk>  
    9. </batch:tasklet>  

     isolation 定义数据库的隔离级别以及对外部事务的可见性。

    spring的事务处理是基于java的异常体系的。java里面的异常分为两类:检查型异常(extends Exception)和非检查型异常(extends RuntimeException)

    spring对检查型异常进行commit提交处理,对非检查型异常进行rollback回滚处理。

    spring batch运行你对不需要触发rollback回滚动作的异常进行定义。你可以在tasklet的no-rollback-exception-class元素中进行指定。

     示例:

    Xml代码  收藏代码
    1. <batch:tasklet>  
    2.     (...)  
    3.     <batch:no-rollback-exception-classes>  
    4.         <batch:include  
    5.             class="org.springframework.batch.item.validator.ValidationException"/>  
    6.     </batch:no-rollback-exception-classes>  
    7. </batch:tasklet>  

     对于JMS,spring batch提供chunk的reader-transactional-queue 属性,提供事务处理。

    7、配置job仓库

    job仓库(job repository)是spring batch底层基础的一个关键特征,它为spring batch的运行提供信息。

    job仓库必须实现JobRepository接口,spring batch只提供了一个具体的实现类:SimpleJobRepository。

    spring batch为DAO提供了两种实现:

        a、内存无持久化

        b、jdbc元数据的持久化

    第一种“内存无持久化”的方式可以用来spring batch的测试和开发,但是不能应用于生产环境。因为内存中的东西会丢失。

    设定job仓库的参数

        (1)、配置内存模式的job仓库:Configuring an in-memory job repository

    示例:

    Xml代码  收藏代码
    1. <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">  
    2.     <property name="transactionManager-ref" ref="transactionManager"/>  
    3. </bean>  
    4.   
    5. <bean id="transactionManager"    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>  
    6.   
    7. <batch:job id="importInvoicesJob"    job-repository="jobRepository">  
    8.     (...)  
    9. </batch:job>  

     由于job仓库是内存模式的,所以事务管理器采用ResourcelessTransactionManager。 这个类是NOOP (NO OPeration)无操作的PlatformTransactionManager接口实现。

         (2)、持久化的job仓库:Configuring a persistent job repository

    关系型数据库的job仓库的属性如下:data-source,transaction-manager,isolation-level-for-create,max-varchar-length,table-prefix,lob-handler

    示例:

    Xml代码  收藏代码
    1. <bean id="dataSource"  
    2.     class="org.apache.commons.dbcp.BasicDataSource"  
    3.     destroy-method="close">  
    4.     <property name="driverClassName" value="${batch.jdbc.driver}" />  
    5.     <property name="url" value="${batch.jdbc.url}" />  
    6.     <property name="username" value="${batch.jdbc.user}" />  
    7.     <property name="password" value="${batch.jdbc.password}" />  
    8. </bean>  
    9.   
    10. <bean id="transactionManager" lazy-init="true"  
    11.     class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
    12.     <property name="dataSource" ref="dataSource" />  
    13. </bean>  
    14.   
    15. <batch:job-repository id="jobRepository"  
    16.     data-source="dataSource"  
    17.     transaction-manager="transactionManager"  
    18.     isolation-level-for-create="SERIALIZABLE"  
    19.     table-prefix="BATCH_"  
    20. />  
    21.   
    22. <batch:job id="importInvoicesJob" job-repository="jobRepository">  
    23.     (...)  
    24. </batch:job>  

     [问题:如果在不同的物理节点上面运行同样的job会发生什么呢?]

    前提:使用同一份spring batch的元数据,即采用同一份数据库上面的表结构。

    当创建job实例和执行信息的元数据的时候,job仓库扮演了一个集中维护的工作库的角色。当job并发运行的时候,spring batch 的job仓库,可以防止创建相同的工作情况。这个功能依赖于底层数据库的事务能力,完成job的同步任务。我们设置job仓库的属性isolation-level-for-create="SERIALIZABLE",来避免发生job的并发问题。由于有这种防护措施,你可以把你的spring batch的job分布到各个不同的物理节点,保证你的job实例不会被创建两次。

         job仓库是spring batch底层结构中的重要部分,它记录了batch处理的信息,并且跟踪job运行的成功与失败。

     8、spring batch配置的高级主题。

    (1)、使用step作用域。当使用SpEL的时候,step作用域很有用,来实现属性的晚绑定。

    (2)、Spring表达式语言:Spring Expression Language (SpEL) 

    Spring3.× 开始提供。

     step的作用的实例范围包括:jobParameters、jobExecutionContext、stepExecutionContext

     示例:

    Xml代码  收藏代码
    1. <bean id="decompressTasklet"  
    2.             class="com.manning.sbia.ch01.batch.DecompressTasklet"  
    3.             scope="step">  
    4.     <property name="inputResource"  
    5.     value="#{jobParameters['inputResource']}" />  
    6.     <property name="targetDirectory"  
    7.     value="#{jobParameters['targetDirectory']}" />  
    8.     <property name="targetFile"  
    9.     value="#{jobParameters['targetFile']}" />  
    10. </bean>  

     SpEL由 #{ 和 } 组成

     在jobExecutionContext 和 stepExecutionContext也可以应用SpEL表达式。

     (3)、使用Linteners提供的更多的处理。

    Spring batch在job和step级别上面提供listener。

    Spring batch提供的listener的类型:

    a、Job listener:在job级别监听处理过程

    b、Step listeners:在step级别监听处理过程

    c、Item listeners:监听item的retry重试和repeat重做动作

    一、Job listener 可以监听job的运行,在job运行前和后添加动作。可以利用 listener标签,在job标签下面作为子元素进行添加。

    示例1:

    Xml代码  收藏代码
    1. <batch:job id="importProductsJob">  
    2.     <batch:listeners>  
    3.     <batch:listener ref="importProductsJobListener"/>  
    4.     </batch:listeners>  
    5. </batch:job>  

     importProductsJobListener不管job运行成功还是失败,它都会在job运行的开始和结束的时候,接收job的通知,进行监听。

    还可以通过普通的POJO的java对象来做监听器,只需要进行简单的配置即可。

    示例2:

    Xml代码  收藏代码
    1. <batch:listeners>  
    2.     <batch:listener ref="importProductsJobListener" after-job-method="afterJob" before-job-method="beforeJob"/>  
    3. </batch:listeners>  

     可以在listener里面的ref指定引用的POJO的bean,通过after-job-method="afterJob" before-job-method="beforeJob" 来指定job之前和之后的执行方法。不过,被指定的这两个方法的参数都需要是:JobExecution jobExecution。 这个2个方法的返回值都是void

    还有一种方法是利用“注释”来配置listener,spring batch会自己发现并运行该类。

    二、Step listener

        Step有一系列的listener来跟踪step的处理过程。这里所有的listener接口都继承了StepListener接口。

    Spring batch提供的Step的listener有:

    a、ChunkListener:在chunk执行的之前和之后调用。

    b、ItemProcessListener:在 ItemProcessor得到一个item之前和之后调用,在ItemProcessor抛出一个异常的时候调用。

    c、ItemReadListener:在读取item之前和读取item之后调用,或者在读取item的过程中触发异常的时候调用。

    d、ItemWriteListener:在一个item输出之前和之后调用,或者在item输出的过程中调用。

    e、SkipListener:当读取、处理和输出的过程中产生了skip跳跃一个item的时候调用。

    f、StepExecutionListener:在step运行之前和之后调用。

     接口代码:

    Java代码  收藏代码
    1. public interface StepExecutionListener extends StepListener {  
    2.     void beforeStep(StepExecution stepExecution);  
    3.     ExitStatus afterStep(StepExecution stepExecution);  
    4. }  
    5.   
    6. public interface ChunkListener extends StepListener {  
    7.     void beforeChunk();  
    8.     void afterChunk();  
    9. }  
    10.   
    11.   
    12. public interface ItemProcessListener<T, S> extends StepListener {  
    13.     void beforeProcess(T item);  
    14.     void afterProcess(T item, S result);  
    15.     void onProcessError(T item, Exception e);  
    16. }  
    17.   
    18. public interface ItemReadListener<T> extends StepListener {  
    19.     void beforeRead();  
    20.     void afterRead(T item);  
    21.     void onReadError(Exception ex);  
    22. }  
    23.   
    24. public interface ItemWriteListener<S> extends StepListener {  
    25.     void beforeWrite(List<? extends S> items);  
    26.     void afterWrite(List<? extends S> items);  
    27.     void onWriteError(Exception exception, List<? extends S> items);  
    28. }  
    29.   
    30. public interface SkipListener<T,S> extends StepListener {  
    31.     void onSkipInRead(Throwable t);  
    32.     void onSkipInProcess(T item, Throwable t);  
    33.     void onSkipInWrite(S item, Throwable t);  
    34. }  

     Step listener 作为tasklet标签的一个子标签进行配置。

     上面这些所有的Step listener都可以作为tasklet标签的子标签以相同的方式和等级进行配置。

    示例:

    Xml代码  收藏代码
    1. <bean id="importProductsJobListener"  
    2.     class="test.case01.java.batch.listener.job.ImportProductsJobListener" />  
    3.       
    4. <bean id="productStepExecutionListener"  
    5.     class="test.case01.java.batch.listener.step.ProductStepExecutionListener" />  
    6.   
    7. <bean id="productChunkListener"  
    8.     class="test.case01.java.batch.listener.step.chunk.ProductChunkListener" />  
    9.       
    10. <bean id="productItemProcessListener"  
    11.     class="test.case01.java.batch.listener.step.chunk.item.ProductItemProcessListener" />  
    12.       
    13. <batch:job id="importProducts" restartable="false">  
    14.     <batch:step id="readWriteProducts">  
    15.         <batch:tasklet>  
    16.             <batch:chunk reader="reader" writer="writer" processor="processor"  
    17.                 commit-interval="100" skip-limit="5">  
    18.                 <batch:skippable-exception-classes>  
    19.                     <batch:include  
    20.                         class="org.springframework.batch.item.file.FlatFileParseException" />  
    21.                 </batch:skippable-exception-classes>  
    22.             </batch:chunk>  
    23.             <batch:listeners>  
    24.                 <!-- here configure three kinds listeners for StepExecutionListener ,  ChunkListener , ItemProcessListener  for example.-->  
    25.                 <batch:listener ref="productStepExecutionListener" />  
    26.                 <batch:listener ref="productChunkListener" />  
    27.                 <batch:listener ref="productItemProcessListener" />  
    28.             </batch:listeners>  
    29.         </batch:tasklet>  
    30.     </batch:step>  
    31.     <batch:validator ref="parameterValidator" />  
    32.     <batch:listeners>  
    33.         <batch:listener ref="importProductsJobListener" />  
    34.     </batch:listeners>  
    35. </batch:job>  

     三、retry重试和repeat重做的listener

    repeat listener拥有的方法名称:before、after、close(在一个item最后一次repeat重做之后调用,不管repeat成功与否)、onError、open

    retry listener拥有的方法名称:close(在一个item上面最后一次尝试retry之后调用,不管retry成功与否)、onError、open

    接口代码:

    Java代码  收藏代码
    1. public interface RepeatListener {  
    2.     void before(RepeatContext context);  
    3.     void after(RepeatContext context, RepeatStatus result);  
    4.     void open(RepeatContext context);  
    5.     void onError(RepeatContext context, Throwable e);  
    6.     void close(RepeatContext context);  
    7. }  
    8.   
    9. public interface RetryListener {  
    10.     <T> void open(RetryContext context, RetryCallback<T> callback);  
    11.     <T> void onError(RetryContext context,  
    12.     RetryCallback<T> callback, Throwable e);  
    13.     <T> void close(RetryContext context,  
    14.     RetryCallback<T> callback, Throwable e);  
    15. }  

     这2个接口,和上面的step listener的配置位置和方式一致,都是tasklet标签的子标签位置

    四、配置继承关系

    spring的XML提供配置的继承。使用abstract和parent两个属性。从一个parent的bean继承,表示该bean可以利用parent bean中的所有的属性,并且可以覆盖这些属性。

    示例:

    Xml代码  收藏代码
    1. <bean id="parentBean" abstract="true">  
    2.     <property name="propertyOne" value="(...)"/>  
    3. </bean>  
    4.   
    5. <bean id="childBean" parent="parentBean">  
    6.     <property name="propertyOne" value="(...)"/>  
    7.     <property name="propertyTwo" value="(...)"/>  
    8. </bean>  

     这种继承关系是由spring提供的,在spring batch里面也可以使用。

    listeners标签,提供merge属性,可以用来合并parent和自身的listener

     示例:

    Xml代码  收藏代码
    1. <job id="parentJob" abstract="true">  
    2.     <listeners>  
    3.         <listener ref="globalListener"/>  
    4.     <listeners>  
    5. </job>  
    6.   
    7. <job id="importProductsJob" parent="parentJob">  
    8.     (...)  
    9.     <listeners merge="true">  
    10.         <listener ref="specificListener"/>  
    11.     <listeners>  
    12. </job>  

     spring XML的这种继承关系,使得spring batch的XML配置更简单。

  • 相关阅读:
    cache buffers chains ,buffer busy waits
    关于I/O的一些脚本
    模拟buffer busy waits等待事件
    找出热点块所属的用户,对象名,类型
    找到引起磁盘排序的SQL
    db file parallel write,write complete waits
    free buffer waits
    检查日志文件是否传输到备用数据库
    模拟direct path read 等待事件
    3系统启动后的 wifi 加载过程
  • 原文地址:https://www.cnblogs.com/developer-ios/p/5829966.html
Copyright © 2011-2022 走看看