一、需求分析
使用Spring Batch对DB进行读写操作: 从一个表中读取数据, 然后批量地插入另外一张表中.
二、代码实现
1. 代码结构图:
2. applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

    <!-- Scan this package for Spring-annotated components. -->
    <context:component-scan base-package="com.zdp" />

    <!-- Pooled data source (c3p0). -->
    <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" abstract="false" scope="singleton">
        <property name="driverClass" value="org.gjt.mm.mysql.Driver" />
        <!-- The ampersand must be written as an entity reference inside an XML attribute,
             and the URL must stay on one line so no whitespace ends up in it. -->
        <property name="jdbcUrl" value="jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=UTF-8" />
        <property name="user" value="root" />
        <property name="password" value="root" />
        <property name="checkoutTimeout" value="30000" />
        <property name="maxIdleTime" value="120" />
        <property name="maxPoolSize" value="100" />
        <property name="minPoolSize" value="2" />
        <property name="initialPoolSize" value="2" />
        <property name="maxStatements" value="0" />
        <property name="maxStatementsPerConnection" value="0" />
        <property name="idleConnectionTestPeriod" value="30" />
    </bean>

    <!-- JDBC helper injected into the DAO layer. -->
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- Job repository backed by in-memory maps (job metadata is not written to the database). -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
    </bean>

    <!-- Launches jobs using the repository above. -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <!-- Transaction manager bound to the data source. -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <tx:annotation-driven transaction-manager="transactionManager" />
</beans>
base-package: 扫描spring注解
jobLauncher: 启动Job
jobRepository: 为Job提供持久化操作 (本例使用MapJobRepositoryFactoryBean, Job元数据仅保存在内存中, 进程退出后不会保留)
transactionManager: 提供事务管理操作
3. springBatch.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

    <!-- Pull in the core Spring configuration (data source, launcher, repository). -->
    <import resource="applicationContext.xml"/>

    <batch:job id="ledgerJob">
        <!-- Observe the job's execution status. -->
        <batch:listeners>
            <batch:listener ref="appJobExecutionListener" />
        </batch:listeners>
        <batch:step id="step">
            <!-- Run the chunk-oriented tasklet under transaction control. -->
            <batch:tasklet transaction-manager="transactionManager">
                <batch:listeners>
                    <batch:listener ref="itemFailureLoggerListener" />
                </batch:listeners>
                <!-- commit-interval: number of items written per commit;
                     skip-limit: maximum number of items that may be skipped. -->
                <batch:chunk reader="ledgerReader" writer="ledgerWriter" commit-interval="1000" skip-limit="1000">
                    <batch:skippable-exception-classes>
                        <!-- java.lang.Exception and its subclasses are skipped; the job keeps going. -->
                        <batch:include class="java.lang.Exception"/>
                        <!-- This exception is not skippable; the job stops immediately. -->
                        <batch:exclude class="java.io.FileNotFoundException"/>
                    </batch:skippable-exception-classes>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- Reads rows from the ledger table. -->
    <bean id="ledgerReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="dataSource" />
        <property name="sql" value="select * from ledger" />
        <property name="rowMapper" ref="ledgerRowMapper" />
    </bean>

    <bean id="jobParameterBulider" class="org.springframework.batch.core.JobParametersBuilder" />

    <!-- Scheduled task definition starts here. -->
    <bean id="ledgerJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
        <property name="targetObject">
            <!-- Bean invoked by the scheduler. -->
            <ref bean="quartzLedgerJob" />
        </property>
        <property name="targetMethod">
            <!-- Method invoked on that bean. -->
            <value>execute</value>
        </property>
    </bean>

    <bean id="ledgerCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
        <property name="jobDetail">
            <ref bean="ledgerJobDetail" />
        </property>
        <property name="cronExpression">
            <!-- Every day at 22:30. Quartz fields: second minute hour day-of-month month day-of-week. -->
            <value>0 30 22 ? * *</value>
        </property>
    </bean>

    <!-- Scheduler factory: every trigger listed here is registered with the scheduler. -->
    <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <!-- Several triggers can be listed here at the same time. -->
                <ref local="ledgerCronTrigger" />
            </list>
        </property>
    </bean>
</beans>
4. AppJobExecutionListener.java
/**
* 监听job执行状态
*/
@Component("appJobExecutionListener")
public class AppJobExecutionListener implements JobExecutionListener {
private final static Logger logger = Logger.getLogger(AppJobExecutionListener.class);
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
logger.info("Job completed: " + jobExecution.getJobId());
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
logger.info("Job failed: " + jobExecution.getJobId());
}
}
public void beforeJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
logger.info("Job completed: " + jobExecution.getJobId());
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
logger.info("Job failed: " + jobExecution.getJobId());
}
}
}
/**
 * Logs item-level failures so that it is visible whether an error happened
 * while reading or while writing.
 */
