例子
@SpringBootApplication //可选参数预先初始化还是延迟初始化 @EnableBatchProcessing(modular = true) public class SpringBatchDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchDemoApplication.class, args); } }
我们使用spring batch 使用了 @EnableBatchProcessing 此注解
@EnableBatchProcessing作用
我们打开源码可以发现使用Import注解 import注解使用可以查看 https://www.cnblogs.com/LQBlog/p/15410425.html
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(BatchConfigurationSelector.class)//<1> public @interface EnableBatchProcessing { /** * Indicate whether the configuration is going to be modularized into multiple application contexts. If true then * you should not create any @Bean Job definitions in this context, but rather supply them in separate (child) * contexts through an {@link ApplicationContextFactory}. * * @return boolean indicating whether the configuration is going to be * modularized into multiple application contexts. Defaults to false. */ boolean modular() default false; }
<1>
public class BatchConfigurationSelector implements ImportSelector { @Override public String[] selectImports(AnnotationMetadata importingClassMetadata) { Class<?> annotationType = EnableBatchProcessing.class; //获取注解元数据 AnnotationAttributes attributes = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes( annotationType.getName(), false)); Assert.notNull(attributes, String.format("@%s is not present on importing class '%s' as expected", annotationType.getSimpleName(), importingClassMetadata.getClassName())); String[] imports; //是实时初始化 延时延迟初始化 if (attributes.containsKey("modular") && attributes.getBoolean("modular")) { //<2> 导入ModularBatchConfiguration类 imports = new String[] { ModularBatchConfiguration.class.getName() }; } else { //是延迟初始化 点进去看代码可以看到get相关对象都是创建代理 imports = new String[] { SimpleBatchConfiguration.class.getName() }; } return imports; }
<2>
/** * @Project spring-batch-test-demo * @PackageName springbatchsimpledemo.demo.batch.job.order * @ClassName Test * @Author qiang.li * @Date 2021/10/21 5:51 下午 * @Description TODO */ //proxyBeanMethods = false @Bean创建的对象将不使用代理 @Configuration(proxyBeanMethods = false) public class ModularBatchConfiguration extends AbstractBatchConfiguration { @Autowired private ApplicationContext context; //初始化参考<> @Autowired(required = false) private Collection<BatchConfigurer> configurers; private AutomaticJobRegistrar registrar = new AutomaticJobRegistrar(); /** * 获得jobRepository 主要是用来管理 job执行期间相关元数据的 * 比如redis,文件,h2库 默认我们都是用的mysql * 可参考:org.springframework.batch.core.repository.support.SimpleJobRepository * @return * @throws Exception */ @Override @Bean public JobRepository jobRepository() throws Exception { return getConfigurer(configurers).getJobRepository(); } /** * 获得JobLauncher * JobLauncher主要job的容器 主要是管理job的启动 * @return * @throws Exception */ @Override @Bean public JobLauncher jobLauncher() throws Exception { //<3>可以看到获取是根据configures 是通过容器注入的 return getConfigurer(configurers).getJobLauncher(); } /** * job 执行期间的提交和关闭事物使用的事物管理器 * @return * @throws Exception */ @Override @Bean public PlatformTransactionManager transactionManager() throws Exception { // //<3>可以看到获取是根据configures 是通过容器注入的 return getConfigurer(configurers).getTransactionManager(); } /** * 与JobRepository 后续看源码再看具体用来做啥 * @return * @throws Exception */ @Override @Bean public JobExplorer jobExplorer() throws Exception { // //<3>可以看到获取是根据configures 是通过容器注入的 return getConfigurer(configurers).getJobExplorer(); } /** * J内部通过实现 LifecycleProcessor 当spring 容器启动成功时候自动实现将容器中job 注册到jobRegistry
* * @return * @throws Exception */ @Bean public AutomaticJobRegistrar jobRegistrar() throws Exception { for (ApplicationContextFactory factory : context.getBeansOfType(ApplicationContextFactory.class).values()) { registrar.addApplicationContextFactory(factory); } return registrar; } }
<3>
org.springframework.batch.core.configuration.annotation.AbstractBatchConfiguration#getConfigurer
protected BatchConfigurer getConfigurer(Collection<BatchConfigurer> configurers) throws Exception { //如果不为空就使用configurer if (this.configurer != null) { return this.configurer; } //如果容器没有初始化默认使用DefaultBatchConfigurer if (configurers == null || configurers.isEmpty()) { if (dataSource == null) { DefaultBatchConfigurer configurer = new DefaultBatchConfigurer(); configurer.initialize(); this.configurer = configurer; return configurer; } else { DefaultBatchConfigurer configurer = new DefaultBatchConfigurer(dataSource); configurer.initialize(); this.configurer = configurer; return configurer; } } if (configurers.size() > 1) { throw new IllegalStateException( "To use a custom BatchConfigurer the context must contain precisely one, found " + configurers.size()); } this.configurer = configurers.iterator().next(); return this.configurer; }
BatchConfiure接口
通过此接口我们可以自定义jobRepostory JobLauncher jobExplore等
定义
public interface BatchConfigurer { //获得jobRepository JobRepository getJobRepository() throws Exception; //获得事物管理器 PlatformTransactionManager getTransactionManager() throws Exception; //获得JobLauncher JobLauncher getJobLauncher() throws Exception; //获得JobExplorer JobExplorer getJobExplorer() throws Exception; }
类图
BatchConfiure初始化
spring boot项目参考自动化配置:org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration 非Spring boot项目 就参考这个类自己配置就行了
@Configuration( proxyBeanMethods = false ) @ConditionalOnClass({JobLauncher.class, DataSource.class})//class Path有JobLauncher.class, DataSource.class 类时 @AutoConfigureAfter({HibernateJpaAutoConfiguration.class}) @ConditionalOnBean({JobLauncher.class})//当容器中有JobLauncher 实现的时候 @EnableConfigurationProperties({BatchProperties.class})//加载配置 @Import({BatchConfigurerConfiguration.class, DatabaseInitializationDependencyConfigurer.class})//<4>BatchConfigurerConfiguration 为BatchConfigurer的自动化配置 public class BatchAutoConfiguration { public BatchAutoConfiguration() { } /** * spring.batch.job.enabled 为true时 默认为true * JobLauncherApplicationRunner 主要作用就是实现ApplicationRunner 接口在 项目启动后自动启动job * @param jobLauncher * @param jobExplorer * @param jobRepository * @param properties * @return */ @Bean @ConditionalOnMissingBean @ConditionalOnProperty( prefix = "spring.batch.job", name = {"enabled"}, havingValue = "true", matchIfMissing = true ) public JobLauncherApplicationRunner jobLauncherApplicationRunner(JobLauncher jobLauncher, JobExplorer jobExplorer, JobRepository jobRepository, BatchProperties properties) { JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(jobLauncher, jobExplorer, jobRepository); String jobNames = properties.getJob().getNames(); if (StringUtils.hasText(jobNames)) { runner.setJobNames(jobNames); } return runner; } @Bean @ConditionalOnMissingBean({ExitCodeGenerator.class}) public JobExecutionExitCodeGenerator jobExecutionExitCodeGenerator() { return new JobExecutionExitCodeGenerator(); } /** * jobLauncher的容器,提供了跟丰富的方法 如停止job * @param jobParametersConverter * @param jobExplorer * @param jobLauncher * @param jobRegistry * @param jobRepository * @return * @throws Exception */ @Bean @ConditionalOnMissingBean({JobOperator.class}) public SimpleJobOperator jobOperator(ObjectProvider<JobParametersConverter> jobParametersConverter, JobExplorer jobExplorer, JobLauncher jobLauncher, ListableJobLocator jobRegistry, JobRepository jobRepository) throws Exception { SimpleJobOperator factory = new SimpleJobOperator(); factory.setJobExplorer(jobExplorer); factory.setJobLauncher(jobLauncher); factory.setJobRegistry(jobRegistry); factory.setJobRepository(jobRepository); jobParametersConverter.ifAvailable(factory::setJobParametersConverter); return factory; } @Configuration( proxyBeanMethods = false ) @ConditionalOnBean({DataSource.class}) @ConditionalOnClass({DatabasePopulator.class}) static class DataSourceInitializerConfiguration { DataSourceInitializerConfiguration() { } @Bean @ConditionalOnMissingBean BatchDataSourceInitializer batchDataSourceInitializer(DataSource dataSource, @BatchDataSource ObjectProvider<DataSource> batchDataSource, ResourceLoader resourceLoader, BatchProperties properties) { return new BatchDataSourceInitializer((DataSource)batchDataSource.getIfAvailable(() -> { return dataSource; }), resourceLoader, properties); } } }
<4>
@ConditionalOnClass({PlatformTransactionManager.class}) @ConditionalOnBean({DataSource.class}) @ConditionalOnMissingBean({BatchConfigurer.class}) @Configuration( proxyBeanMethods = false ) class BatchConfigurerConfiguration { BatchConfigurerConfiguration() { } /** * 如果容器中存在entityManagerFactory 表示用的jpa使用JpaBatchConfigurer */ @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({EntityManagerFactory.class}) @ConditionalOnBean( name = {"entityManagerFactory"} ) static class JpaBatchConfiguration { JpaBatchConfiguration() { } @Bean JpaBatchConfigurer batchConfigurer(BatchProperties properties, DataSource dataSource, @BatchDataSource ObjectProvider<DataSource> batchDataSource, ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers, EntityManagerFactory entityManagerFactory) { return new JpaBatchConfigurer(properties, (DataSource)batchDataSource.getIfAvailable(() -> { return dataSource; }), (TransactionManagerCustomizers)transactionManagerCustomizers.getIfAvailable(), entityManagerFactory); } } /** * 如果不是使用的jpa则使用BasicBatchConfigurer */ @Configuration( proxyBeanMethods = false ) @ConditionalOnMissingBean( name = {"entityManagerFactory"} ) static class JdbcBatchConfiguration { JdbcBatchConfiguration() { } @Bean BasicBatchConfigurer batchConfigurer(BatchProperties properties, DataSource dataSource, @BatchDataSource ObjectProvider<DataSource> batchDataSource, ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) { return new BasicBatchConfigurer(properties, (DataSource)batchDataSource.getIfAvailable(() -> { return dataSource; }), (TransactionManagerCustomizers)transactionManagerCustomizers.getIfAvailable()); } } }
BasicBatchConfigure
public class BasicBatchConfigurer implements BatchConfigurer, InitializingBean { //实现了spring的InitializingBean 执行初始化 public void afterPropertiesSet() { this.initialize(); } public void initialize() { try { this.transactionManager = this.buildTransactionManager(); this.jobRepository = this.createJobRepository(); this.jobLauncher = this.createJobLauncher(); this.jobExplorer = this.createJobExplorer(); } catch (Exception var2) { throw new IllegalStateException("Unable to initialize Spring Batch", var2); } } protected JobExplorer createJobExplorer() throws Exception { PropertyMapper map = PropertyMapper.get(); JobExplorerFactoryBean factory = new JobExplorerFactoryBean(); factory.setDataSource(this.dataSource); BatchProperties.Jdbc var10001 = this.properties.getJdbc(); var10001.getClass(); map.from(var10001::getTablePrefix).whenHasText().to(factory::setTablePrefix); factory.afterPropertiesSet(); return factory.getObject(); } protected JobLauncher createJobLauncher() throws Exception { //默认SimpleJobLauncher SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(this.getJobRepository()); jobLauncher.afterPropertiesSet(); return jobLauncher; } protected JobRepository createJobRepository() throws Exception { //默认使用JobRepositoryFactoryBean JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); PropertyMapper map = PropertyMapper.get(); map.from(this.dataSource).to(factory::setDataSource); map.from(this::determineIsolationLevel).whenNonNull().to(factory::setIsolationLevelForCreate); BatchProperties.Jdbc var10001 = this.properties.getJdbc(); var10001.getClass(); map.from(var10001::getTablePrefix).whenHasText().to(factory::setTablePrefix); map.from(this::getTransactionManager).to(factory::setTransactionManager); factory.afterPropertiesSet(); return factory.getObject(); } protected String determineIsolationLevel() { return null; } private PlatformTransactionManager buildTransactionManager() { PlatformTransactionManager transactionManager = this.createTransactionManager(); if (this.transactionManagerCustomizers != null) { this.transactionManagerCustomizers.customize(transactionManager); } return transactionManager; } protected PlatformTransactionManager createTransactionManager() { return new DataSourceTransactionManager(this.dataSource); } }