zoukankan      html  css  js  c++  java
  • elastic-job+zookeeper实现分布式定时任务调度的使用(springboot版本)

    总体思路,要确认一个定时任务需要一个cron表达式+jobDetail;

    现在要让实现定时任务的协调,则就让zookeeper,简单说就是需要3要素,zk对象+cron+jobDetail;

    总的项目结构

    1、maven引入依赖

    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
    </dependency>
    <dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>
    </dependencies>

    application.properties如下:

    server.port=8766
    spring.application.name=scheduler-service



    regCenter.serverList = localhost:2181
    regCenter.namespace = elastic-job-lite-springboot

    stockJob.cron = 0/5 * * * * ?

    stockJob.shardingTotalCount = 2


    stockJob.shardingItemParameters = 0=Chengdu0,1=Chengdu1

    其中

    stockJob.cron为定时任务的cron表达式;
    stockJob.shardingTotalCount为任务的分数量(即同时同时开几个定时任务);
    stockJob.shardingItemParameters为任务分片携带的参数;


     要素1-zookeeper
    创建一个bean,用来配置zk
    package com.example.elasticjobdemo.config;

    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
    public class JobRegistryCenterConfig {

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
    return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

    }

    创建一个任务,jobdetail

    package com.example.elasticjobdemo.config;

    import com.dangdang.ddframe.job.api.ShardingContext;
    import com.dangdang.ddframe.job.api.simple.SimpleJob;
    import org.springframework.beans.factory.annotation.Autowired;

    public class StockSimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
    System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, " +
    "当前分片项: %s.当前参数: %s,"+
    "当前任务名称: %s.当前任务参数: %s"
    ,
    Thread.currentThread().getId(),
    shardingContext.getShardingTotalCount(),
    shardingContext.getShardingItem(),
    shardingContext.getShardingParameter(),
    shardingContext.getJobName(),
    shardingContext.getJobParameter()

    ));

    }
    }

    再创建一个类,把3要素连接起来

    package com.example.elasticjobdemo.config;

    import com.dangdang.ddframe.job.api.simple.SimpleJob;
    import com.dangdang.ddframe.job.config.JobCoreConfiguration;
    import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
    import com.dangdang.ddframe.job.lite.api.JobScheduler;
    import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
    import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class StockJobConfig {

    @Autowired
    private JobRegistryCenterConfig jobRegistryCenterConfig;
    @Autowired
    private ZookeeperRegistryCenter regCenter;

    public StockJobConfig() {
    }

    @Bean
    public SimpleJob stockJob(){
    return new StockSimpleJob();
    }



    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${stockJob.cron}") final String cron, @Value("${stockJob.shardingTotalCount}") final int shardingTotalCount,
    @Value("${stockJob.shardingItemParameters}") final String shardingItemParameters) {
    return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
    }

    /**
    *@Description 任务配置类
    */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
    final String cron,
    final int shardingTotalCount,
    final String shardingItemParameters){


    return LiteJobConfiguration
    .newBuilder(
    new SimpleJobConfiguration(
    JobCoreConfiguration.newBuilder(
    jobClass.getName(),cron,shardingTotalCount)
    .shardingItemParameters(shardingItemParameters)
    .build()
    ,jobClass.getCanonicalName()
    )
    )
    .overwrite(true)
    .build();

    }
    }
    boot任务启动后的效果(需要先启动zk,这里我填了分片数为2)
     
     
     
    欢迎关注我的公众号:“进阶者euj”
  • 相关阅读:
    geoserver 文件系统
    geoserver 源码介绍
    geoserver 开发2
    geoserver 开发1
    geoserver笔记
    linux 下安装gult
    LINUX 笔记5
    SQLSTATE[HY000] [2002] 乱码
    微信开发
    javascript记忆
  • 原文地址:https://www.cnblogs.com/yeyongjian/p/8906474.html
Copyright © 2011-2022 走看看