zoukankan      html  css  js  c++  java
  • springbatch操作DB

    一、需求分析

    使用Spring Batch对DB进行读写操作: 从一个表中读取数据, 然后批量的插入另外一张表中.


    二、代码实现

    1. 代码结构图:


    2. applicationContext.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:tx="http://www.springframework.org/schema/tx"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans
    	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    	http://www.springframework.org/schema/context
    	http://www.springframework.org/schema/context/spring-context-3.1.xsd
    	http://www.springframework.org/schema/tx
    	http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

    	<!-- Package scanned for Spring stereotype annotations -->
    	<context:component-scan base-package="com.zdp" />

    	<!-- Pooled data source (c3p0) -->
    	<!-- NOTE(review): org.gjt.mm.mysql.Driver looks like the legacy MySQL driver class name; confirm it exists in the Connector/J version on the classpath -->
    	<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" abstract="false" scope="singleton">
    		<property name="driverClass" value="org.gjt.mm.mysql.Driver" />
    		<!-- The ampersand must be written as &amp; inside an XML attribute value -->
    		<property name="jdbcUrl" value="jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=UTF-8" />
    		<property name="user" value="root" />
    		<property name="password" value="root" />
    		<property name="checkoutTimeout" value="30000" />
    		<property name="maxIdleTime" value="120" />
    		<property name="maxPoolSize" value="100" />
    		<property name="minPoolSize" value="2" />
    		<property name="initialPoolSize" value="2" />
    		<property name="maxStatements" value="0" />
    		<property name="maxStatementsPerConnection" value="0" />
    		<property name="idleConnectionTestPeriod" value="30" />
    	</bean>

    	<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    		<property name="dataSource" ref="dataSource" />
    	</bean>

    	<!-- Job repository used by the launcher to track job executions -->
    	<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    		<property name="transactionManager" ref="transactionManager" />
    	</bean>

    	<!-- Launches jobs against the repository above -->
    	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    		<property name="jobRepository" ref="jobRepository" />
    	</bean>

    	<!-- Transaction manager bound to the pooled data source -->
    	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    		<property name="dataSource" ref="dataSource" />
    	</bean>

    	<tx:annotation-driven transaction-manager="transactionManager" />
    </beans>

    base-package: 扫描spring注解

    jobLauncher: 启动Job

    jobRepository: 为Job提供运行元数据的存储操作 (此处使用MapJobRepositoryFactoryBean, 数据仅保存在内存中, 不会持久化到数据库)

    transactionManager: 提供事务管理操作


    3. springBatch.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:batch="http://www.springframework.org/schema/batch"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:tx="http://www.springframework.org/schema/tx"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans
    	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    	http://www.springframework.org/schema/batch
    	http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    	http://www.springframework.org/schema/context
    	http://www.springframework.org/schema/context/spring-context-3.1.xsd
    	http://www.springframework.org/schema/tx
    	http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

    	<!-- Pull in the core Spring configuration (data source, launcher, repository, transactions) -->
    	<import resource="applicationContext.xml"/>

    	<batch:job id="ledgerJob">
    		<!-- Listens for job-level status changes -->
    		<batch:listeners>
    			<batch:listener ref="appJobExecutionListener" />
    		</batch:listeners>
    		<batch:step id="step">
    			<!-- Run the step under transaction control -->
    			<batch:tasklet transaction-manager="transactionManager">
    				<batch:listeners>
    					<batch:listener ref="itemFailureLoggerListener" />
    				</batch:listeners>
    				<!-- commit-interval: number of items written per commit; skip-limit: number of records that may be skipped -->
    				<batch:chunk reader="ledgerReader" writer="ledgerWriter" commit-interval="1000" skip-limit="1000">
    					<batch:skippable-exception-classes>
    						<!-- Exception and its subclasses are skippable: the job keeps going -->
    						<batch:include class="java.lang.Exception"/>
    						<!-- FileNotFoundException is excluded from skipping: the job stops immediately -->
    						<batch:exclude class="java.io.FileNotFoundException"/>
    					</batch:skippable-exception-classes>
    				</batch:chunk>
    			</batch:tasklet>
    		</batch:step>
    	</batch:job>

    	<!-- Reads rows from the ledger table -->
    	<bean id="ledgerReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
    		<property name="dataSource" ref="dataSource" />
    		<property name="sql" value="select * from ledger" />
    		<property name="rowMapper" ref="ledgerRowMapper" />
    	</bean>

    	<bean id="jobParameterBulider" class="org.springframework.batch.core.JobParametersBuilder" />

    	<!-- Scheduled task definition starts here -->
    	<bean id="ledgerJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
    		<property name="targetObject">
    			<!-- Bean whose method is invoked on schedule -->
    			<ref bean="quartzLedgerJob" />
    		</property>
    		<property name="targetMethod">
    			<!-- Method of that bean to invoke -->
    			<value>execute</value>
    		</property>
    	</bean>

    	<bean id="ledgerCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
    		<property name="jobDetail">
    			<ref bean="ledgerJobDetail" />
    		</property>
    		<property name="cronExpression">
    			<!-- Fires every day at 22:30; the expression must stay on a single line -->
    			<value>0 30 22 ? * *</value>
    		</property>
    	</bean>

    	<!-- Scheduler factory: every trigger is registered here -->
    	<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    		<!-- Registered triggers -->
    		<property name="triggers">
    			<list>
    				<!-- The trigger defined above; more triggers can be listed here side by side -->
    				<ref local="ledgerCronTrigger" />
    			</list>
    		</property>
    	</bean>
    </beans>


    4. AppJobExecutionListener.java

    /**
     * Logs the start and the outcome of a job execution.
     */
    @Component("appJobExecutionListener")
    public class AppJobExecutionListener implements JobExecutionListener {
    	private final static Logger logger = Logger.getLogger(AppJobExecutionListener.class);

    	/**
    	 * Logs the final status of the job once it has finished.
    	 *
    	 * @param jobExecution the execution that just ended
    	 */
    	public void afterJob(JobExecution jobExecution) {
    		BatchStatus status = jobExecution.getStatus();
    		if (status == BatchStatus.COMPLETED) {
    			logger.info("Job completed: " + jobExecution.getJobId());
    		} else if (status == BatchStatus.FAILED) {
    			// A failed run is an error condition, not an informational one.
    			logger.error("Job failed: " + jobExecution.getJobId());
    		} else {
    			// Any other end state would otherwise go unreported.
    			logger.warn("Job ended with status " + status + ": " + jobExecution.getJobId());
    		}
    	}

    	/**
    	 * Logs that the job is about to run.
    	 * <p>
    	 * The job has not run yet at this point, so checking for a COMPLETED or
    	 * FAILED status here (as the previous copy of afterJob did) never logs anything.
    	 *
    	 * @param jobExecution the execution that is about to start
    	 */
    	public void beforeJob(JobExecution jobExecution) {
    		logger.info("Job starting: " + jobExecution.getJobId());
    	}
    }


    5. ItemFailureLoggerListener.java

    /**
     * Logs whether a failure happened while reading or while writing items.
     */
    @Component("itemFailureLoggerListener")
    public class ItemFailureLoggerListener extends ItemListenerSupport<Object, Object> {
    	private final static Logger LOG = Logger.getLogger(ItemFailureLoggerListener.class);

    	/**
    	 * Logs an exception raised while reading an item.
    	 *
    	 * @param ex the read failure
    	 */
    	public void onReadError(Exception ex) {
    		LOG.error("Encountered error on read", ex);
    	}

    	/**
    	 * Logs an exception raised while writing a chunk of items.
    	 * <p>
    	 * In Spring Batch 2.x the write-error callback receives the list of items
    	 * in the failed chunk; a method taking a single Object is only an overload
    	 * and is not invoked by the framework.
    	 *
    	 * @param ex the write failure
    	 * @param items the items of the chunk being written when the failure occurred
    	 */
    	public void onWriteError(Exception ex, java.util.List<? extends Object> items) {
    		LOG.error("Encountered error on write", ex);
    	}

    	/**
    	 * Logs an exception raised while writing a single item.
    	 * Kept so that existing direct callers of this signature still compile.
    	 *
    	 * @param ex the write failure
    	 * @param item the item being written when the failure occurred
    	 */
    	public void onWriteError(Exception ex, Object item) {
    		LOG.error("Encountered error on write", ex);
    	}

    }

    6. Ledger.java

    /**
     * Plain data holder for one ledger row.
     * The fields correspond to the columns read by LedgerRowMapper and
     * written by LedgerDaoImpl.
     */
    public class Ledger implements Serializable {
    	private static final long serialVersionUID = 1L;
    	// Primary key (column ID).
    	private int id;
    	// Column RECEIPT_DATE.
    	private Date receiptDate;
    	// Column MEMBER_NAME.
    	private String memberName;
    	// Column CHECK_NUMBER.
    	private String checkNumber;
    	// Column CHECK_DATE.
    	private Date checkDate;
    	// Column PAYMENT_TYPE.
    	private String paymentType;
    	// Column DEPOSIT_AMOUNT.
    	// NOTE(review): these look like monetary amounts; double is inexact for money - confirm whether BigDecimal is needed.
    	private double depositAmount;
    	// Column PAYMENT_AMOUNT.
    	private double paymentAmount;
    	// Column COMMENTS.
    	private String comments;
    
    	// getters and setters omitted
    }

    7. LedgerRowMapper.java

    /**
     * Maps one row of the ledger table to a {@link Ledger} object.
     */
    @Component("ledgerRowMapper")
    public class LedgerRowMapper implements RowMapper<Ledger> {

    	/**
    	 * Builds a Ledger from the current row of the result set.
    	 *
    	 * @param rs result set positioned on the row to map
    	 * @param rowNum index of the current row (unused)
    	 * @return the populated Ledger
    	 * @throws SQLException if a column cannot be read
    	 */
    	public Ledger mapRow(ResultSet rs, int rowNum) throws SQLException {
    		Ledger ledger = new Ledger();
    		ledger.setId(rs.getInt("ID"));
    		ledger.setReceiptDate(rs.getDate("RECEIPT_DATE"));
    		ledger.setMemberName(rs.getString("MEMBER_NAME"));
    		// The check number comes from its own column, not from MEMBER_NAME.
    		ledger.setCheckNumber(rs.getString("CHECK_NUMBER"));
    		ledger.setCheckDate(rs.getDate("CHECK_DATE"));
    		ledger.setPaymentType(rs.getString("PAYMENT_TYPE"));
    		ledger.setDepositAmount(rs.getDouble("DEPOSIT_AMOUNT"));
    		ledger.setPaymentAmount(rs.getDouble("PAYMENT_AMOUNT"));
    		ledger.setComments(rs.getString("COMMENTS"));
    		return ledger;
    	}
    }

    8. LedgerDao.java

    /**
     * Persistence contract for ledger records.
     */
    public interface LedgerDao {

    	/**
    	 * Stores a single ledger record.
    	 *
    	 * @param item the record to store
    	 */
    	void save(Ledger item);
    }

    9. LedgerDaoImpl.java
    /**
     * ledger数据操作类
     */
    @Repository
    public class LedgerDaoImpl implements LedgerDao {
    
    	private static final String SAVE_SQL = "INSERT INTO LEDGER_TEMP (RECEIPT_DATE, MEMBER_NAME, CHECK_NUMBER, CHECK_DATE, PAYMENT_TYPE, DEPOSIT_AMOUNT, PAYMENT_AMOUNT, COMMENTS) VALUES(?,?

    ,?

    ,?,?

    ,?,?

    ,?)"; @Autowired private JdbcTemplate jdbcTemplate; @Override public void save(final Ledger item) { jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() { public void setValues(PreparedStatement stmt) throws SQLException { stmt.setDate(1, new java.sql.Date(item.getReceiptDate().getTime())); stmt.setString(2, item.getMemberName()); stmt.setString(3, item.getCheckNumber()); stmt.setDate(4, new java.sql.Date(item.getCheckDate().getTime())); stmt.setString(5, item.getPaymentType()); stmt.setDouble(6, item.getDepositAmount()); stmt.setDouble(7, item.getPaymentAmount()); stmt.setString(8, item.getComments()); } }); } }


    10. LedgerWriter.java

    /**
     * Item writer that stores each ledger of a chunk through the DAO.
     */
    @Component("ledgerWriter")
    public class LedgerWriter implements ItemWriter<Ledger> {

    	@Autowired
    	private LedgerDao ledgerDao;

    	/**
    	 * Saves every ledger of the chunk, one after another, in list order.
    	 *
    	 * @param ledgers the chunk of records to store
    	 * @throws Exception if saving a record fails
    	 */
    	public void write(List<? extends Ledger> ledgers) throws Exception {
    		for (int position = 0; position < ledgers.size(); position++) {
    			Ledger current = ledgers.get(position);
    			ledgerDao.save(current);
    		}
    	}

    }

    11. QuartzLedgerJob.java

    /**
     * Scheduled entry point that launches the ledger batch job.
     */
    @Component("quartzLedgerJob")
    public class QuartzLedgerJob {

    	private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class);

    	// Number of launches performed so far.
    	private static long counter = 0L;

    	@Autowired
    	private JobLauncher jobLauncher;

    	@Autowired
    	private Job ledgerJob;

    	@Autowired
    	JobParametersBuilder jobParameterBulider;

    	/**
    	 * Launches the ledger job once and logs how long the launch took.
    	 *
    	 * @throws Exception if the job cannot be launched
    	 */
    	public void execute() throws Exception {
    		// A job instance that completed successfully cannot be run again, and
    		// whether a failed one may be restarted is governed by the job's
    		// restartable setting (true by default). To allow repeated runs, the
    		// current time is added as a job parameter so that every launch is a
    		// different job instance even when all other parameters are equal.
    		LOG.debug("start...");
    		StopWatch timer = new StopWatch();
    		timer.start();

    		jobLauncher.run(ledgerJob, jobParameterBulider.addDate("date", new Date()).toJobParameters());

    		timer.stop();
    		String elapsed = timer.prettyPrint();
    		counter = counter + 1;
    		LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", elapsed, counter);
    	}
    }


    12. StartQuartz.java

    /**
     * Starts the scheduler.
     * Requirement: periodically read rows from table ledger and batch-write them into table ledger_temp.
     */
    public class StartQuartz {

    	/**
    	 * Loads the Spring context that defines the batch job and its Quartz trigger.
    	 *
    	 * @param args unused
    	 * @throws FileNotFoundException declared for compatibility with the existing signature
    	 */
    	public static void main(String[] args) throws FileNotFoundException {
    		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("/com/zdp/resources/springBatch.xml");
    		// Close the context on JVM shutdown so that bean destroy callbacks run
    		// instead of the context simply being abandoned.
    		context.registerShutdownHook();
    	}
    }

    13. sql:

    -- Source table read by the batch job.
    CREATE TABLE ledger (
    	ID INT(10) NOT NULL AUTO_INCREMENT,
    	RECEIPT_DATE DATE,
    	MEMBER_NAME VARCHAR(10),
    	CHECK_NUMBER VARCHAR(10),
    	CHECK_DATE DATE,
    	PAYMENT_TYPE VARCHAR(10),
    	DEPOSIT_AMOUNT DOUBLE(10,3),
    	PAYMENT_AMOUNT DOUBLE(10,3),
    	COMMENTS VARCHAR(100),
    	PRIMARY KEY (ID)
    );
    
    -- Target table written by the batch job; same columns as ledger.
    CREATE TABLE ledger_temp (
    	ID INT(10) NOT NULL AUTO_INCREMENT,
    	RECEIPT_DATE DATE,
    	MEMBER_NAME VARCHAR(10),
    	CHECK_NUMBER VARCHAR(10),
    	CHECK_DATE DATE,
    	PAYMENT_TYPE VARCHAR(10),
    	DEPOSIT_AMOUNT DOUBLE(10,3),
    	PAYMENT_AMOUNT DOUBLE(10,3),
    	COMMENTS VARCHAR(100),
    	PRIMARY KEY (ID)
    );



  • 相关阅读:
    scrapy高级操作
    scrapy多url爬取
    scrapy基础使用
    selenuim
    数据解析
    python字典转为对象,用"."方式访问对象属性
    python AES.MODE_ECB(128位) pkcs5padding 加密算法
    maven
    maven在idea中的配置
    idea使用技巧
  • 原文地址:https://www.cnblogs.com/liguangsunls/p/7300199.html
Copyright © 2011-2022 走看看