zoukankan      html  css  js  c++  java
  • ElasticJob 快速上手

    1.  ElasticJob 是什么

    ElasticJob 是一个分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。 

    ElasticJob-Lite 定位为轻量级无中心化解决方案,使用jar的形式提供分布式任务的协调服务。

    ElasticJob 已于2020年5月28日成为 Apache ShardingSphere 的子项目。 

    ElasticJob特性:

    • 弹性调度
      • 支持任务在分布式场景下的分片和高可用
      • 能够水平扩展任务的吞吐量和执行效率
      • 任务处理能力随资源配备弹性伸缩 
    • 资源分配
      • 在适合的时间将适合的资源分配给任务并使其生效
      • 相同任务聚合至相同的执行器统一处理
      • 动态调配追加资源至新分配的任务  
    • 作业治理
      • 失效转移
      • 错过作业重新执行
      • 自诊断修复
    • 作业开放生态
      • 可扩展的作业类型统一接口
      • 丰富的作业类型库,如数据流、脚本、HTTP、文件、大数据等
      • 易于对接业务作业,能够与 Spring 依赖注入无缝整合  
    • 可视化管控端
      • 作业管控端
      • 作业执行历史数据追踪
      • 注册中心管理 

    2.  实例演示

    这里采用最新版本 3.0.0-RC1 

    1、启动zookeeper服务

    首先,下载zookeeper-3.6.0版本,解压后复制一份zoo_sample.cfg,重命名未zoo.cfg,保持默认配置即可

    注意,zookeeper-3.6.0启动以后会占用三个端口,其中包括8080哦

    2、编写定时任务业务逻辑

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.4.1</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>elasticjob-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    
        <properties>
            <java.version>1.8</java.version>
            <elasticjob-lite.version>3.0.0-RC1</elasticjob-lite.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.apache.shardingsphere.elasticjob</groupId>
                <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
                <version>${elasticjob-lite.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-jdbc</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.shardingsphere.elasticjob</groupId>
                <artifactId>elasticjob-error-handler-dingtalk</artifactId>
                <version>${elasticjob-lite.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    application.yml

    elasticjob:
      regCenter:
        serverLists: 192.168.100.15:2181
        namespace: elasticjob-demo
        baseSleepTimeMilliseconds: 2000
        maxSleepTimeMilliseconds: 4000
        maxRetries: 3
      jobs:
        firstJob:
          elasticJobClass: com.example.job.FirstJob
          cron: 0/6 * * * * ?
          shardingTotalCount: 3
          jobErrorHandlerType: DINGTALK
          props:
            dingtalk:
              webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx
              secret: ASDF
              connectTimeout: 3000
              readTimeout: 5000
        secondJob:
          elasticJobClass: com.example.job.SecondJob
          cron: 0/10 * * * * ?
          shardingTotalCount: 1
          jobErrorHandlerType: DINGTALK
          props:
            dingtalk:
              webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx
              secret: ASDF
              connectTimeout: 3000
              readTimeout: 5000 

    两个定时任务

    FirstJob.java

    package com.example.job;
    
    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
    import org.springframework.stereotype.Component;
    
    /**
     * @author ChengJianSheng
     * @date 2021/1/13
     */
    @Component
    public class FirstJob implements SimpleJob {
        @Override
        public void execute(ShardingContext shardingContext) {
            switch (shardingContext.getShardingItem()) {
                case 0:
                    // do something by sharding item 0
                    System.out.println(0);
                    // int a = 1 / 0;
                    break;
                case 1:
                    // do something by sharding item 1
                    System.out.println(1);
                    break;
                case 2:
                    // do something by sharding item 2
                    System.out.println(2);
                    break;
                // case n: ...
            }
        }
    }
    

    SecondJob.java

    package com.example.job;
    
    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
    import org.springframework.stereotype.Component;
    
    /**
     * @author ChengJianSheng
     * @date 2021/1/18
     */
    @Component
    public class SecondJob implements SimpleJob {
        @Override
        public void execute(ShardingContext shardingContext) {
            System.out.println("hello");
        }
    } 

    项目结构

    运行项目即可

    通过 ElasticJob-UI 查看任务

    https://shardingsphere.apache.org/elasticjob/current/cn/downloads/

    3.  启动报错排查

    项目启动过程中,可能会报如下错误

    org.apache.zookeeper.ClientCnxn$EndOfStreamException: Unable to read additional data from server sessionid 0x1000bdf48160002, likely server has closed socket

    org.apache.shardingsphere.elasticjob.reg.exception.RegException: org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout

    Caused by: org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout

    最开始,我以为是zookeeper版本的问题,后来换了版本也不行,防火墙关了也不行

    然后,我怀疑是开发环境问题,于是在本地运行zookeeper,程序连127.0.0.1:2181,居然可以了

    于是我陷入了沉思,为今之计,只剩下一个办法了,打断点调试

    找到了异常抛出的位置,如下图

    baseSleepTimeMilliseconds 表示 等待重试的间隔时间的初始值

    maxSleepTimeMilliseconds  表示 等待重试的间隔时间的最大值

    maxRetries 表示 最大重试次数

    根据代码中意思,如果在 maxSleepTimeMilliseconds * maxRetries 毫秒内还没有连接成功,则连接关闭,并抛出操作超时异常

    联想到,连接本地zookeeper可以,连开发环境zk就不行,再加上观察日志从连接开始到抛异常的时间间隔,我猜到应该是maxSleepTimeMilliseconds设置太短了

    于是,application.yml配置文件中将maxSleepTimeMilliseconds设置为4000,baseSleepTimeMilliseconds设置为2000

    然后好使

    回想刚开始报的那些错,其实根本就还没有连上zookeeper

    4.  作业分片

    ElasticJob 中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。 随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。

    任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

    也就是说,分片是为了在分布式环境下高效合理利用任务服务器资源的。简单地来讲,一个定时任务,我们运行多台服务器,这意味着有多个实例在执行同一项任务,分片就是为了告诉这些实例各自该处理那些数据,最大限度的降低数据重复处理的问题,同时加快任务处理速度。每个任务实例该处理哪些数据,是根据分片项来的,在任务代码层面,就可以根据分片项来进行逻辑判断。

    举例说明,如果作业分为 4 片,用两台服务器执行,则每个服务器分到 2 片,分别负责作业的 50% 的负载

    分片项

    ElasticJob 并不直接提供数据处理的功能,而是将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与业务的对应关系。 分片项为数字,始于 0 而终于分片总数减 1。

    个性化分片参数

    个性化参数可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。 

    合理使用个性化参数可以让代码更可读。例如,如果配置为 0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

    分片策略

    平均分片策略

    根据分片项平均分片。如果作业服务器数量与分片总数无法整除,多余的分片将会顺序的分配至每一个作业服务器。

    举例说明:

    • 如果 3 台作业服务器且分片总数为9, 则分片结果为:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
    • 如果 3 台作业服务器且分片总数为8, 则分片结果为:1=[0,1,6], 2=[2,3,7], 3=[4,5]
    • 如果 3 台作业服务器且分片总数为10,则分片结果为:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]

    奇偶分片策略 

    根据作业名称哈希值的奇偶数决定按照作业服务器 IP 升序或是降序的方式分片。

    如果作业名称哈希值是偶数,则按照 IP 地址进行升序分片; 如果作业名称哈希值是奇数,则按照 IP 地址进行降序分片。 可用于让服务器负载在多个作业共同运行时分配的更加均匀。

    举例说明:

    • 如果 3 台作业服务器,分片总数为2且作业名称的哈希值为偶数,则分片结果为:1 = [0], 2 = [1], 3 = []
    • 如果 3 台作业服务器,分片总数为2且作业名称的哈希值为奇数,则分片结果为:3 = [0], 2 = [1], 1 = [] 

    轮询分片策略

    根据作业名称轮询分片。

    5.  官方文档

    https://shardingsphere.apache.org/elasticjob/current/cn/features/elastic/

    https://shardingsphere.apache.org/elasticjob/current/cn/user-manual/elasticjob-lite/ 

    https://shardingsphere.apache.org/elasticjob/current/cn/user-manual/elasticjob-lite/configuration/ 

    https://shardingsphere.apache.org/elasticjob/current/cn/dev-manual/ 

  • 相关阅读:
    软件创意——汽车语音安全系统
    对系统管理岗位的理解。
    求二维数组最大子数组的和。郭林林&胡潇丹
    电梯调度 结对项目开发(郭林林&胡潇丹)
    电梯调度 结对项目开发
    电梯调度 结对项目开发
    电梯调度的设计与实现过程(李帅 张硕)
    敏捷软件方法综述
    二维数组的子数组和最大问题(李帅 张硕)
    求数组子数组和的最大值 (线性算法)
  • 原文地址:https://www.cnblogs.com/cjsblog/p/14295150.html
Copyright © 2011-2022 走看看