zoukankan      html  css  js  c++  java
  • Elastic Job入门(3)

    在 pom.xml 中引入依赖

          <dependency>
                <groupId>com.dangdang</groupId>
                <artifactId>elastic-job-lite-core</artifactId>
            </dependency>
            <dependency>
                <groupId>com.dangdang</groupId>
                <artifactId>elastic-job-lite-spring</artifactId>
            </dependency>

    yml文件配置

    regCenter:
      serverList: localhost:6181
      namespace: elastic-job-lite-springboot
      
    simpleJob:
      cron: 0/5 * * * * ?
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      
    dataflowJob:
      cron: 0/5 * * * * ?
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou

    编写Job

    public class SpringSimpleJob implements SimpleJob {
        
        @Resource
        private FooRepository fooRepository;
        
        @Override
        public void execute(final ShardingContext shardingContext) {
            System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
                    shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "SIMPLE"));
            List<Foo> data = fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
            for (Foo each : data) {
                fooRepository.setCompleted(each.getId());
            }
        }
    }
    public class SpringDataflowJob implements DataflowJob<Foo> {
        
        @Resource
        private FooRepository fooRepository;
        
        @Override
        public List<Foo> fetchData(final ShardingContext shardingContext) {
            System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
                    shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW FETCH"));
            return fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
        }
        
        @Override
        public void processData(final ShardingContext shardingContext, final List<Foo> data) {
            System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
                    shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW PROCESS"));
            for (Foo each : data) {
                fooRepository.setCompleted(each.getId());
            }
        }
    }

    配置ZooKeeper

    @Configuration
    @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
    public class RegistryCenterConfig {
        
        @Bean(initMethod = "init")
        public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
            return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        }
    }

    配置作业状态监听

    @Configuration
    public class JobEventConfig {
    
        @Resource
        private DataSource dataSource;
    
        @Bean
        public JobEventConfiguration jobEventConfiguration() {
            return new JobEventRdbConfiguration(dataSource);
        }
    }

    配置Job

    @Configuration
    public class SimpleJobConfig {
        
        @Resource
        private ZookeeperRegistryCenter regCenter;
        
        @Resource
        private JobEventConfiguration jobEventConfiguration;
        
        @Bean
        public SimpleJob simpleJob() {
            return new SpringSimpleJob(); 
        }
        
        @Bean(initMethod = "init")
        public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron, @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                               @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
            return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
        }
        
        private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
            return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
                    jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
        }
    }
    @Configuration
    public class DataflowJobConfig {
        
        @Resource
        private ZookeeperRegistryCenter regCenter;
        
        @Resource
        private JobEventConfiguration jobEventConfiguration;
        
        @Bean
        public DataflowJob dataflowJob() {
            return new SpringDataflowJob(); 
        }
        
        @Bean(initMethod = "init")
        public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${dataflowJob.cron}") final String cron, @Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount,
                                               @Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) {
            return new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
        }
        
        private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
            return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(JobCoreConfiguration.newBuilder(
                    jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(), true)).overwrite(true).build();
        }
    }

    更多使用方法,可以参考官方发行包中的 example 示例。

  • 相关阅读:
    使用koa+mongodb构建的仿知乎接口(二)
    使用koa+mongodb构建的仿知乎接口(一)
    flask学习笔记
    后端遇到一些问题
    前端项目一些细节总结
    python基础学习
    vue本地运行项目使用iframe的跨域问题
    hover状态下改变图片颜色的方式 悬停图片切换;css变量;悬停svg图片改变颜色;VUE
    深拷贝
    git初使用
  • 原文地址:https://www.cnblogs.com/ijavanese/p/9974530.html
Copyright © 2011-2022 走看看