zoukankan      html  css  js  c++  java
  • SimpleJob的使用

    转载的博客

    Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
    Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务; --摘自官网

    具体的详细介绍,大家可以去官网查阅

    这篇文章主要是整合springboot 的简单例子。通过一步一步实现,来逐步的熟悉elastic-job 这个组件,首要条件就是需要你有个运行的zookeeper

    搭建springboot项目

    这里不再赘述如何搭建,这里我只贴出我的pom文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <!-- Maven build descriptor for the Spring Boot + Elastic-Job Lite demo application. -->
       <modelVersion>4.0.0</modelVersion>
       <!-- Spring Boot parent POM; dependencies declared below without a <version> rely on its managed versions. -->
       <parent>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-parent</artifactId>
           <version>2.0.1.RELEASE</version>
           <relativePath/> <!-- lookup parent from repository -->
       </parent>
       <groupId>com.kevin</groupId>
       <artifactId>es-job</artifactId>
       <version>0.0.1-SNAPSHOT</version>
       <name>es-job</name>
       <description>Demo project for Spring Boot</description>
    
       <properties>
           <java.version>1.8</java.version>
           <!-- Single place to set the version shared by both elastic-job artifacts below. -->
           <elastic-job.version>2.1.4</elastic-job.version>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter</artifactId>
           </dependency>
    
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-test</artifactId>
               <scope>test</scope>
           </dependency>
    
           <!-- Elastic-Job Lite artifacts: the core module and the spring module, both at ${elastic-job.version} -->
           <dependency>
               <groupId>com.dangdang</groupId>
               <artifactId>elastic-job-lite-core</artifactId>
               <version>${elastic-job.version}</version>
           </dependency>
           <dependency>
               <groupId>com.dangdang</groupId>
               <artifactId>elastic-job-lite-spring</artifactId>
               <version>${elastic-job.version}</version>
           </dependency>
    
           <!-- Spring Boot dependencies: web, actuator, configuration processor -->
    
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-web</artifactId>
           </dependency>
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-actuator</artifactId>
           </dependency>
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-configuration-processor</artifactId>
               <optional>true</optional>
           </dependency>
    
           <!-- Persistence stack: JDBC starter, MySQL driver and Spring Data JPA -->
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-jdbc</artifactId>
           </dependency>
           <!-- MySQL JDBC driver -->
           <dependency>
               <groupId>mysql</groupId>
               <artifactId>mysql-connector-java</artifactId>
           </dependency>
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-data-jpa</artifactId>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.springframework.boot</groupId>
                   <artifactId>spring-boot-maven-plugin</artifactId>
               </plugin>
           </plugins>
       </build>
    
    </project>
    
    

    主要的就是加载这2个依赖

        <!-- Elastic-Job Lite artifacts: the core module and the spring module, both resolved at ${elastic-job.version} -->
           <dependency>
               <groupId>com.dangdang</groupId>
               <artifactId>elastic-job-lite-core</artifactId>
               <version>${elastic-job.version}</version>
           </dependency>
           <dependency>
               <groupId>com.dangdang</groupId>
               <artifactId>elastic-job-lite-spring</artifactId>
               <version>${elastic-job.version}</version>
           </dependency>
    

    整合elasticjob

    1. 编写配置application.yml
    # ZooKeeper registry center settings (injected into RegistryCenterConfig via @Value)
    zookeeper:
     address: 192.168.247.7:2181
     namespace: elastic-job
     connectionTimeout: 10000
     sessionTimeout: 10000
     maxRetries: 3
    
    # SimpleJob settings (injected into MySimpleJobConfig via @Value)
    simpleJob:
       cron: 0/5 * * * * ?
       shardingTotalCount: 5
       shardingItemParameters: 0=java,1=php,2=erlang,3=angular,4=vue
       jobParameter: source1=public,source2=private
       failover: true
       monitorExecution: true
       monitorPort: 8889
       maxTimeDiffSeconds: -1
       jobShardingStrategyClass: com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
    
    # NOTE(review): no consumer of the dataflowJob.* keys is shown next to this file - confirm they are used
    dataflowJob:
       cron: 0/10 * * * * ?
       shardingTotalCount: 2
       shardingItemParameters: 0=jinan,1=qingdao
    
    
    
    
    ############################################################
    #
    # Data source configuration
    #
    ############################################################
    spring:
     datasource: # data source settings
         type: com.zaxxer.hikari.HikariDataSource          # data source type: HikariCP
         driver-class-name: com.mysql.jdbc.Driver          # MySQL driver
         url: jdbc:mysql://localhost:3306/elasticjob?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false
         username: root
         password: root
         hikari:
           connection-timeout: 30000       # max time (ms) to wait for the pool to hand out a connection; an SQLException is raised if none becomes available in time; default: 30 seconds
           minimum-idle: 5                 # minimum number of connections
           maximum-pool-size: 20           # maximum number of connections
           auto-commit: true               # auto-commit
           idle-timeout: 600000            # max time (ms) a connection may stay idle before it is released (retired); default: 10 minutes
           pool-name: DateSourceHikariCP     # connection pool name
           max-lifetime: 1800000           # connection lifetime (ms); released (retired) once exceeded and not in use; default: 30 minutes (1800000 ms)
           connection-test-query: SELECT 1
    
    2. 注册中心加载到spring 容器中
    /**
     * Spring configuration that publishes the ZooKeeper registry center used by Elastic-Job.
     *
     * <p>The configuration is only activated when {@code zookeeper.address} resolves to a
     * non-empty value. The placeholder carries an empty default ({@code zookeeper.address:})
     * because an unresolvable placeholder without a default is left in the expression verbatim,
     * which would make the length check succeed even though the property is missing.
     *
     * @author kevin
     * Date: 2020/1/22
     */
    @Configuration
    @ConditionalOnExpression("'${zookeeper.address:}'.length() > 0")
    public class RegistryCenterConfig {

        /**
         * Registers the registry center in the Spring container; {@code init} is declared as
         * the bean's init method.
         *
         * @param serverLists       value of {@code zookeeper.address}
         * @param namespace         value of {@code zookeeper.namespace}
         * @param connectionTimeout value of {@code zookeeper.connectionTimeout}, applied as the
         *                          connection timeout in milliseconds
         * @param sessionTimeout    value of {@code zookeeper.sessionTimeout}, applied as the
         *                          session timeout in milliseconds
         * @param maxRetries        value of {@code zookeeper.maxRetries}
         * @return the registry center built from the settings above
         */
        @Bean(initMethod = "init")
        public ZookeeperRegistryCenter registryCenter(@Value("${zookeeper.address}") final String serverLists,
                                                      @Value("${zookeeper.namespace}") final String namespace,
                                                      @Value("${zookeeper.connectionTimeout}") final int connectionTimeout,
                                                      @Value("${zookeeper.sessionTimeout}") final int sessionTimeout,
                                                      @Value("${zookeeper.maxRetries}") final int maxRetries) {
            final ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverLists, namespace);
            zookeeperConfiguration.setConnectionTimeoutMilliseconds(connectionTimeout);
            zookeeperConfiguration.setSessionTimeoutMilliseconds(sessionTimeout);
            zookeeperConfiguration.setMaxRetries(maxRetries);

            return new ZookeeperRegistryCenter(zookeeperConfiguration);
        }
    }
    
    3. 配置JobEventConfig事件追踪
    /**
     * Event-tracing configuration for Elastic-Job.
     *
     * <p>Exposes a {@link JobEventConfiguration} bean built on top of the injected
     * {@link DataSource}.
     *
     * @author kevin
     * Date: 2020/1/22
     */
    @Configuration
    public class JobEventConfig {

        @Autowired
        private DataSource dataSource;

        /**
         * Builds the job-event configuration from the injected data source.
         *
         * @return a {@link JobEventRdbConfiguration} wrapping {@code dataSource}
         */
        @Bean
        public JobEventConfiguration jobEventConfiguration() {
            final JobEventConfiguration rdbBackedEvents = new JobEventRdbConfiguration(this.dataSource);
            return rdbBackedEvents;
        }

    }
    
    4. 编写自己的job并完成配置

      4.1 定义自己的job

    /**
     * Minimal {@link SimpleJob} implementation: each execution prints the sharding parameter
     * of the shard it was invoked for to standard output.
     *
     * @author kevin
     * Date: 2020/1/22
     */
    @Component
    public class MySimpleJob implements SimpleJob {

        /**
         * Prints the sharding parameter carried by the given context.
         *
         * @param shardingContext context of the current execution
         */
        @Override
        public void execute(ShardingContext shardingContext) {
            final String shardingParameter = shardingContext.getShardingParameter();
            System.out.println(shardingParameter);
        }
    }
    

    ​ 4.2 编写job配置

    @Configuration
    public class MySimpleJobConfig {
       @Autowired
       private ZookeeperRegistryCenter registryCenter;
    
       @Autowired
       private JobEventConfiguration jobEventConfiguration;
    
       /**
        * 	具体真正的定时任务执行逻辑
        * @return
        */
       @Bean
       public SimpleJob simpleJob() {
           return new MySimpleJob();
       }
    
       /**
        * @param simpleJob
        * @return
        */
       @Bean(initMethod = "init")
       public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
                                              @Value("${simpleJob.cron}") final String cron,
                                              @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                              @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters,
                                              @Value("${simpleJob.jobParameter}") final String jobParameter,
                                              @Value("${simpleJob.failover}") final boolean failover,
                                              @Value("${simpleJob.monitorExecution}") final boolean monitorExecution,
                                              @Value("${simpleJob.monitorPort}") final int monitorPort,
                                              @Value("${simpleJob.maxTimeDiffSeconds}") final int maxTimeDiffSeconds,
                                              @Value("${simpleJob.jobShardingStrategyClass}") final String jobShardingStrategyClass) {
    
           return new SpringJobScheduler(simpleJob,
                   registryCenter,
                   getLiteJobConfiguration(simpleJob.getClass(),
                           cron,
                           shardingTotalCount,
                           shardingItemParameters,
                           jobParameter,
                           failover,
                           monitorExecution,
                           monitorPort,
                           maxTimeDiffSeconds,
                           jobShardingStrategyClass),
                   jobEventConfiguration,
                   new SimpleJobListener());
    
       }
    
    
       private LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,
                                                            int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,
                                                            boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {
    
           //定义作业核心配置
           JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                   .newBuilder(jobClass.getName(), cron, shardingTotalCount)
                   .misfire(true)
                   .failover(failover)
                   .jobParameter(jobParameter)
                   .shardingItemParameters(shardingItemParameters)
                   .build();
    
           //定义SIMPLE类型配置
           SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
    
           //定义Lite作业根配置
           LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration)
                   .jobShardingStrategyClass(jobShardingStrategyClass)
                   .monitorExecution(monitorExecution)
                   .monitorPort(monitorPort)
                   .maxTimeDiffSeconds(maxTimeDiffSeconds)
                   .overwrite(true)
                   .build();
    
           return liteJobConfiguration;
       }
    

    运行

    这里直接启动后,可以看到控制台打印

    java
    php
    erlang
    angular
    vue
    
  • 相关阅读:
    线程池的扩展 beforeExecute() afterExecute() terminaerd()
    信号量semaphore 读写锁ReadWriteLock 倒计时器CountDownLatch 循环栅栏 CyclicBarrier 线程阻塞工具类LockSupport
    ReenTrantLock可重入锁 和synchronized 内部所锁的
    integer.valueof和integer.parseint
    守护线程
    notify wait sleep join yield yield
    Thread.stop()不要随意调用
    iterate使用了parallel() 反而消耗了更多的时间
    Stream 分支/合并 框架的实际例子
    区分Collection、Collector和collect Collectors类的静态工厂方法
  • 原文地址:https://www.cnblogs.com/shouyaya/p/14191596.html
Copyright © 2011-2022 走看看