@Component("itemFailureLoggerListener")
public class ItemFailureLoggerListener extends ItemListenerSupport<Object, Object> {

    private static final Logger LOG = Logger.getLogger(ItemFailureLoggerListener.class);

    /**
     * Called when reading an item fails.
     *
     * @param ex the exception raised by the reader
     */
    @Override
    public void onReadError(Exception ex) {
        LOG.error("Encountered error on read", ex);
    }

    /**
     * Called when writing a chunk fails.
     *
     * <p>The write-error callback in Spring Batch 2.x receives the list of items
     * in the failed chunk. A method taking a single {@code Object} is only an
     * overload and is never invoked by the framework.
     *
     * @param ex    the exception raised by the writer
     * @param items the items of the chunk that could not be written
     */
    @Override
    public void onWriteError(Exception ex, java.util.List<? extends Object> items) {
        LOG.error("Encountered error on write", ex);
    }

    /**
     * Single-item overload kept so existing direct callers still compile.
     *
     * @param ex   the exception raised by the writer
     * @param item the item that could not be written
     */
    public void onWriteError(Exception ex, Object item) {
        LOG.error("Encountered error on write", ex);
    }
}
6. Ledger.java
/**
 * Data holder for one ledger entry. Each field corresponds to a column of the
 * {@code ledger} / {@code ledger_temp} tables. Accessors are omitted from this
 * listing (see the placeholder comment at the end of the class).
 */
public class Ledger implements Serializable {
private static final long serialVersionUID = 1L;
// Primary key (ID column).
private int id;
// Receipt date (RECEIPT_DATE column).
// NOTE(review): presumably java.util.Date, since the DAO converts it to java.sql.Date - confirm against the imports.
private Date receiptDate;
// Member name (MEMBER_NAME column).
private String memberName;
// Check number (CHECK_NUMBER column).
private String checkNumber;
// Check date (CHECK_DATE column).
private Date checkDate;
// Payment type (PAYMENT_TYPE column).
private String paymentType;
// Deposit amount (DEPOSIT_AMOUNT column).
// NOTE(review): double is lossy for monetary values - consider BigDecimal if exact amounts matter.
private double depositAmount;
// Payment amount (PAYMENT_AMOUNT column).
private double paymentAmount;
// Free-text comments (COMMENTS column).
private String comments;
// getter and setter
}7. LedgerRowMapper.java
/**
 * Maps one row of the {@code ledger} query result onto a {@link Ledger}.
 */
@Component("ledgerRowMapper")
public class LedgerRowMapper implements RowMapper<Ledger> {

    /**
     * Builds a {@link Ledger} from the current row of the result set.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum index of the current row (unused)
     * @return a new {@code Ledger} populated from the row
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Ledger mapRow(ResultSet rs, int rowNum) throws SQLException {
        Ledger ledger = new Ledger();
        ledger.setId(rs.getInt("ID"));
        ledger.setReceiptDate(rs.getDate("RECEIPT_DATE"));
        ledger.setMemberName(rs.getString("MEMBER_NAME"));
        // Read the check number from its own column (it was previously read from MEMBER_NAME).
        ledger.setCheckNumber(rs.getString("CHECK_NUMBER"));
        ledger.setCheckDate(rs.getDate("CHECK_DATE"));
        ledger.setPaymentType(rs.getString("PAYMENT_TYPE"));
        // getDouble yields 0.0 for SQL NULL, so NULL amounts become zero.
        ledger.setDepositAmount(rs.getDouble("DEPOSIT_AMOUNT"));
        ledger.setPaymentAmount(rs.getDouble("PAYMENT_AMOUNT"));
        ledger.setComments(rs.getString("COMMENTS"));
        return ledger;
    }
}
8. LedgerDao.java
/**
 * Persistence contract for ledger entries.
 */
public interface LedgerDao {

