总体思路,要确认一个定时任务需要一个cron表达式+jobDetail;
现在要让实现定时任务的协调,则就让zookeeper,简单说就是需要3要素,zk对象+cron+jobDetail;
总的项目结构
1、maven引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.properties如下:
server.port=8766
spring.application.name=scheduler-service
regCenter.serverList = localhost:2181
regCenter.namespace = elastic-job-lite-springboot
stockJob.cron = 0/5 * * * * ?
stockJob.shardingTotalCount = 2
stockJob.shardingItemParameters = 0=Chengdu0,1=Chengdu1
其中
stockJob.cron为定时任务的cron表达式;
stockJob.shardingTotalCount为任务的分数量(即同时同时开几个定时任务);
stockJob.shardingItemParameters为任务分片携带的参数;
要素1-zookeeper
创建一个bean,用来配置zk
package com.example.elasticjobdemo.config;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class JobRegistryCenterConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
创建一个任务,jobdetail
package com.example.elasticjobdemo.config;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.beans.factory.annotation.Autowired;
public class StockSimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, " +
"当前分片项: %s.当前参数: %s,"+
"当前任务名称: %s.当前任务参数: %s"
,
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
));
}
}
再创建一个类,把3要素连接起来
package com.example.elasticjobdemo.config;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class StockJobConfig {
@Autowired
private JobRegistryCenterConfig jobRegistryCenterConfig;
@Autowired
private ZookeeperRegistryCenter regCenter;
public StockJobConfig() {
}
@Bean
public SimpleJob stockJob(){
return new StockSimpleJob();
}
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${stockJob.cron}") final String cron, @Value("${stockJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${stockJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
}
/**
*@Description 任务配置类
*/
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters){
return LiteJobConfiguration
.newBuilder(
new SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(
jobClass.getName(),cron,shardingTotalCount)
.shardingItemParameters(shardingItemParameters)
.build()
,jobClass.getCanonicalName()
)
)
.overwrite(true)
.build();
}
}
boot任务启动后的效果(需要先启动zk,这里我填了分片数为2)