zoukankan      html  css  js  c++  java
  • elastic-job-lite

    1. 为什么不用quartz

      通过定时任务来进行计算,如果数量不多,可以轻易的用quartz来完成,如果用户量特别大,可能短时间内处理不完需要处理的数据。另外如果我们将job直接放在我们的webapp里,webapp通常是多节点部署的,这样,项目需要每隔一段时间执行某个定时任务,但是由于同时部署在多台机器上,因此可能会出现任务被执行多次,造成重复数据的情况,我们的job也就是多节点,造成了多个job同时执行,导致job重复执行,为了避免这种情况,我们可能多job的节点进行加锁,保证只有一个节点能执行,或者将job从webapp里剥离出来,独自部署一个节点。Elastic job是当当网架构师张亮,曹昊和江树建基于Zookepper、Quartz开发并开源的一个Java分布式定时任务,解决了Quartz不支持分布式的弊端。Elastic job主要的功能有支持弹性扩容,通过Zookepper集中管理和监控job,支持失效转移等,这些都是Quartz等其他定时任务无法比拟的。

    2. 原理

      elastic底层的任务调度还是使用quartz,通过zookeeper来动态给job节点分片,使用elastic-job开发的作业都是客户的,假如我们需要使用3台机器跑job,我们将任务分成3片,框架通过zk的协调,最终会让3台机器分别分配0,1,2的任务片,比如server0-->0,server1-->1,server2-->2,当server0执行时,可以只查询id%3==0的用户,server1执行时,只查询id%3==1的用户,server2执行时,只查询id%3==2的用户。当分片数为1时,在同一个zookepper和jobname情况下,多台机器部署了Elastic job时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。此时每台服务器可根据拿到的shardingItem值进行相应的处理,

    举例场景:

    假如job处理数据库中的数据业务,方法为:A服务器处理数据库中Id以0,1,2结尾的数据,B处理数据库中Id以3,4,5结尾的数据,C处理器处理6,7,8,9结尾的数据,合计处理0-9为全部数据

    如果服务器C崩溃,Elastic Job自动进行进行失效转移,将C服务器的分片转移到A和B服务器上,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9

    此时,A服务器处理数据库中Id以0,1,2,3,4结尾的数据,B处理数据库中Id以5,6,7,8,9结尾的数据,合计处理0-9为全部数据.

    在上述基础上,如果我们增加server3,此时,server3分不到任务分片,因为任务分片只有3片,已经分完了,没有分到任务分片的程序不执行。如果server2挂了,那么server2的任务分片会分给server3,server3有了分片后就会执行。如果server3也挂了,框架会自动将server3的分片随机分给server0或server1,这种特性称之为弹性扩容,也就是elastic-job的由来。

      elastic-job不支持单机多实例,通过zk的协调分片是以ip为单元的,如果通过单机多实例来试验,结果会导致分片和预期不一致,可以通过虚拟机模拟多台机器。

    3. 作业类型

      elastic-job 提供了三种类型的作业:simple,dataflow,script。script类型作业为脚本类型作业,支持shell,python等类型脚本。simple类型需要实现SimpleJob接口,未经过任何封装,和quartz原生接口相似。dataflow类型用于处理数据流,需要实现DafaflowJob接口,该接口提供了两个方法可以覆盖,分别用于抓取fetchData和处理processData数据。

    4. 代码演示

    1. 依赖

    <dependency>
    			<groupId>com.dangdang</groupId>
    			<artifactId>elastic-job-lite-spring</artifactId>
    			<version>2.1.5</version>
    		</dependency>
    

     

    2.编写job

    public class OrderStatisticsJob implements SimpleJob {
    
    	private static final Logger log = LoggerFactory.getLogger(OrderStatisticsJob.class);
    
    	OrdersService ordersSerivce = null;
    
    	/** 读取配置(配置文件以后上分布式配置动态维护) **/
    	private void readConfig() {
    		ordersSerivce = (OrdersService) ApplicationHelp.getBean("ordersService");
    	}
    
    	synchronized public void start(int sharding) {
    
    	}
    
    	@Override
    	public void execute(ShardingContext shardingContext) {
    		// TODO Auto-generated method stub
    		log.info("shardingContext:{}", shardingContext.getShardingItem());
    		readConfig();
    		start(1);
    	}
    }
    
    public class MyDataFlowJob implements DataflowJob<User> {
    
        @Override
        public List<User> fetchData(ShardingContext shardingContext) {
            List<User> users = null;//查询users from db
            return users;
        }
     
        @Override
        public void processData(ShardingContext shardingContext, List<User> data) {
            for (User user: data) {
                user.setStatus(1);
                //update user
            }
        }
    }

    3. Spring配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
           xmlns:job="http://www.dangdang.com/schema/ddframe/job"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.dangdang.com/schema/ddframe/reg
                            http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                            http://www.dangdang.com/schema/ddframe/job
                            http://www.dangdang.com/schema/ddframe/job/job.xsd">
        <!--配置作业注册中心 -->
        <reg:zookeeper id="regCenter" server-lists="localhost:2181" namespace="dd-job"
                       base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
     
        <!-- 配置作业-->
        <job:simple id="orderStatisticsJob" class="com.beta.cb.mall.task.job.OrderStatisticsJob" registry-center-ref="regCenter"
                    sharding-total-count="2" cron="0/2 * * * * ?" />
       <job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"
                  sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" /> </beans>
  • 相关阅读:
    ***25 k个一组反转链表
    24 交换链表中相连的节点
    19 删除链表倒数第N个节点
    2 两数相加
    23 合并K个有序链表
    21 合并两个有序链表
    114 判断一个链表是否存在环并返回环起点
    141 链表是否存在环
    160 寻找链表交点
    92 指定区间链表逆置
  • 原文地址:https://www.cnblogs.com/yangfei-beijing/p/9294074.html
Copyright © 2011-2022 走看看