    /**
     * Stores a single ledger entry.
     *
     * @param item the entry to store
     */
    void save(Ledger item);
}
9. LedgerDaoImpl.java
/**
* ledger数据操作类
*/
@Repository
public class LedgerDaoImpl implements LedgerDao {
private static final String SAVE_SQL = "INSERT INTO LEDGER_TEMP (RECEIPT_DATE, MEMBER_NAME, CHECK_NUMBER, CHECK_DATE, PAYMENT_TYPE, DEPOSIT_AMOUNT, PAYMENT_AMOUNT, COMMENTS) VALUES(?,?,?
,?,?
,?,?
,?)";
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public void save(final Ledger item) {
jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() {
public void setValues(PreparedStatement stmt) throws SQLException {
stmt.setDate(1, new java.sql.Date(item.getReceiptDate().getTime()));
stmt.setString(2, item.getMemberName());
stmt.setString(3, item.getCheckNumber());
stmt.setDate(4, new java.sql.Date(item.getCheckDate().getTime()));
stmt.setString(5, item.getPaymentType());
stmt.setDouble(6, item.getDepositAmount());
stmt.setDouble(7, item.getPaymentAmount());
stmt.setString(8, item.getComments());
}
});
}
}
10. LedgerWriter.java
/**
 * Item writer that stores each ledger entry of a chunk through {@link LedgerDao}.
 */
@Component("ledgerWriter")
public class LedgerWriter implements ItemWriter<Ledger> {

    @Autowired
    private LedgerDao ledgerDao;

    /**
     * Saves the entries of the chunk one by one, in list order.
     *
     * @param ledgers the entries of the current chunk
     * @throws Exception if saving an entry fails
     */
    @Override
    public void write(List<? extends Ledger> ledgers) throws Exception {
        final int total = ledgers.size();
        for (int index = 0; index < total; index++) {
            Ledger entry = ledgers.get(index);
            ledgerDao.save(entry);
        }
    }
}
11. QuartzLedgerJob.java
/**
 * Scheduler entry point: launches {@code ledgerJob} each time {@link #execute()} is invoked.
 */
@Component("quartzLedgerJob")
public class QuartzLedgerJob {

    private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class);

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job ledgerJob;

    @Autowired
    JobParametersBuilder jobParameterBulider;

    // Number of launch attempts made by this class since the JVM started.
    private static long counter = 0L;

    /**
     * Launches the ledger job with the current time as a job parameter.
     *
     * @throws Exception if the job cannot be launched
     */
    public void execute() throws Exception {
        // Spring Batch does not re-run a job instance that already completed successfully.
        // Whether a failed instance may be restarted is controlled by the job's
        // "restartable" attribute (default true). To allow the job to run again, the
        // current time is added as a job parameter, so each launch is a different job
        // instance even when all other parameters are the same.
        LOG.debug("start...");
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        try {
            jobParameterBulider.addDate("date", new Date());
            jobLauncher.run(ledgerJob, jobParameterBulider.toJobParameters());
        } finally {
            // Stop and report even when the launch throws, so failed attempts are timed too.
            stopWatch.stop();
            LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", stopWatch.prettyPrint(), ++counter);
        }
    }
}
12. StartQuartz.java
/**
 * Application entry point that starts the scheduler.
 * Requirement: periodically read rows from table ledger and write them to table ledger_temp.
 */
public class StartQuartz {

    /** Classpath location of the batch and scheduler configuration. */
    private static final String CONFIG_LOCATION = "/com/zdp/resources/springBatch.xml";

    /**
     * Loads the Spring context from {@link #CONFIG_LOCATION}.
     *
     * @param args command-line arguments (unused)
     * @throws FileNotFoundException declared by the original signature
     */
    public static void main(String[] args) throws FileNotFoundException {
        new ClassPathXmlApplicationContext(CONFIG_LOCATION);
    }
}
13. sql:
-- Source table read by the batch job.
CREATE TABLE ledger (
    ID             INT(10)      NOT NULL AUTO_INCREMENT PRIMARY KEY,
    RECEIPT_DATE   DATE,
    MEMBER_NAME    VARCHAR(10),
    CHECK_NUMBER   VARCHAR(10),
    CHECK_DATE     DATE,
    PAYMENT_TYPE   VARCHAR(10),
    DEPOSIT_AMOUNT DOUBLE(10,3),
    PAYMENT_AMOUNT DOUBLE(10,3),
    COMMENTS       VARCHAR(100)
);

-- Target table written by the batch job; same columns as ledger.
CREATE TABLE ledger_temp (
    ID             INT(10)      NOT NULL AUTO_INCREMENT PRIMARY KEY,
    RECEIPT_DATE   DATE,
    MEMBER_NAME    VARCHAR(10),
    CHECK_NUMBER   VARCHAR(10),
    CHECK_DATE     DATE,
    PAYMENT_TYPE   VARCHAR(10),
    DEPOSIT_AMOUNT DOUBLE(10,3),
    PAYMENT_AMOUNT DOUBLE(10,3),
    COMMENTS       VARCHAR(100)
);