当当的Elastic-Job开源出了两种分布式Job的解决方案:
1. elastic-job-lite,这是一个无中心节点的调度;
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
2. elastic-job-cloud是一个有中心节点的分布式调度开源工具
Elastic-Job-Cloud使用Mesos + Docker(TBD)的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。
Elastic-Job-Lite和Elastic-Job-Cloud提供同一套API开发作业,它们的核心都是elastic-job-core,开发者仅需一次开发,即可根据需要以Lite或Cloud的方式部署
Elastic-Job-Lite实现分布式job的原理:
要在分布式集群环境下去安全的执行一个调度任务,我们常规的想法就是保证在集群环境下,只有集群中的一台机器能够获取执行任务的权限。
但是Elastic-Job在实现分布式Job时没有按照这个思路去设计,而是将集群中所有的机器都利用起来,去多进程多线程的执行我们的作业任务。
那么它是如何做到分布式作业任务的不重不漏的呢?
Elastic-Job提出了一个数据分片(shardingItem)的概念。
举个例子:
假如我现在部署了3台机器的一个集群,并且按照Elastic-Job的要求设置了分片数量shardingCount=10,则按照Elastic-Job基于平均分配算法的分片策略得到的分片结果为:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8] (参见:com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy.java)
那么在每一台机器上去执行时,我只要取到本机的数据分片对应的数据来处理,就能够实现数据的分片处理。并且集群中所有机器处理的数据总和即为全量数据。
这样就能把集群中所有的机器都调动起来去同时并行的处理任务,效率比较高。
更有甚者,如果本机的数据分片分到了多个分片(即一个JVM进程分到了多个分片),则Elastic-Job会为每一个分片去启动一个线程来执行分片任务。
这样,一台机器就会开启多个线程就并行的处理作业任务,效率进一步的提高。
(参见:com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor#process(ShardingContexts shardingContexts,
JobExecutionEvent.ExecutionSource executionSource))
特殊场景:分片数为1时
public class MyElasticJob implements SimpleJob { public void execute(ShardingContext shardingContext) { // 1.当分片数为1时,在同一个zookepper和jobname情况下,多台机器部署了Elastic // job时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行 // 2.当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。此时每台服务器可根据拿到的shardingItem值进行相应的处理, // 举例场景: // 假如job处理数据库中的数据业务,方法为:A服务器处理数据库中Id以0,1,2结尾的数据,B处理数据库中Id以3,4,5结尾的数据,C处理器处理6,7,8,9结尾的数据,合计处理0-9为全部数据 // 如果服务器C崩溃,Elastic // Job自动进行进行失效转移,将C服务器的分片转移到A和B服务器上,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9 // 此时,A服务器处理数据库中Id以0,1,2,3,4结尾的数据,B处理数据库中Id以5,6,7,8,9结尾的数据,合计处理0-9为全部数据. processByEndId(shardingContext.getShardingItem()); } private void processByEndId(int shardingContext) { // TODO: 处理数据Id结尾为 shardingContext的数据 } }
主要功能
a) 分布式:重写Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心。
b) 并行调度:采用任务分片方式实现。将一个任务拆分为n个独立的任务项,由分布式的服务器并行执行各自分配到的分片项。
c) 弹性扩容缩容:将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。
d) 集中管理:采用基于Zookeeper的注册中心,集中管理和协调分布式作业的状态,分配和监听。外部系统可直接根据Zookeeper的数据管理和监控elastic-job。
e) 定制化流程型任务:作业可分为简单和数据流处理两种模式,数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立线程,用于保证同一分片的顺序性,这点类似于kafka的分区顺序性。
2. 其他功能
a) 失效转移:弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。
b) Spring命名空间支持:elastic-job可以不依赖于spring直接运行,但是也提供了自定义的命名空间方便与spring集成。
c) 运维平台:提供web控制台用于管理作业。
3. 非功能需求
a) 稳定性:在服务器无波动的情况下,并不会重新分片;即使服务器有波动,下次分片的结果也会根据服务器IP和作业名称哈希值算出稳定的分片顺序,尽量不做大的变动。
b) 高性能:同一服务器的批量数据处理采用自动切割并多线程并行处理。
c) 灵活性:所有在功能和性能之间的权衡,都可通过配置开启/关闭。如:elastic-job会将作业运行状态的必要信息更新到注册中心。如果作业执行频度很高,会造成大量Zookeeper写操作,而分布式Zookeeper同步数据可能引起网络风暴。因此为了考虑性能问题,可以牺牲一些功能,而换取性能的提升。
d) 幂等性:elastic-job可牺牲部分性能用以保证同一分片项不会同时在两个服务器上运行。
e) 容错性:作业服务器和Zookeeper断开连接则立即停止作业运行,用于防止分片已经重新分配,而脑裂的服务器仍在继续执行,导致重复执行。
以一个官方的例子(com.dangdang.ddframe.job.example.JavaMain.java)来分析Elastic-Job的执行过程:
准备:
由于Elastic-Job是基于Quratz来封装的,我们先看一下Quartz是如何来提交一个作业任务的:
public class MyJob implements Job { @Override public void execute(JobExecutionContext arg0) throws JobExecutionException { System.out.println(DateTime.now() + "-------执行MyJob"); } public static void main(String[] args) { // 通过schedulerFactory获取一个调度器 SchedulerFactory schedulerfactory = new StdSchedulerFactory(); Scheduler scheduler = null; try { // 通过schedulerFactory获取一个调度器 scheduler = schedulerfactory.getScheduler(); // 创建jobDetail实例,绑定Job实现类, 指明job的名称,所在组的名称,以及绑定job类 JobDetail job = JobBuilder.newJob(MyJob.class).withIdentity("job1", "jgroup1").build(); // 定义调度触发规则, 使用simpleTrigger规则(cornTrigger) :从第2s开始,每5s执行一次。(http://cron.qqe2.com/ 生成corn) Trigger trigger = TriggerBuilder.newTrigger().withIdentity("simpleTrigger", "triggerGroup") .withSchedule(CronScheduleBuilder.cronSchedule("2/5 * * * * ? *")).startNow().build(); // 把作业和触发器注册到任务调度中 scheduler.scheduleJob(job, trigger); // 启动调度 System.out.println(DateTime.now() + "------job start----"); scheduler.start(); } catch (Exception e) { e.printStackTrace(); } } }
以此为基础,我们来看Elastic-Job-Lite是如何对它进行封装的:
0. 入口 (我们要执行的任务是JavaSimpleJob.java)
// 将要执行的作业任务放到 SimpleJobConfiguration 中
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
// 启动
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig).init();
作业任务 job 是通过 LiteJobConfiguration 来定义的,里面会定义是哪一种类型的job(JobTypeConfiguration)。
现在支持3种类型的job:SimpleJobConfiguration、DataflowJobConfiguration、ScriptJobConfiguration
1. JobScheduler#init()
1.1 com.dangdang.ddframe.job.lite.api.JobScheduler 实例化时,会构造JobFacade。
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
JobFacade 是用来操作分片上下文的(后面会用到)
1.2 创建org.quartz.JobDetail
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
.....
scheduler.start(); // 启动Quartz
2. Quartz执行作业任务
到了corn表达式设定的执行时间,Quartz会去执行LiteJob#execute()
2.1 LiteJob#execute(final JobExecutionContext context)
// 根据作业任务的类型,拿到作业任务对应的JobExecutor,然后去执行
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
2.2 AbstractElasticJobExecutor#execute()
/** * 执行作业. */ public final void execute() { try { jobFacade.checkJobExecutionEnvironment(); } catch (final JobExecutionEnvironmentException cause) { jobExceptionHandler.handleException(jobName, cause); }
// 重要:获取当前作业服务器的分片上下文。如果发现没有leader,就选出一个leader,并进行分片 ShardingContexts shardingContexts = jobFacade.getShardingContexts(); if (shardingContexts.isAllowSendJobEvent()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName)); } if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) { if (shardingContexts.isAllowSendJobEvent()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format( "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, shardingContexts.getShardingItemParameters().keySet())); } return; } try { jobFacade.beforeJobExecuted(shardingContexts); // CHECKSTYLE:OFF } catch (final Throwable cause) { // CHECKSTYLE:ON jobExceptionHandler.handleException(jobName, cause); } execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER); while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet()); execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE); } jobFacade.failoverIfNecessary(); try { jobFacade.afterJobExecuted(shardingContexts); // CHECKSTYLE:OFF } catch (final Throwable cause) { // CHECKSTYLE:ON jobExceptionHandler.handleException(jobName, cause); } }
附:
分片服务:
com.dangdang.ddframe.job.lite.internal.sharding.ShardingService
ShardingService#getShardingItems(String jobInstanceId) -- 获取本机的shardingItems
默认分片策略:
com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
主节点选举:
com.dangdang.ddframe.job.lite.internal.election.LeaderService
作业实例的唯一标记:
com.dangdang.ddframe.job.lite.api.strategy.JobInstance
jobInstanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
节点信息写入:
PersistShardingInfoTransactionExecutionCallback#execute()
作业节点数据:
com.dangdang.ddframe.job.lite.internal.storage.JobNodeStorage
注意 ShardingContexts 与 ShardingContext 的区别:
ShardingContexts是本机的分片上下文(里面保存了本机分配到的shardingItemParameters的集合)
/** * 分配于本作业实例的分片项和参数的Map. */ private final Map<Integer, String> shardingItemParameters;
ShardingContext是当前线程的分片上下文(里面只保存有一个shardingItem)
/** * 分配于本作业实例的分片项. */ private final int shardingItem;
Elastic-Job特性:
1. Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。
注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。
2. 为了保证作业的在分布式场景下的一致性,一旦作业与注册中心无法通信,运行中的作业会立刻停止执行,但作业的进程不会退出,这样做的目的是为了防止作业重分片时,将与注册中心失去联系的节点执行的分片分配给另外节点,导致同一分片在两个节点中同时执行。
3. Elastic-Job-Lite采用无中心化设计,若每个客户端的配置不一致,不做控制的话,最后一个启动的客户端配置将会成为注册中心的最终配置。
4. 弹性扩容缩容:将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片
5. 定制化流程型任务:作业可分为简单和数据流处理两种模式,数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立线程,用于保证同一分片的顺序性,这点类似于kafka的分区顺序性。
6. 失败转移:http://www.cnblogs.com/nevermorewang/p/5744324.html
LTS、xxl-job、Elastic-Job对比
Elastic-Job官方文档及相关文章:
http://dangdangdotcom.github.io/elastic-job/elastic-job-lite/00-overview/
https://my.oschina.net/u/719192/blog/506062?nocache=1497421466555
http://www.infoq.com/cn/articles/dangdang-distributed-work-framework-elastic-job/
开源方案对比: