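Here we modify the example from the previous post and use it as a test bed for some useful chunk-related features of Spring Batch. I have gradually come to realize that being profound is not the same as being close to the truth.
Learning Spring Batch
1. Using the chunk skip-limit attribute
The documentation describes this attribute as follows: Maximum number of skips during processing of the step. If processing reaches the skip limit, the next exception thrown on item processing (read, process, or write) causes the step to fail.
We change the configuration of the readWriter step in batch.xml as follows: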
<!-- old -->
<step id="readWriter" next="clean">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="100" processor="processor">
        </chunk>
    </tasklet>
</step>

<!-- new -->
<step id="readWriter" next="clean">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="100" skip-limit="2" processor="processor">
            <skippable-exception-classes>
                <include class="org.springframework.batch.item.file.FlatFileParseException"/>
            </skippable-exception-classes>
        </chunk>
    </tasklet>
</step>
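FlatFileParseException: Exception thrown when errors are encountered parsing flat files. We change the contents of the decompressed file so that one record is invalid, in this case a record whose date is clearly impossible. This is done purely to test the skip-limit attribute; in practice, data like this could also be filtered out in the processor.
After the run, the data in the database is as follows:
The invalid record was not inserted into the table, while the valid records were. If we increase the number of invalid records in the decompressed file, for example to 3, the console reports an error: org.springframework.batch.core.step.skip.SkipLimitExceededException: Skip limit of '2' exceeded. In that case no data is inserted into the database table, presumably because the step fails before the chunk is committed.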
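The counting behaviour described above can be modelled in plain Java. This is only a simplified sketch, not Spring Batch's own implementation: the class SkipLimitSketch and its nested ParseException and SkipLimitExceeded types are hypothetical stand-ins for the reader, FlatFileParseException and SkipLimitExceededException, and "parsing" here just means converting a string to an int.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified, hypothetical model of skip-limit counting; not Spring Batch code. */
class SkipLimitSketch {

    /** Stand-in for FlatFileParseException (illustration only). */
    static class ParseException extends RuntimeException {
        ParseException(String message) { super(message); }
    }

    /** Stand-in for SkipLimitExceededException (illustration only). */
    static class SkipLimitExceeded extends RuntimeException {
        SkipLimitExceeded(int limit) { super("Skip limit of '" + limit + "' exceeded"); }
    }

    /** Reads all lines, skipping unparsable ones until skipLimit skips have been used up. */
    static List<Integer> readAll(List<String> lines, int skipLimit) {
        List<Integer> items = new ArrayList<>();
        int skipCount = 0;
        for (String line : lines) {
            try {
                items.add(parse(line));
            } catch (ParseException e) {
                // The limit is already reached: the next failure fails the whole run.
                if (skipCount >= skipLimit) {
                    throw new SkipLimitExceeded(skipLimit);
                }
                skipCount++;
            }
        }
        return items;
    }

    /** Converts a line to an int, wrapping any format error in ParseException. */
    static int parse(String line) {
        try {
            return Integer.parseInt(line.trim());
        } catch (NumberFormatException e) {
            throw new ParseException("Cannot parse: " + line);
        }
    }

    public static void main(String[] args) {
        // Two bad lines with a limit of 2: both are skipped, the rest are kept.
        System.out.println(readAll(Arrays.asList("1", "x", "2", "y", "3"), 2));
        // Three bad lines with a limit of 2: the third failure aborts the run.
        try {
            readAll(Arrays.asList("1", "x", "y", "z"), 2);
        } catch (SkipLimitExceeded e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a limit of 2, two bad lines are tolerated and the third one aborts the run, which mirrors the one-bad-record and three-bad-record cases tested above.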
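2. Using the chunk skip-policy attribute
If you care about the number of exceptions, the skip-limit approach above is convenient and simple. If you do not, you can define your own skip policy, which is what this section covers. Change the configuration of the readWriter step in batch.xml as follows: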
<step id="readWriter" next="clean">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="100" skip-policy="skipPolicy" processor="processor"/>
    </tasklet>
</step>
<step id="clean">
    <tasklet ref="cleanTasklet"/>
</step>
Declare the skipPolicy bean in job.xml as follows:
<bean id="skipPolicy" class="spring.batch.readFile.ExceptionSkipPolicy">
    <constructor-arg value="org.springframework.batch.item.file.FlatFileParseException"/>
</bean>
ExceptionSkipPolicy is our own implementation of the skip policy:
package spring.batch.readFile;

import org.springframework.batch.core.step.skip.SkipLimitExceededException;
import org.springframework.batch.core.step.skip.SkipPolicy;

/**
 * @Author: huhx
 * @Date: 2017-11-01 下午 4:58
 */
public class ExceptionSkipPolicy implements SkipPolicy {

    // The exception type (including its subclasses) that should be skipped.
    private final Class<? extends Exception> exceptionClassToSkip;

    public ExceptionSkipPolicy(Class<? extends Exception> exceptionClassToSkip) {
        this.exceptionClassToSkip = exceptionClassToSkip;
    }

    // Skips regardless of skipCount, so there is no upper bound on the number of skips.
    @Override
    public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
        return exceptionClassToSkip.isAssignableFrom(t.getClass());
    }
}
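To make the isAssignableFrom check in shouldSkip concrete, here is a small plain-Java sketch of the same test outside Spring Batch. The class SkipMatchSketch is hypothetical, and java.io exceptions are used only as convenient stand-ins for FlatFileParseException and its relatives.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

/** Plain-Java sketch of the class check used by the skip policy; no Spring Batch types involved. */
class SkipMatchSketch {

    /** Same test as shouldSkip above: true when t is the configured class or a subclass of it. */
    static boolean shouldSkip(Class<? extends Exception> exceptionClassToSkip, Throwable t) {
        return exceptionClassToSkip.isAssignableFrom(t.getClass());
    }

    public static void main(String[] args) {
        // Exact match: prints true
        System.out.println(shouldSkip(IOException.class, new IOException("exact match")));
        // FileNotFoundException extends IOException: prints true
        System.out.println(shouldSkip(IOException.class, new FileNotFoundException("subclass")));
        // Unrelated exception type: prints false
        System.out.println(shouldSkip(IOException.class, new IllegalStateException("unrelated")));
    }
}
```

Because the skipCount argument is never consulted, a policy written this way skips every matching exception, however many occur.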
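We change the contents of the decompressed file again, using the same three invalid records as in the failing case above:
After the run, the data in the database is as follows: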
3. Listening for skipped items with SkipListener
Building on the changes above, the job element in the XML configuration now looks like this:
<job id="readFlatFileJob">
    <step id="decompress" next="readWriter">
        <tasklet ref="decompressTasklet"/>
    </step>
    <step id="readWriter" next="clean">
        <tasklet>
            <chunk reader="reader" writer="writer" commit-interval="100" skip-policy="skipPolicy" processor="processor"/>
            <listeners>
                <listener ref="skipListener"/>
            </listeners>
        </tasklet>
    </step>
    <step id="clean">
        <tasklet ref="cleanTasklet"/>
    </step>
</job>
Configure the skipListener bean in job.xml:
<bean id="skipListener" class="spring.batch.readFile.FileSkipListener"/>
The code of FileSkipListener is as follows:
package spring.batch.readFile;

import org.apache.commons.io.FileUtils;
import org.springframework.batch.core.annotation.OnSkipInProcess;
import org.springframework.batch.core.annotation.OnSkipInRead;
import org.springframework.batch.core.annotation.OnSkipInWrite;
import org.springframework.batch.item.file.FlatFileParseException;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @Author: huhx
 * @Date: 2017-11-01 下午 5:32
 */
public class FileSkipListener {

    // Skipped records are appended to this log file.
    private final File file = new File("file/log.txt");

    // Called when an item is skipped while reading.
    @OnSkipInRead
    public void readLog(Throwable t) throws IOException {
        if (t instanceof FlatFileParseException) {
            FlatFileParseException ffpe = (FlatFileParseException) t;
            String dataLog = "from read " + ffpe.getInput() + ", line number = " + ffpe.getLineNumber() + " ";
            FileUtils.write(file, dataLog, StandardCharsets.UTF_8, true);
        }
    }

    // Called when an item is skipped while processing.
    @OnSkipInProcess
    public void processLog(People people, Throwable t) throws IOException {
        if (t instanceof FlatFileParseException) {
            FlatFileParseException ffpe = (FlatFileParseException) t;
            String dataLog = "from process " + ffpe.getInput() + ", line number = " + ffpe.getLineNumber() + " ";
            String peopleInfo = people.getUsername() + ", birthday " + people.getBirthday() + " ";
            FileUtils.write(file, dataLog + peopleInfo, StandardCharsets.UTF_8, true);
        }
    }

    // Called when an item is skipped while writing.
    @OnSkipInWrite
    public void writeLog(People people, Throwable t) throws IOException {
        if (t instanceof FlatFileParseException) {
            FlatFileParseException ffpe = (FlatFileParseException) t;
            String dataLog = "from write " + ffpe.getInput() + ", line number = " + ffpe.getLineNumber() + " ";
            String peopleInfo = people.getUsername() + ", birthday " + people.getBirthday() + " ";
            FileUtils.write(file, dataLog + peopleInfo, StandardCharsets.UTF_8, true);
        }
    }
}
The invalid records above are logged to log.txt. Its contents are now:
from read 李元芳|32|黄冈|1985-12-99, line number = 2 from read 王昭君|百里|武汉|1995-10-89, line number = 3 from read 狄仁杰|21|天津|1958-12-99, line number = 4
Spring Batch provides the SkipListener interface for listening to skipped items:
public interface SkipListener<T,S> extends StepListener {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}
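A more convenient alternative is the annotation-based approach used above, with the @OnSkipInRead, @OnSkipInProcess and @OnSkipInWrite annotations that Spring Batch provides.
4. Like skip, Spring Batch also supports retry
As with skip, there are two ways to configure retry. They are listed below:
- The default retry policy, where you set the number of retries: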
<tasklet>
    <chunk reader="reader" writer="writer" commit-interval="100" retry-limit="3">
        <retryable-exception-classes>
            <include class="org.springframework.dao.OptimisticLockingFailureException" />
        </retryable-exception-classes>
    </chunk>
</tasklet>
- A custom retry policy:
<tasklet>
    <chunk reader="reader" writer="writer" commit-interval="100" retry-policy="retryPolicy" />
</tasklet>
The retryPolicy bean is defined as follows:
<bean id="retryPolicy" class="org.springframework.retry.policy.ExceptionClassifierRetryPolicy">
    <property name="policyMap">
        <map>
            <entry key="org.springframework.dao.ConcurrencyFailureException">
                <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="3"/>
                </bean>
            </entry>
            <entry key="org.springframework.dao.DeadlockLoserDataAccessException">
                <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="5"/>
                </bean>
            </entry>
        </map>
    </property>
</bean>
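The idea behind this configuration, a different attempt limit per exception type, can be sketched in plain Java. This is a simplified model and not how spring-retry is implemented: RetrySketch, register, maxAttemptsFor and execute are hypothetical names, and ConcurrencyFailure and DeadlockLoser are stand-ins for the two Spring exceptions. The sketch assumes that the closest registered superclass decides the limit and that exception types with no entry are not retried.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical, simplified model of per-exception retry limits; not spring-retry code. */
class RetrySketch {

    /** Stand-in for ConcurrencyFailureException (illustration only). */
    static class ConcurrencyFailure extends RuntimeException { }

    /** Stand-in for DeadlockLoserDataAccessException, modelled as a subclass (illustration only). */
    static class DeadlockLoser extends ConcurrencyFailure { }

    private final Map<Class<? extends Throwable>, Integer> maxAttemptsByType = new HashMap<>();

    /** Registers the total number of attempts allowed for an exception type. */
    RetrySketch register(Class<? extends Throwable> type, int maxAttempts) {
        maxAttemptsByType.put(type, maxAttempts);
        return this;
    }

    /** Walks up the class hierarchy to the closest registered type; 1 means "do not retry". */
    int maxAttemptsFor(Throwable t) {
        for (Class<?> c = t.getClass(); c != null; c = c.getSuperclass()) {
            Integer max = maxAttemptsByType.get(c);
            if (max != null) {
                return max;
            }
        }
        return 1;
    }

    /** Runs the action, retrying until it succeeds or the limit for the thrown type is reached. */
    <T> T execute(Supplier<T> action) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (attempts >= maxAttemptsFor(e)) {
                    throw e; // attempts exhausted: give up and rethrow
                }
            }
        }
    }

    public static void main(String[] args) {
        RetrySketch retry = new RetrySketch()
                .register(ConcurrencyFailure.class, 3)
                .register(DeadlockLoser.class, 5);
        int[] calls = {0};
        // Fails twice with ConcurrencyFailure, then succeeds on the third attempt.
        String result = retry.execute(() -> {
            if (++calls[0] < 3) {
                throw new ConcurrencyFailure();
            }
            return "done";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In this model the limit counts total attempts including the first one, so an action that always throws DeadlockLoser would be invoked five times before the exception is rethrown.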
Retry also has a listener, analogous to skip's SkipListener. It is used as follows:
package spring.batch.readFile;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.listener.RetryListenerSupport;

/**
 * @Author: huhx
 * @Date: 2017-11-01 下午 7:00
 */
public class Slf4jRetryListener extends RetryListenerSupport {

    private static final Logger LOG = LoggerFactory.getLogger(Slf4jRetryListener.class);

    // Logs every failed attempt of a retried operation.
    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        LOG.error("retried operation", throwable);
    }
}