说明
我这里使用的是spring boot;非spring boot项目同理,可以参考spring boot的自动化配置类 org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration,通过java bean config的方式手动完成配置
需要知道以下知识点
《spring源码阅读(五)-Spring Import注解使用》
准备工作
1.导入pom依赖
<!-- Spring Batch starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

<!-- JDBC starter: provides the auto-configured DataSource -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

<!-- Database driver; the local database is MySQL 8.0, so the 8.0 driver is used -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.18</version>
</dependency>

<!-- NOTE(review): fastjson is not referenced by any code shown in this article,
     and 1.2.4 is a very old release; confirm it is needed and consider upgrading. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.4</version>
</dependency>

<!-- Spring MVC -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
2.创建jobRepository数据库表
在依赖的spring-batch-core包里可以找到建表sql脚本,里面提供了各种数据库对应的元数据表脚本和配置方式,我们使用mysql对应的脚本(schema-mysql.sql)就行
每个表作用的相关介绍可以参考:https://www.cnblogs.com/LQBlog/p/15429882.html#autoid-3-7-0
3.application.properties配置
# JDBC datasource configuration.
# Connector/J 8.x ships the driver as com.mysql.cj.jdbc.Driver; the legacy
# com.mysql.jdbc.Driver name is deprecated and only kept as a forwarding alias.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/spring_batch_db?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
spring.datasource.username=root
spring.datasource.password=868978

# Hikari will use the above plus the following to setup connection pooling
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=DatebookHikariCP
spring.datasource.hikari.max-lifetime=1800000
spring.datasource.hikari.connection-timeout=30000

# Do not launch jobs automatically at application startup
# (see org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration).
spring.batch.job.enabled=false
4.使用EnableBatchProcessing完成部分配置
/**
 * Spring Boot entry point of the Spring Batch demo application.
 *
 * <p>{@code modular = true} is required by the rest of the demo, which injects
 * {@code ModularBatchConfiguration} directly. Per the original author's note the flag
 * chooses between eager and lazy initialization of the batch infrastructure
 * (NOTE(review): confirm against the {@code @EnableBatchProcessing} documentation).
 */
