  • springbatch


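    Creating and using a job

    • Job: the core concept in batch processing and the basic unit of a batch operation. Each job consists of one or more steps.

    • Step: one stage of a job; each step carries out one part of the work.

    • Each job is created through a JobBuilderFactory, and each step through a StepBuilderFactory (Spring Batch 4.x API; both factories are deprecated as of Spring Batch 5.0).

    • Example: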

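      ```
      import org.springframework.batch.core.Job;
      import org.springframework.batch.core.Step;
      import org.springframework.batch.core.StepContribution;
      import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
      import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
      import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
      import org.springframework.batch.core.scope.context.ChunkContext;
      import org.springframework.batch.core.step.tasklet.Tasklet;
      import org.springframework.batch.repeat.RepeatStatus;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;

      @Configuration
      @EnableBatchProcessing // enable batch processing
      public class JobConfiguration {

          /**
           * Factory that creates job objects
           */
          @Autowired
          private JobBuilderFactory jobBuilderFactory;

          /**
           * Factory that creates step objects
           */
          @Autowired
          private StepBuilderFactory stepBuilderFactory;

          @Bean
          public Job helloJob(){
              // a job named "helloJob" that starts with (and only contains) helloStep1
              return jobBuilderFactory.get("helloJob").start(helloStep1()).build();
          }

          @Bean
          public Step helloStep1(){
              // a tasklet step: runs the code in execute() once, then finishes
              return stepBuilderFactory.get("helloStep1").tasklet(new Tasklet() {
                  @Override
                  public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                      System.out.println("hello world");
                      return RepeatStatus.FINISHED;
                  }
              }).build();
          }
      }
      ```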
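
    The tasklet step above runs a single piece of code. The readers and writers covered in the following sections are normally used in a chunk-oriented step instead, where items are read one at a time and written in batches. The following is a plain-Java sketch of that loop, not Spring Batch code: `ChunkLoopSketch`, `run`, and the `Supplier`/`Consumer` stand-ins for `ItemReader`/`ItemWriter` are assumptions made for illustration. The one convention borrowed from Spring Batch is that a reader returns `null` once its input is exhausted.

    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Consumer;
    import java.util.function.Supplier;

    // Plain-Java illustration only; none of these are Spring Batch types.
    class ChunkLoopSketch {

        /**
         * Reads items until the reader returns null and hands them to the writer
         * in lists of at most chunkSize items. Returns the number of chunks written.
         */
        static <T> int run(Supplier<T> reader, Consumer<List<T>> writer, int chunkSize) {
            int chunks = 0;
            List<T> chunk = new ArrayList<>();
            T item;
            while ((item = reader.get()) != null) {   // null signals "no more input"
                chunk.add(item);
                if (chunk.size() == chunkSize) {
                    writer.accept(new ArrayList<>(chunk));
                    chunk.clear();
                    chunks++;
                }
            }
            if (!chunk.isEmpty()) {                   // flush the final, partial chunk
                writer.accept(new ArrayList<>(chunk));
                chunks++;
            }
            return chunks;
        }

        public static void main(String[] args) {
            Iterator<String> source = List.of("a", "b", "c", "d", "e").iterator();
            List<List<String>> written = new ArrayList<>();
            int chunks = ChunkLoopSketch.<String>run(
                    () -> source.hasNext() ? source.next() : null, written::add, 2);
            System.out.println(chunks + " chunks: " + written);
        }
    }
    ```

    With five items and a chunk size of 2, the writer is called three times: twice with a full chunk and once with the single leftover item.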

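    Reading from a database

    • JdbcPagingItemReader

        . Set the data source
        . Set the query (select clause, from clause, sort keys)
        . Map each result row to an entity class
      
    • MySqlPagingQueryProvider // paging query provider for MySQL

    • Code example: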

          JdbcPagingItemReader<User> jdbcPagingItemReader = new JdbcPagingItemReader<>();
          jdbcPagingItemReader.setDataSource(dataSource);
          jdbcPagingItemReader.setFetchSize(3); // JDBC fetch-size hint; the page size itself is set separately with setPageSize
      
          // configure the paging query
          MySqlPagingQueryProvider pagingQueryProvider = new MySqlPagingQueryProvider();
          pagingQueryProvider.setSelectClause("id,name,password");
          pagingQueryProvider.setFromClause("user");
          
          Map<String, Order> orderMap = new HashMap<>();
          // sort key (the Spring Batch docs recommend a unique key here so that paging does not lose rows)
          orderMap.put("password",Order.DESCENDING);
          pagingQueryProvider.setSortKeys(orderMap);
          jdbcPagingItemReader.setQueryProvider(pagingQueryProvider);
      
          // set the row mapper
          jdbcPagingItemReader.setRowMapper(new RowMapper<User>() {
              @Override
              public User mapRow(ResultSet resultSet, int i) throws SQLException {
                  User user = new User();
                  user.setId(resultSet.getInt(1));
                  user.setName(resultSet.getString(2));
                  user.setPassword(resultSet.getString(3));
                  return user;
              }
          });
          return jdbcPagingItemReader;
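
    Why the sort key matters: as far as can be inferred from the Spring Batch documentation, a paging reader fetches every page after the first with a condition on the last sort-key value it has seen, and the documentation asks for a unique sort key so that no rows are lost. The sketch below simulates that idea over an in-memory list in plain Java. It is not the reader's actual implementation, it sorts ascending for simplicity, and `KeysetPagingSketch` and `readAll` are hypothetical names.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    // Plain-Java simulation of paging by "key greater than the last key seen".
    class KeysetPagingSketch {

        /** Pages through the sorted keys, pageSize rows at a time, and returns every row it saw. */
        static List<Integer> readAll(List<Integer> keys, int pageSize) {
            List<Integer> sorted = keys.stream().sorted().collect(Collectors.toList());
            List<Integer> result = new ArrayList<>();
            Integer last = null;
            while (true) {
                final Integer bound = last;
                // Rough equivalent of: WHERE key > :last ORDER BY key LIMIT :pageSize
                List<Integer> page = sorted.stream()
                        .filter(k -> bound == null || k > bound)
                        .limit(pageSize)
                        .collect(Collectors.toList());
                if (page.isEmpty()) {
                    return result;
                }
                result.addAll(page);
                last = page.get(page.size() - 1);   // remember where this page ended
            }
        }

        public static void main(String[] args) {
            // Unique keys: every row is returned.
            System.out.println(readAll(List.of(1, 2, 3, 4, 5), 2)); // [1, 2, 3, 4, 5]
            // The duplicate key 2 straddles a page boundary: one row is lost.
            System.out.println(readAll(List.of(1, 2, 2, 3), 2));    // [1, 2, 3]
        }
    }
    ```

    In the second call the first page ends on the first `2`, so the condition "greater than 2" skips the second `2`. Sorting on a unique column such as a primary key avoids this.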
      
      

    Reading data from a txt file

    • FlatFileItemReader
    • DelimitedLineTokenizer // line tokenizer: splits one line into fields
    • DefaultLineMapper // line mapper: maps one line of data to the corresponding entity
    • defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
    • flatFileItemReader.setLineMapper(defaultLineMapper);
    • Code example:
    ```
        FlatFileItemReader<Hospital> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setEncoding("utf-8");
        flatFileItemReader.setLinesToSkip(1); // skip the first line
        flatFileItemReader.setResource(new ClassPathResource("/data/hospital.txt"));
    
        // parse the data: split each line into named tokens
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setNames("id","org_name","org_type","addr","allow_no","cert_dept",
                "start_valid_date","end_invalid_date");
    
        // line mapper
        DefaultLineMapper<Hospital> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        defaultLineMapper.setFieldSetMapper(new FieldSetMapper<Hospital>() {
            @Override
            public Hospital mapFieldSet(FieldSet fieldSet) throws BindException {
                Hospital hospital = new Hospital();
                hospital.setId(fieldSet.readInt("id"));
                hospital.setAddr(fieldSet.readString("addr"));
                hospital.setAllowNo(fieldSet.readString("allow_no"));
                hospital.setCertDept(fieldSet.readString("cert_dept"));
                hospital.setEndValidDate(fieldSet.readString("end_invalid_date"));
                hospital.setOrgName(fieldSet.readString("org_name"));
                hospital.setOrgType(fieldSet.readString("org_type"));
                hospital.setStartValidDate(fieldSet.readString("start_valid_date"));
                return hospital;
            }
        });
        defaultLineMapper.afterPropertiesSet();
        flatFileItemReader.setLineMapper(defaultLineMapper);
    
        return flatFileItemReader;
    ``` 
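
    To make the tokenizer and field-set roles concrete, here is a plain-Java sketch of what "split a line and read fields by name" amounts to. It is only an illustration: the real DelimitedLineTokenizer and FieldSet do more (quoted fields, typed reads), and `LineTokenizerSketch`, `tokenize`, and the sample line are made up for this example.

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    // Plain-Java illustration; not the Spring Batch tokenizer.
    class LineTokenizerSketch {

        /** Splits a line on the delimiter and pairs each token with its column name. */
        static Map<String, String> tokenize(String line, String delimiter, String... names) {
            // -1 keeps trailing empty tokens, so "a,b," still yields three tokens
            String[] tokens = line.split(Pattern.quote(delimiter), -1);
            if (tokens.length != names.length) {
                throw new IllegalArgumentException(
                        "Expected " + names.length + " tokens but found " + tokens.length);
            }
            Map<String, String> fields = new LinkedHashMap<>();
            for (int i = 0; i < names.length; i++) {
                fields.put(names[i], tokens[i].trim());
            }
            return fields;
        }

        public static void main(String[] args) {
            // Hypothetical sample line with three of the columns used above.
            Map<String, String> fields =
                    tokenize("1,General Hospital,public", ",", "id", "org_name", "org_type");
            int id = Integer.parseInt(fields.get("id"));   // comparable to fieldSet.readInt("id")
            System.out.println(id + " / " + fields.get("org_name"));
        }
    }
    ```

    The names passed to `tokenize` play the same role as the names given to `setNames`: the field-set mapper can only look up a field by a name that was declared there.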
    

    Reading data from XML

    • StaxEventItemReader: the XML reader

    • Add the following jars to the pom

      ```
        <dependency>
            <groupId>com.thoughtworks.xstream</groupId>
            <artifactId>xstream</artifactId>
            <version>1.4.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
            <version>5.0.8.RELEASE</version>
        </dependency>
      ```

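    • xstream converts XML into objects

    • spring-oxm provides the Marshaller/Unmarshaller abstraction that the reader expects; its XStreamMarshaller is the implementation backed by xstream

    • Code example: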

      ```
            @Bean
            public ItemReader<User> xmlItemReader() {

                StaxEventItemReader<User> staxEventItemReader = new StaxEventItemReader<>();
                // name of the element that wraps one item (each <user> becomes one User)
                staxEventItemReader.setFragmentRootElementName("user");
                staxEventItemReader.setResource(new ClassPathResource("/data/user.xml")); // file to read

                // convert each XML fragment into an entity object
                XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
                Map<String, Class<?>> alias = new HashMap<>();
                // key: the fragment's root tag; value: the class that the tags under it map onto
                alias.put("user", User.class);
                xStreamMarshaller.setAliases(alias);

                staxEventItemReader.setUnmarshaller(xStreamMarshaller);

                return staxEventItemReader;
            }
      ```
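
    The "fragment root element" idea can be tried out with the StAX API that ships with the JDK. The sketch below is an illustration under assumptions, not what StaxEventItemReader does internally: it treats every element with the given name as one item and collects its child tags into a map, where the real reader hands the fragment to the unmarshaller. `XmlFragmentSketch`, `readFragments`, and the sample XML are hypothetical.

    ```java
    import java.io.StringReader;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import javax.xml.stream.XMLEventReader;
    import javax.xml.stream.XMLInputFactory;
    import javax.xml.stream.XMLStreamException;
    import javax.xml.stream.events.XMLEvent;

    // Plain-JDK illustration of reading an XML document one fragment at a time.
    class XmlFragmentSketch {

        /** Returns one map of child tag -> text for every element named fragmentName. */
        static List<Map<String, String>> readFragments(String xml, String fragmentName) {
            List<Map<String, String>> items = new ArrayList<>();
            try {
                XMLEventReader events =
                        XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
                Map<String, String> current = null;   // the item being built, if any
                String field = null;                  // the child tag being read, if any
                while (events.hasNext()) {
                    XMLEvent event = events.nextEvent();
                    if (event.isStartElement()) {
                        String name = event.asStartElement().getName().getLocalPart();
                        if (name.equals(fragmentName)) {
                            current = new LinkedHashMap<>();   // a new item starts here
                        } else if (current != null) {
                            field = name;
                        }
                    } else if (event.isCharacters() && current != null && field != null) {
                        // text may arrive in several events, so append rather than overwrite
                        current.merge(field, event.asCharacters().getData(), String::concat);
                    } else if (event.isEndElement()) {
                        String name = event.asEndElement().getName().getLocalPart();
                        if (name.equals(fragmentName) && current != null) {
                            items.add(current);                // the item is complete
                            current = null;
                        }
                        field = null;
                    }
                }
            } catch (XMLStreamException e) {
                throw new IllegalStateException("Malformed XML", e);
            }
            return items;
        }

        public static void main(String[] args) {
            String xml = "<users>"
                    + "<user><id>1</id><name>alice</name></user>"
                    + "<user><id>2</id><name>bob</name></user>"
                    + "</users>";
            for (Map<String, String> user : readFragments(xml, "user")) {
                System.out.println(user.get("id") + " -> " + user.get("name"));
            }
        }
    }
    ```

    The enclosing `<users>` element is never named anywhere: only the fragment name decides where one item ends and the next begins.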

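    Reading multiple files

    • MultiResourceItemReader

    • Reading multiple files really just means reading them one file at a time

    • MultiResourceItemReader needs two things: a delegate reader that does the actual file reading, and the file resources to read

    • Code example: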

      ```
      @Value("classpath:/data/file*.txt")
      private Resource[] resources;

      @Bean
      public MultiResourceItemReader<Hospital> multiFileItemReader() {
          MultiResourceItemReader<Hospital> multiResourceItemReader = new MultiResourceItemReader<>();
          // the files are still read one at a time, so a delegate reader for a single file is required
          multiResourceItemReader.setDelegate(flatFileItemReader());
          multiResourceItemReader.setResources(resources);
          return multiResourceItemReader;
      }

      @Bean
      public FlatFileItemReader<Hospital> flatFileItemReader() {
          // no resource is set here: MultiResourceItemReader supplies each file in turn
          FlatFileItemReader<Hospital> flatFileItemReader = new FlatFileItemReader<>();
          flatFileItemReader.setEncoding("utf-8");

          DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
          delimitedLineTokenizer.setDelimiter(",");
          delimitedLineTokenizer.setNames("id","org_name","org_type","addr","allow_no","cert_dept",
                  "start_valid_date","end_invalid_date");

          DefaultLineMapper<Hospital> defaultLineMapper = new DefaultLineMapper<>();
          defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
          defaultLineMapper.setFieldSetMapper(new FieldSetMapper<Hospital>() {
              @Override
              public Hospital mapFieldSet(FieldSet fieldSet) throws BindException {
                  Hospital hospital = new Hospital();
                  hospital.setStartValidDate(fieldSet.readString("start_valid_date"));
                  hospital.setOrgType(fieldSet.readString("org_type"));
                  hospital.setOrgName(fieldSet.readString("org_name"));
                  hospital.setEndValidDate(fieldSet.readString("end_invalid_date"));
                  hospital.setCertDept(fieldSet.readString("cert_dept"));
                  hospital.setAllowNo(fieldSet.readString("allow_no"));
                  hospital.setAddr(fieldSet.readString("addr"));
                  hospital.setId(fieldSet.readInt("id"));
                  return hospital;
              }
          });

          flatFileItemReader.setLineMapper(defaultLineMapper);

          return flatFileItemReader;
      }
      ```
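
    The delegate arrangement can be pictured with a small plain-Java sketch: an outer reader keeps a list of sources and, whenever the current one runs dry, moves on to the next. This only illustrates the idea and is not how MultiResourceItemReader is written; `MultiSourceReaderSketch` is a hypothetical name, and in-memory lists of lines stand in for the files.

    ```java
    import java.util.Iterator;
    import java.util.List;

    // Plain-Java illustration of "many sources, read one after another".
    class MultiSourceReaderSketch {

        private final Iterator<List<String>> sources;   // stands in for the Resource[] array
        private Iterator<String> current;               // stands in for the delegate reader

        MultiSourceReaderSketch(List<List<String>> sources) {
            this.sources = sources.iterator();
        }

        /** Returns the next line across all sources, or null once every source is exhausted. */
        String read() {
            while (current == null || !current.hasNext()) {
                if (!sources.hasNext()) {
                    return null;                         // nothing left anywhere
                }
                current = sources.next().iterator();     // "open" the next file
            }
            return current.next();
        }

        public static void main(String[] args) {
            MultiSourceReaderSketch reader = new MultiSourceReaderSketch(
                    List.of(List.of("file1-line1", "file1-line2"), List.of("file2-line1")));
            String line;
            while ((line = reader.read()) != null) {
                System.out.println(line);
            }
        }
    }
    ```

    From the caller's point of view there is a single stream of items; the boundaries between files are invisible, and an empty file is simply skipped.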

    Reading from a file and writing to a database

    • JdbcBatchItemWriter

    • Code example:

      
          @Bean
          public ItemWriter<Hospital> jdbcBatchItemWriter() {
              JdbcBatchItemWriter<Hospital> jdbcBatchItemWriter = new JdbcBatchItemWriter<>();
              jdbcBatchItemWriter.setDataSource(dataSource);
              jdbcBatchItemWriter.setSql(
                      "INSERT INTO `hospital` (`id`, `org_name`, `org_type`, `addr`, `allow_no`, `cert_dept`, `start_valid_date`, `end_invalid_date`) " +
                              "VALUES (:id, :orgName, :orgType, :addr, :allowNo, :certDept, :startValidDate, :endValidDate)"
              );
              // map the item's property values onto the named SQL parameters (:orgName <- getOrgName(), ...)
              jdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
              return jdbcBatchItemWriter;
          }
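
    The `:name` placeholders in the SQL above are resolved by name against the item's properties. The plain-Java sketch below shows the general idea of turning named parameters into positional JDBC parameters. It is an assumption-laden illustration, not Spring's implementation: a `Map` stands in for the bean, `NamedParamSketch` and `bind` are hypothetical, and the simple pattern would also match colons inside string literals.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Plain-Java illustration of named-parameter binding.
    class NamedParamSketch {

        private static final Pattern PARAM = Pattern.compile(":([A-Za-z][A-Za-z0-9]*)");

        /** Replaces each :name with ? and appends the matching value to orderedArgs. */
        static String bind(String sql, Map<String, Object> values, List<Object> orderedArgs) {
            Matcher matcher = PARAM.matcher(sql);
            StringBuffer out = new StringBuffer();
            while (matcher.find()) {
                String name = matcher.group(1);
                if (!values.containsKey(name)) {
                    throw new IllegalArgumentException("No value for parameter :" + name);
                }
                orderedArgs.add(values.get(name));     // keep values in placeholder order
                matcher.appendReplacement(out, "?");
            }
            matcher.appendTail(out);
            return out.toString();
        }

        public static void main(String[] args) {
            Map<String, Object> item = new HashMap<>();
            item.put("id", 1);
            item.put("orgName", "General Hospital");   // hypothetical sample value

            List<Object> jdbcArgs = new ArrayList<>();
            String sql = bind("INSERT INTO hospital (id, org_name) VALUES (:id, :orgName)",
                    item, jdbcArgs);
            System.out.println(sql);
            System.out.println(jdbcArgs);
        }
    }
    ```

    A placeholder with no matching property is an error, which is why the parameter names in the SQL have to follow the entity's property names (`orgName`) and not the column names (`org_name`).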
      
      
      

    Reading from a database and writing to a plain file

    • FlatFileItemWriter

    • Code example:

          @Bean
          public ItemWriter<? super Hospital> flatFileItemWriter() {
      
              FlatFileItemWriter<Hospital> flatFileItemWriter = new FlatFileItemWriter<>();
              flatFileItemWriter.setEncoding("utf-8");
              // path of the file that receives the data
              flatFileItemWriter.setResource(new FileSystemResource("f:/hospital_generate.txt"));
      
              flatFileItemWriter.setLineAggregator(new LineAggregator<Hospital>() {
                  private final ObjectMapper objectMapper = new ObjectMapper();
                  @Override
                  public String aggregate(Hospital item) {
                      try {
                          // serialize the item as one JSON line
                          return objectMapper.writeValueAsString(item);
                      } catch (JsonProcessingException e) {
                          // fail the step rather than returning null and writing a bad line
                          throw new IllegalStateException("Cannot serialize " + item, e);
                      }
                  }
              });
              return flatFileItemWriter;
      
          }
      

    Reading from a database and writing to an XML file

    • StaxEventItemWriter

    • XStreamMarshaller: maps XML tag elements to entity objects

    • Code example:

          @Bean
          public ItemWriter<? super Hospital> xmlFileItemWriter() {
              StaxEventItemWriter<Hospital> staxEventItemWriter = new StaxEventItemWriter<>();
              staxEventItemWriter.setEncoding("utf-8");
              staxEventItemWriter.setResource(new FileSystemResource("f:/hospital.xml"));
              staxEventItemWriter.setRootTagName("hospitals");
      
              // map XML tags to entity classes
              Map<String,Class<Hospital>> alias = new HashMap<>();
              alias.put("hospital",Hospital.class);
      
              XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
              xStreamMarshaller.setAliases(alias);
      
              staxEventItemWriter.setMarshaller(xStreamMarshaller);
      
              return staxEventItemWriter;
      
          }
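
    The shape of the resulting file (one root tag wrapping one element per item) can be reproduced with the JDK's own StAX writer. The sketch below is an illustration only: it writes a single `orgName` child per item by hand, whereas in the code above the XStreamMarshaller decides how each Hospital is rendered. `XmlWriteSketch` and `write` are hypothetical names.

    ```java
    import java.io.StringWriter;
    import java.util.List;
    import javax.xml.stream.XMLOutputFactory;
    import javax.xml.stream.XMLStreamException;
    import javax.xml.stream.XMLStreamWriter;

    // Plain-JDK illustration of "root tag + one element per item".
    class XmlWriteSketch {

        /** Writes <rootTag>, then one <itemTag><orgName>...</orgName></itemTag> per name. */
        static String write(String rootTag, String itemTag, List<String> orgNames) {
            StringWriter out = new StringWriter();
            try {
                XMLStreamWriter xml = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
                xml.writeStartElement(rootTag);          // comparable to setRootTagName("hospitals")
                for (String orgName : orgNames) {
                    xml.writeStartElement(itemTag);      // comparable to the "hospital" alias
                    xml.writeStartElement("orgName");
                    xml.writeCharacters(orgName);        // special characters are escaped
                    xml.writeEndElement();
                    xml.writeEndElement();
                }
                xml.writeEndElement();
                xml.flush();
                xml.close();
            } catch (XMLStreamException e) {
                throw new IllegalStateException("Could not write XML", e);
            }
            return out.toString();
        }

        public static void main(String[] args) {
            // Hypothetical sample names.
            System.out.println(write("hospitals", "hospital", List.of("North Clinic", "A & B Hospital")));
        }
    }
    ```

    Writing through an XML API rather than concatenating strings means characters such as `&` in the data are escaped and cannot break the document.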
      
  • Original article: https://www.cnblogs.com/sxqjava/p/10408481.html