zoukankan      html  css  js  c++  java
  • Spring batch的学习

      Spring Batch是一个用于处理大批量数据的框架,主要用来读取大量数据,对其进行一定的处理后,再输出成指定的形式。

      Spring batch主要有以下部分组成:

    • JobRepository       用来存储Job执行元数据(如执行状态、参数等)的仓库
    • JobLauncher             用来启动Job的接口
    • Job                          实际执行的任务,包含一个或多个Step
    • Step                        step包含ItemReader、ItemProcessor和ItemWriter
    • ItemReader              用来读取数据的接口
    • ItemProcessor          用来处理数据的接口
    • ItemWriter               用来输出数据的接口

    以上Spring Batch的主要组成部分只需要注册成Spring的Bean即可。若想开启批处理的支持还需在配置类上使用@EnableBatchProcessing,在Spring Batch中提供了大量的ItemReader和ItemWriter的实现,用来读取不同的数据来源,数据的处理和校验都要通过ItemProcessor接口实现来完成。

    Spring Boot的支持

      Spring Boot对Spring Batch支持的源码位于org.springframework.boot.autoconfigure.batch下。

      Spring Boot为我们自动初始化了Spring Batch存储批处理记录的数据库。

      Spring Batch的依赖会自动引入hsqldb驱动,可根据实际需求选择保留或去除。

    下面是一个Spring Boot集成Spring Batch的例子:

      1. 实体类

     1 public class Person {
     2     
     3     @Size(max=4,min=2) //使用JSR-303注解来校验注解
     4     private String name;
     5     
     6     private int age;
     7     
     8     private String nation;
     9     
    10     private String address;
    11 
    12     public String getName() {
    13         return name;
    14     }
    15 
    16     public void setName(String name) {
    17         this.name = name;
    18     }
    19 
    20     public int getAge() {
    21         return age;
    22     }
    23 
    24     public void setAge(int age) {
    25         this.age = age;
    26     }
    27 
    28     public String getNation() {
    29         return nation;
    30     }
    31 
    32     public void setNation(String nation) {
    33         this.nation = nation;
    34     }
    35 
    36     public String getAddress() {
    37         return address;
    38     }
    39 
    40     public void setAddress(String address) {
    41         this.address = address;
    42     }
    43 }

      2. 校验器

     1 public class CsvBeanValidator<T> implements Validator<T>,InitializingBean {
     2     private javax.validation.Validator validator; 
     3     @Override
     4     public void afterPropertiesSet() throws Exception { //使用JSR-303的Validator来校验我们的数据,在此处进行JSR-303的Validator的初始化
     5         ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
     6         validator = validatorFactory.usingContext().getValidator();
     7     }
     8 
     9     @Override
    10     public void validate(T value) throws ValidationException {
    11         Set<ConstraintViolation<T>> constraintViolations = validator.validate(value); //使用Validator的validate方法校验数据
    12         if(constraintViolations.size()>0){
    13             
    14             StringBuilder message = new StringBuilder();
    15             for (ConstraintViolation<T> constraintViolation : constraintViolations) {
    16                 message.append(constraintViolation.getMessage() + "
    ");
    17             }
    18             throw new ValidationException(message.toString());
    19 
    20         }
    21 
    22     }
    23 
    24 }

      3. ItemProcessor  

     1 public class CsvItemProcessor  extends ValidatingItemProcessor<Person>{
     2 
     3     @Override
     4     public Person process(Person item) throws ValidationException {
     5         super.process(item); //需要执行super.process(item)才会调用自定义校验器
     6 
     7         if(item.getNation().equals("汉族")){ //对数据做简单的处理,若民族为汉族,则数据转换成01,其余转换成02
     8             item.setNation("01");
     9         }else{
    10             item.setNation("02");
    11         }
    12         return item;
    13     }
    14 
    15 
    16 }

      4. Job监听(监听器需实现JobExecutionListener接口,并重写其beforeJob、afterJob方法)

     1 public class CsvJobListener implements JobExecutionListener{ 
     2 
     3     long startTime;
     4     long endTime;
     5     @Override
     6     public void beforeJob(JobExecution jobExecution) {
     7         startTime = System.currentTimeMillis();
     8         System.out.println("任务处理开始");
     9     }
    10 
    11     @Override
    12     public void afterJob(JobExecution jobExecution) {
    13         endTime = System.currentTimeMillis();
    14         System.out.println("任务处理结束");
    15         System.out.println("耗时:" + (endTime - startTime) + "ms");
    16     }
    17 
    18 }

      5. 配置

     1 @Configuration    
     2 @EnableBatchProcessing
     3 public class CsvBatchConfig {
     4 
     5     @Bean
     6     public ItemReader<Person> reader() throws Exception {
     7         FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); //使用FlatFileItemReader读取文件
     8         reader.setResource(new ClassPathResource("people.csv")); //使用FlatFileItemReader的setResource方法设置CSV文件的路径
     9             reader.setLineMapper(new DefaultLineMapper<Person>() {{ //在此处对CVS文件的数据和领域模型类做对应映射
    10                 setLineTokenizer(new DelimitedLineTokenizer() {{
    11                     setNames(new String[] { "name","age", "nation" ,"address"});
    12                 }});
    13                 setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
    14                     setTargetType(Person.class);
    15                 }});
    16             }});
    17             return reader;
    18     }
    19     
    20     @Bean
    21     public ItemProcessor<Person, Person> processor() {
    22         CsvItemProcessor processor = new CsvItemProcessor(); //使用自定义的ItemProcessor的实现
    23         processor.setValidator(csvBeanValidator()); //为Processor指定校验器
    24         return processor;
    25     }
    26     
    27     
    28 
    29     @Bean
    30     public ItemWriter<Person> writer(DataSource dataSource) {//Spring能让容器中已有的Bean以参数的形式注入,Spring boot已经定义了DataSource
    31         JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); //使用JDBC批处理的JdbcBatchItemWriter来写数据到数据库
    32         writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
    33         String sql = "insert into person " + "(id,name,age,nation,address) "
    34                 + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)";
    35         writer.setSql(sql); //在此设置要执行批处理的sql语句
    36         writer.setDataSource(dataSource);
    37         return writer;
    38     }
    39 
    40     @Bean
    41     public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
    42             throws Exception {
    43         JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    44         jobRepositoryFactoryBean.setDataSource(dataSource);
    45         jobRepositoryFactoryBean.setTransactionManager(transactionManager);
    46         jobRepositoryFactoryBean.setDatabaseType("oracle");
    47         return jobRepositoryFactoryBean.getObject();
    48     }
    49 
    50     @Bean
    51     public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
    52             throws Exception {
    53         SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    54         jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
    55         return jobLauncher;
    56     }
    57 
    58     @Bean
    59     public Job importJob(JobBuilderFactory jobs, Step s1) {
    60         return jobs.get("importJob")
    61                 .incrementer(new RunIdIncrementer())
    62                 .flow(s1) //指定step
    63                 .end()
    64                 .listener(csvJobListener()) //绑定监听器
    65                 .build();
    66     }
    67 
    68     @Bean
    69     public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
    70             ItemProcessor<Person,Person> processor) {
    71         return stepBuilderFactory
    72                 .get("step1")
    73                 .<Person, Person>chunk(65000) //批处理每次提交65000条数据
    74                 .reader(reader) //给step绑定reader
    75                 .processor(processor) //给step绑定Processor
    76                 .writer(writer) //给step绑定writer
    77                 .build();
    78     }
    79 
    80 
    81 
    82     @Bean
    83     public CsvJobListener csvJobListener() {
    84         return new CsvJobListener();
    85     }
    86 
    87     @Bean
    88     public Validator<Person> csvBeanValidator() {
    89         return new CsvBeanValidator<Person>();
    90     }
    91     
    92 
    93 }

      6. application.properties

    1 spring.datasource.driverClassName=oracle.jdbc.OracleDriver
    2 spring.datasource.url=jdbc:oracle:thin:@localhost:1521:xe
    3 spring.datasource.username=boot
    4 spring.datasource.password=boot
    5 
    6 spring.batch.job.enabled=true
    7 
    8 logging.level.org.springframework.web = DEBUG

    上面的例子是自动触发批处理的。当我们需要手动触发批处理时,需要将CsvBatchConfig类的@Configuration注解注释掉,让此配置类不再生效,并新建TriggerBatchConfig配置类。其内容与CsvBatchConfig完全一致,只是修改了ItemReader这个Bean的定义。另外,还需要将application.properties配置文件中的spring.batch.job.enabled改为false。

     1 @Configuration
     2 @EnableBatchProcessing
     3 public class TriggerBatchConfig {
     4 
     5     @Bean
     6     @StepScope
     7     public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception {
     8         FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); //
     9          reader.setResource(new ClassPathResource(pathToFile)); //
    10             reader.setLineMapper(new DefaultLineMapper<Person>() {{ //
    11                 setLineTokenizer(new DelimitedLineTokenizer() {{
    12                     setNames(new String[] { "name","age", "nation" ,"address"});
    13                 }});
    14                 setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
    15                     setTargetType(Person.class);
    16                 }});
    17             }});
    18            
    19             return reader;
    20     }
    21     
    22     @Bean
    23     public ItemProcessor<Person, Person> processor() {
    24         CsvItemProcessor processor = new CsvItemProcessor(); 
    25         processor.setValidator(csvBeanValidator()); 
    26         return processor;
    27     }
    28     
    29     
    30 
    31     @Bean
    32     public ItemWriter<Person> writer(DataSource dataSource) {
    33         JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); 
    34         writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
    35         String sql = "insert into person " + "(id,name,age,nation,address) "
    36                 + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)";
    37         writer.setSql(sql); //3
    38         writer.setDataSource(dataSource);
    39         return writer;
    40     }
    41 
    42     @Bean
    43     public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
    44             throws Exception {
    45         JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    46         jobRepositoryFactoryBean.setDataSource(dataSource);
    47         jobRepositoryFactoryBean.setTransactionManager(transactionManager);
    48         jobRepositoryFactoryBean.setDatabaseType("oracle");
    49         return jobRepositoryFactoryBean.getObject();
    50     }
    51 
    52     @Bean
    53     public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
    54             throws Exception {
    55         SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    56         jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
    57         return jobLauncher;
    58     }
    59 
    60     @Bean
    61     public Job importJob(JobBuilderFactory jobs, Step s1) {
    62         return jobs.get("importJob")
    63                 .incrementer(new RunIdIncrementer())
    64                 .flow(s1) 
    65                 .end()
    66                 .listener(csvJobListener()) 
    67                 .build();
    68     }
    69 
    70     @Bean
    71     public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
    72             ItemProcessor<Person,Person> processor) {
    73         return stepBuilderFactory
    74                 .get("step1")
    75                 .<Person, Person>chunk(65000) 
    76                 .reader(reader) 
    77                 .processor(processor) 
    78                 .writer(writer) 
    79                 .build();
    80     }
    81 
    82 
    83 
    84     @Bean
    85     public CsvJobListener csvJobListener() {
    86         return new CsvJobListener();
    87     }
    88 
    89     @Bean
    90     public Validator<Person> csvBeanValidator() {
    91         return new CsvBeanValidator<Person>();
    92     }
    93     
    94 
    95 }

      控制层代码

     1 @RestController
     2 public class DemoController {
     3     
     4         @Autowired
     5         JobLauncher jobLauncher;
     6 
     7         @Autowired
     8         Job importJob;
     9         public JobParameters   jobParameters;
    10         
    11         @RequestMapping("/read")
    12         public String imp(String fileName) throws Exception{
    13             
    14             String path = fileName+".csv";
    15             jobParameters = new JobParametersBuilder()
    16                     .addLong("time", System.currentTimeMillis())
    17                     .addString("input.file.name", path)
    18                     .toJobParameters();
    19             jobLauncher.run(importJob,jobParameters);
    20             return "ok";
    21         }
    22 
    23 }
  • 相关阅读:
    SQL面试题---比较上午vs下午的交易量
    SQL---子查询(subquery)
    SQL创建语句
    数据结构---array与python list的区别
    对比SQL查询语句与Pandas语法(SQL vs Pandas)---基础篇
    python解析图片二维码
    更改mysql数据库主键自增时报错ALTER TABLE causes auto_increment resequencing, resulting in duplicate entry '1'
    Linux添加vip快捷方式
    mysql8.0.23克隆插件的实践
    gtid多源复制Last_Errno: 1007故障处理
  • 原文地址:https://www.cnblogs.com/kevin443/p/6753703.html
Copyright © 2011-2022 走看看