@SpringBootApplication
@EnableBatchProcessing(modular = true)
public class SpringBatchDemoApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchDemoApplication.class, args);
    }
}
初始化job
定义ItemReader
这里通过读取内存中预先准备好的数据来模拟数据读取
/** * @Project spring-batch-test-demo * @PackageName springbatchsimpledemo.demo.step.reader * @ClassName DemoReader * @Author qiang.li * @Date 2021/10/21 2:54 下午 * @Description 模拟读数据 */ public class DemoReader implements ItemReader<String> { //模拟数据 private static List<String> datas=new ArrayList<>(); private int cursor=0; static { for (int j=0;j<=100;j++){ datas.add(j+""); } } @Override public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { //返回null表示读完 if(cursor>=datas.size()){ return null; } return datas.get(cursor++); } }
定义ItemProcessor
/**
 * Demo {@code ItemProcessor} that simulates a processing step by converting each
 * string item into a {@code Long}.
 *
 * @author qiang.li
 */
public class DemoProcessor implements ItemProcessor<String, Long> {

    /**
     * Parses the item as a decimal long.
     *
     * @param item textual representation of a long value
     * @return the parsed value
     * @throws NumberFormatException if {@code item} is not a parsable long
     */
    @Override
    public Long process(String item) throws Exception {
        long parsed = Long.parseLong(item);
        return parsed;
    }
}
定义ItemWriter
/**
 * Demo {@code ItemWriter} that simulates persisting a chunk by printing every item
 * to standard output.
 *
 * @author qiang.li
 */
public class DemoWriter implements ItemWriter<Long> {

    /**
     * Prints one line per item of the chunk.
     *
     * @param items the items of the current chunk
     */
    @Override
    public void write(List<? extends Long> items) throws Exception {
        items.forEach(id -> System.out.println("模拟写入" + id));
    }
}
定义Step和job
@Configuration public class DemoJobConfig{ @Autowired private ModularBatchConfiguration modularBatchConfiguration; /** * 定义job * @return * @throws Exception */ @Bean public Job initJob() throws Exception { Job job= modularBatchConfiguration.jobBuilders() .get("demoJob") .start(initStep())//单线程 .build(); //注册到registry 后续通过registry获取job modularBatchConfiguration.jobRegistry().register(new JobFactory() { @Override public Job createJob() { return job; } @Override public String getJobName() { return job.getName(); } }); return job; } /** * 定义step * @return * @throws Exception */ public Step initStep() throws Exception { return modularBatchConfiguration.stepBuilders() .get("demoStep") .<String, Long>chunk(1) .reader(new DemoReader()) .processor((new DemoProcessor())) .writer(new DemoWriter()) .build(); } }
测试
/** * @Project spring-batch-test-demo * @PackageName springbatchsimpledemo.demo.controller * @ClassName DemoSyncJob * @Author qiang.li * @Date 2021/10/21 1:11 下午 * @Description TODO */ @Controller public class DemoJobController { @Autowired private ModularBatchConfiguration modularBatchConfiguration; /** * 启动任务,如果任务失败,再次调用则是重新执行 * BatchAutoConfiguration 初始化 */ @Autowired private JobOperator jobOperator; @RequestMapping("/startDemoJob") @ResponseBody public String startOrderJob() throws Exception { Map<String, JobParameter> parameters=new HashMap<>(); parameters.put("date",new JobParameter(11L)); JobParameters jobParameters= new JobParameters(parameters); Job job= modularBatchConfiguration.jobRegistry().getJob("demoJob"); modularBatchConfiguration.jobLauncher().run(job,jobParameters); // jobOperator.start("demoJob",jobParameters); return "success"; } @RequestMapping("/stopDemoJob") @ResponseBody public String stopJob() throws Exception { Set<Long> ids= jobOperator.getRunningExecutions("demoJob"); for (Long id: ids) { jobOperator.stop(id); } return "success"; } /** * 无视错误 启动一个新的job * @return * @throws Exception */ @RequestMapping("/startNextDemoOrderJob") @ResponseBody public String startNextDemoOrderJob() throws Exception { jobOperator.startNextInstance("demoJob"); return "success"; } }
输出结果
默认实现
关于常用的Reader和Writer,spring batch提供了很多基础实现,具体查看对应的实现类就好了
ItemReader默认实现
ItemWriter默认实现
优化
前面我们创建job都是通过手动注册
//注册到registry 后续通过registry获取job modularBatchConfiguration.jobRegistry().register(new JobFactory() { @Override public Job createJob() { return job; } @Override public String getJobName() { return job.getName(); } });
我们可以利用spring的生命周期(BeanPostProcessor)实现自动化注册。注意:改用自动注册后需要去掉上面的手动注册代码,否则同一个job会被注册两次,触发DuplicateJobException
/**
 * Bean post-processor that registers every {@code Job} bean in the {@code JobRegistry}
 * once the bean has been initialized, so jobs need not be registered by hand.
 *
 * <p>A {@code DuplicateJobException} raised by the registry is only printed via
 * {@code printStackTrace()}; the bean is still returned and startup continues.
 */
@Component
public class AutoRegister implements BeanPostProcessor {

    @Autowired
    JobRegistry jobRegistry;

    /**
     * Registers the bean in the job registry when it is a {@code Job}; every other bean
     * passes through untouched.
     *
     * @param bean the fully initialized bean
     * @param beanName the name of the bean in the application context
     * @return the same {@code bean} instance, never a replacement
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (!(bean instanceof Job)) {
            return bean;
        }
        final Job job = (Job) bean;
        try {
            jobRegistry.register(new JobFactory() {
                @Override
                public Job createJob() {
                    return job;
                }

                @Override
                public String getJobName() {
                    return job.getName();
                }
            });
        } catch (DuplicateJobException e) {
            e.printStackTrace();
        }
        return bean;
    }
}