zoukankan      html  css  js  c++  java
  • 分布式任务elastic-job

    # elastic-job简介
    目前Elastic job的最新版本已经由原来的elastic-job-core分离除了两个项目,分别为Elastic-Job-Lite和Elastic-Job-Cloud。Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成,Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。 Elastic-Job-Cloud使用Mesos + Docker(TBD)的解决方案,额外提供资源治理、应用分发以及进程隔离等服务,Elastic-Job-Lite和Elastic-Job-Cloud提供同一套API开发作业,开发者仅需一次开发,即可根据需要以Lite或Cloud的方式部署 .[转自官网:https://github.com/dangdangdotcom/elastic-job/blob/master/README_cn.md]

    一般的技术quartz、spring task、java.util.Timer,这几种如果在单一机器上跑其实问题不大,但是如果一旦应用于集群环境做分布式部署,就会带来一个致命的问题,那就是重复执行,当然解决方案有,但是必须依赖数据库,将任务执行状态持久化下来。所以当当就把quartz和zookeeper结合起来达到分布式调度,并且添加其他功能,形成了elastic-job。

    elastic-job主要的设计理念是无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但数据库没有分布式协调功能,所以在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。


    # 功能

    1. 主要功能

    a) 分布式:重写Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心。

    b) 并行调度:采用任务分片方式实现。将一个任务拆分为n个独立的任务项,由分布式的服务器并行执行各自分配到的分片项。

    c) 弹性扩容缩容:将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。

    d) 集中管理:采用基于Zookeeper的注册中心,集中管理和协调分布式作业的状态,分配和监听。外部系统可直接根据Zookeeper的数据管理和监控elastic-job。

    e) 定制化流程型任务:作业可分为简单和数据流处理两种模式,数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立线程,用于保证同一分片的顺序性,这点类似于kafka的分区顺序性。

    2. 其他功能

    a) 失效转移:弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。

    b) Spring命名空间支持:elastic-job可以不依赖于spring直接运行,但是也提供了自定义的命名空间方便与spring集成。

    c) 运维平台:提供web控制台用于管理作业。

    下载源码
    https://github.com/elasticjob


    #spring boot quick start
    Add maven dependency

    <!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-core --> 
    <!-- import elastic-job lite core -->
    <dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.6</version>
    </dependency>
    
    <!-- import other module if need -->
    <dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.6</version>
    </dependency>

    RegCenter configuration

    @Configuration
    public class RegistryCenterConfig {
    
        @Bean(initMethod = "init")
        public ZookeeperRegistryCenter regCenter() {
            String serverList = ConfigOne.getProperty(ConfigConstants.JOB_REG_ZK);
            String namespace = ConfigOne.getProperty(ConfigConstants.JOB_REG_NS);
            return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        }
    
    }


    Job configuration

    @Configuration
    public class SimpleJobConfig {
    
        //private String defaultCron = "0/5 * * * * ?";
        //默认每天0点10分开始统计
        @Value("${job.default.cron}")
        private String defaultCron = "0 10 0 * * ?";
        @Value("${job.default.shardingTotalCount}")
        private int defaultShardTotal = 1;
        @Value("${job.default.shardingItemParameters}")
        private String defaultShardPrams = "";
    
        @Resource
        private ZookeeperRegistryCenter regCenter;
    
        /*@Resource
        private JobEventConfiguration jobEventConfiguration;*/
    
        @Resource
        private StatsDeviceJob statsDeviceJob;
    
        @Resource
        private StatsUserJob statsUserJob;
    
        @Resource
        private StatsDeviceFaultJob statsDeviceFaultJob;
    
        @Resource
        private StatsDeviceAlarmJob statsDeviceAlarmJob;
    
        @Resource
        private StatsRuleJob statsRuleJob;
    
        @Resource
        private StatsYumairJob statsYumairJob;
    
    
        //@Bean(initMethod = "init")
        @PostConstruct
        public void init() {
            //statsDeviceJob
            new SpringJobScheduler(statsDeviceJob, regCenter,
                    getLiteJobConfiguration(statsDeviceJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init();
            //statsUserJob
            new SpringJobScheduler(statsUserJob, regCenter,
                    getLiteJobConfiguration(statsUserJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init();
            //statsDeviceFaultJob
            new SpringJobScheduler(statsDeviceFaultJob, regCenter,
                    getLiteJobConfiguration(statsDeviceFaultJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init();
            //statsDeviceAlarmJob
            new SpringJobScheduler(statsDeviceAlarmJob, regCenter,
                    getLiteJobConfiguration(statsDeviceAlarmJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init();
            //statsRuleJob
            new SpringJobScheduler(statsRuleJob, regCenter,
                    getLiteJobConfiguration(statsRuleJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init();
            //statsYumairJob 每小时5分运行一次
            new SpringJobScheduler(statsYumairJob, regCenter,
                    getLiteJobConfiguration(statsYumairJob.getClass(), "0 5 * * * ?", defaultShardTotal, defaultShardPrams)).init();
    
        }
    
    
        private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int defaultShardTotal, final String defaultShardPrams) {
            return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
                    jobClass.getName(), cron, defaultShardTotal).shardingItemParameters(defaultShardPrams).build(), jobClass.getCanonicalName())).overwrite(true).build();
        }
    }

    Job development

    @Service
    public class StatsDeviceJob implements SimpleJob {
        private static Logger logger = LoggerFactory.getLogger(StatsDeviceJob.class);
    
        @Autowired
        private DeviceStatsFeignClient deviceStatsFeignClient;
        @Autowired
        private DeviceDataFeignClient deviceDataFeignClient;
        @Autowired
        private IDataStatsFacade dataStatsFacade;
    
        /**
         * 1.当分片数为1时,在同一个zookepper和jobname情况下,多台机器部署了Elastic job时,
         * 只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行
         * 2.当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。
         * 此时每台服务器可根据拿到的shardingItem值进行相应的处理
         * 目前job分片数全部置为1,即不使用分片
         * @param shardingContext
         */
        @Override
        public void execute(ShardingContext shardingContext) {
            logger.info(String.format("ShardingItem: %s | Thread: %s | %s",
                    shardingContext.getShardingItem(), Thread.currentThread().getId(), "SIMPLE"));
    
            deviceStatsJob();
        }
    
    
        public void deviceStatsJob() {
            //TODO 
        }
    
    
    
     }

    # 运维平台和RESTFul API部署(可选)
    1. 下载或者克隆elastic-job源码
    地址:https://github.com/dangdangdotcom/elastic-job


    2. maven编译安装
    进入到elastic-job目录,按住Shift+鼠标右键,选择“在此处打开命令窗口(W)”,执行如下命令:

    ```
    mvn clean install -Dmaven.test.skip=true
    ```

    ```
    [INFO] ------------------------------------------------------------------------
    [INFO] Reactor Summary:
    [INFO]
    [INFO] elastic-job ....................................... SUCCESS [23.570s]
    [INFO] elastic-job-common ................................ SUCCESS [0.053s]
    [INFO] elastic-job-common-core ........................... SUCCESS [27.108s]
    [INFO] elastic-job-common-restful ........................ SUCCESS [29.844s]
    [INFO] elastic-job-lite .................................. SUCCESS [0.078s]
    [INFO] elastic-job-lite-core ............................. SUCCESS [7.249s]
    [INFO] elastic-job-lite-lifecycle ........................ SUCCESS [3.766s]
    [INFO] elastic-job-lite-spring ........................... SUCCESS [1:08:42.613s
    ]
    [INFO] elastic-job-lite-console .......................... SUCCESS [2:31.964s]
    [INFO] elastic-job-cloud ................................. SUCCESS [0.031s]
    [INFO] elastic-job-cloud-executor ........................ SUCCESS [3.728s]
    [INFO] elastic-job-cloud-scheduler ....................... SUCCESS [33.803s]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 1:13:24.136s
    [INFO] Finished at: Sat Dec 02 18:33:33 CST 2017
    [INFO] Final Memory: 55M/272M
    [INFO] ------------------------------------------------------------------------
    ```


    3. 解压上一步打好的包

    路径:elastic-jobelastic-job-liteelastic-job-lite-console argetelastic-job-lite-console-2.1.6.tar.gz
    elastic-job-lite-console-2.1.6in目录下是启动脚本

    windows环境用:start.bat

    linux环境用:start.sh

    elastic-job-lite-console-2.1.6conf目录下是配置文件auth.properties,配置的用户名和密码

    4. 解压缩elastic-job-lite-console-${version}.tar.gz并执行binstart.sh。

    5. 打开浏览器访问http://localhost:8899/即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号。

    6. 访问RESTFul API方法同控制台。

  • 相关阅读:
    POST、GET请求中文参数乱码问题
    表的复制——sql语句
    mysql之limit m,n
    nullpointerxception——处理思路
    public-private-protected-默认缺省 的区别
    final关键字的作用
    使用注解来构造IOC容器
    成功的背后!(给所有IT人)
    jQuery对象复制
    键盘录入, if语句
  • 原文地址:https://www.cnblogs.com/iiot/p/7979087.html
Copyright © 2011-2022 走看看