联童科技是一家智能化母婴童产业平台,从事母婴童行业以及互联网技术多年,拥有丰富的母婴门店运营和系统开发经验,在会员经营和商品经营方面,能够围绕会员需求,深入场景,更贴近合作伙伴和消费者,提供最优服务产品,公司致力于以技术来驱动母婴童产业的发展,公司也希望借助于大数据为客户提供更多智能数据分析和决策分析,大数据是公司重点发展的一部分,公司从成立初期起就搭建了大数据团队,有了大数据团队后,大数据调度平台的构建自然是最基础也是最重要的环节。
一、为什么选择incubator-dolphinscheduler
1、incubator-dolphinscheduler是一个由国内公司发起的开源项目,中国本土社区成员非常活跃,更加容易去进行社区沟通,同时联童也希望能加入到这个社区中,一起把这个由本土成员为主成立的社区做的更好。
2、incubator-dolphinscheduler 能够支撑非常多的应用场景
- 以DAG图的方式将Task按照任务的依赖关系关联起来,可实时可视化监控任务的运行状态
- 支持丰富的任务类型:Shell、MR、Spark、SQL(mysql、postgresql、hive、sparksql),Python,Sub_Process、Procedure,flink,datax,sqoop,http等
- 支持工作流定时调度、依赖调度、手动调度、手动暂停/停止/恢复,同时支持失败重试/告警、从指定节点恢复失败、Kill任务等操作
- 支持工作流优先级、任务优先级及任务的故障转移及任务超时告警/失败
- 支持工作流全局参数及节点自定义参数设置
- 支持资源文件的在线上传/下载,管理等,支持在线文件创建、编辑
- 支持任务日志在线查看及滚动、在线下载日志等
- 实现集群HA,通过Zookeeper实现Master集群和Worker集群去中心化
- 支持对
Master/Worker
cpu load,memory,cpu在线查看 - 支持工作流运行历史树形/甘特图展示、支持任务状态统计、流程状态统计
- 支持补数
- 支持多租户
- 支持国际化
其中DAG图 借鉴自spark ,在dolphinscheduler 一个工作流可以对应多个工作任务,每一个工作任务对应一个DAG中的节点。
3、incubator-dolphinscheduler在保证了高并发和高可用的设计时,架构思路也相对简单,技术架构中没有引入非常多的复杂技术组件,降低了学习和维护的成本。
备注:此架构图摘自社区官方网站
incubator-dolphinscheduler在设计时,除了zookeeper外,没有引入太多复杂的技术组件。整个架构以zookeeper 作为集群管理,采用去中心化思想进行设计。
二、incubator-dolphinscheduler功能的不足
1、无法支持串行调度策略
incubator-dolphinscheduler 在一开始设计时,只支持并行调度,不支持串行调度,而在联童中,大部分场景都是需要串行运行的,也就是每一个工作流任务都只能有一个实例在运行,同一个工作流任务中必须要等前一个实例执行结束,下一个实例才能开始执行,这种场景大多出现在准实时任务中。
2、任务依赖不够强大,只能支持被动等待依赖执行成功,无法主动触发下游工作流实例运行
如下图所示,只能支持在创建任务时,被动去等待依赖执行成功,无法在当前任务执行成功后,主动去触发别的工作流任务执行。
3、部分模块中用户体验不足,并且在数据量大时,部分模块数据查询性能较慢
4、缺少比较完备的监控体系
在 incubator-dolphinscheduler 只提供了一些简单的监控,当有多大几千个任务在运行时,很难做到完备监控,更是缺少对每一个任务运行的性能分析。
5、缺少任务被依赖查看
比如一个工作流或者工作流中的一个节点被哪些任务依赖了,被多少任务依赖了,完全没有办法查看。
三、我们对于incubator-dolphinscheduler的功能升级开发
1、增加串行调度的支持
如下图所示,我们在原有并行执行的基础上,增加了串行执行方式。
在串行执行时,我们还增加了串行执行的队列功能,每一任务都可以指定队列的长度大小。
2、增加主动触发下游工作流实例运行
如下图所示,我们在原有并行执行的基础上,增加主动触发下游一个或者多个工作流实例运行。
运行后效果如下:
3、一些较大的Bug修复
联童在使用 incubator-dolphinscheduler时,同样也踩过不少的坑,这里我们举其中一个例子,比如在内部使用时,同事反馈最多的问题就是调度任务的日志刷新不及时,有时候很久才能刷新出日志。后来经过源码分析,发现是源码中存在了一些不太健壮的处理导致了这个问题。
incubator-dolphinscheduler 中AbstractCommandExecutor.java 部分源码
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.server.worker.task; import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_KILL; import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.slf4j.Logger; /** * abstract command executor */ public abstract class AbstractCommandExecutor { /** * rules for extracting application ID */ protected static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX); protected StringBuilder varPool = new StringBuilder(); /** * process */ private Process process; /** * log handler */ protected Consumer<List<String>> logHandler; /** * logger */ protected Logger logger; /** * log list */ protected final List<String> logBuffer; /** * taskExecutionContext */ protected TaskExecutionContext taskExecutionContext; /** * taskExecutionContextCacheManager */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; public AbstractCommandExecutor(Consumer<List<String>> logHandler, TaskExecutionContext taskExecutionContext, Logger logger) { this.logHandler = logHandler; this.taskExecutionContext = taskExecutionContext; this.logger = logger; this.logBuffer = Collections.synchronizedList(new ArrayList<>()); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); } /** * build process * * @param commandFile command file * @throws IOException IO Exception */ private void buildProcess(String commandFile) throws IOException { // setting up user to run commands List<String> command = new LinkedList<>(); //init process builder ProcessBuilder processBuilder = new ProcessBuilder(); // setting up a working directory processBuilder.directory(new File(taskExecutionContext.getExecutePath())); // merge error information to standard output stream processBuilder.redirectErrorStream(true); // setting up user to run commands command.add("sudo"); command.add("-u"); command.add(taskExecutionContext.getTenantCode()); command.add(commandInterpreter()); command.addAll(commandOptions()); command.add(commandFile); // setting commands processBuilder.command(command); process = processBuilder.start(); // print command printCommand(command); } .......... /** * get the standard output of the process * * @param process process */ private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); parseProcessOutputExecutorService.submit(new Runnable() { @Override public void run() { BufferedReader inReader = null; try { inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); String line; long lastFlushTime = System.currentTimeMillis(); while ((line = inReader.readLine()) != null) { if (line.startsWith("${setValue(")) { varPool.append(line.substring("${setValue(".length(), line.length() - 2)); varPool.append("$VarPool$"); } else { logBuffer.add(line); lastFlushTime = flush(lastFlushTime); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { clear(); close(inReader); } } }); parseProcessOutputExecutorService.shutdown(); } ................ /** * when log buffer siz or flush time reach condition , then flush * * @param lastFlushTime last flush time * @return last flush time */ private long flush(long lastFlushTime) { long now = System.currentTimeMillis(); /** * when log buffer siz or flush time reach condition , then flush */ if (logBuffer.size() >= Constants.DEFAULT_LOG_ROWS_NUM || now - lastFlushTime > Constants.DEFAULT_LOG_FLUSH_INTERVAL) { lastFlushTime = now; /** log handle */ logHandler.accept(logBuffer); logBuffer.clear(); } return lastFlushTime; } /** * close buffer reader * * @param inReader in reader */ private void close(BufferedReader inReader) { if (inReader != null) { try { inReader.close(); } catch (IOException e) { logger.error(e.getMessage(), e); } } } protected List<String> commandOptions() { return Collections.emptyList(); } protected abstract String buildCommandFilePath(); protected abstract String commandInterpreter(); protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; }
在这段源码中,parseProcessOutput(Process process) 方法是负责任务日志的获取以及Flush。 但是由于采用了BufferedReader 中的readLine() 方法来读取任务进程的process.getInputStream()日志,由于readLine() 是一个阻塞方法,
flush(long lastFlushTime) 方法在处理时有一个判断条件if (logBuffer.size() >= Constants.DEFAULT_LOG_ROWS_NUM || now - lastFlushTime > Constants.DEFAULT_LOG_FLUSH_INTERVAL),只有当日志条数达到64条或者间隔1s时才会
flush。按理说,代码其实是要实现至少每隔1s会flash 一次日志,但是由于readLine() 是一个阻塞方法,所以并不会一直在执行,而是readLine()必须是读取到新数据后,才会执行flush方法。 那么在出现1s内产生的任务日志不满足64条,而任务又很久没有新日志出现时,就会触发这个bug。例如执行如下一个shell 脚本任务,由于每个执行步骤产生的日志少,而且每个步骤执行的时间又很久,时间间隔很大,就会出现很久都不会刷新上一次产生的日志。
#!/bin/bash echo "hello world" exec 10m sleep 100000s echo "hello world2" exec 10m sleep 100000s echo "hello world3" exec 10m sleep 100000s
之后我们对这段源码进行了重写,采用了两个线程进行处理,一个线程负责readline(),一个线程负责flush.做到在readline()方法的线程阻塞时,不影响flush线程的处理。
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
public abstract class AbstractCommandExecutor { /** * rules for extracting application ID */ protected static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX); /** * process */ private Process process; /** * log handler */ protected Consumer<List<String>> logHandler; /** * logger */ protected Logger logger; /** * log list */ protected final List<String> logBuffer; protected boolean logOutputIsScuccess = false; /** * taskExecutionContext */ protected TaskExecutionContext taskExecutionContext; /** * taskExecutionContextCacheManager */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; ......... /** * get the standard output of the process * * @param process process */ private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); getOutputLogService.submit(() -> { BufferedReader inReader = null; try { inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); String line;while ((line = inReader.readLine()) != null) { logBuffer.add(line); } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { logOutputIsScuccess = true; close(inReader); } }); getOutputLogService.shutdown(); ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); parseProcessOutputExecutorService.submit(() -> { try { long lastFlushTime = System.currentTimeMillis(); while (logBuffer.size() > 0 || !logOutputIsScuccess) { if (logBuffer.size() > 0) { lastFlushTime = flush(lastFlushTime); } else { Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { clear(); } }); parseProcessOutputExecutorService.shutdown(); } ....... /** * when log buffer siz or flush time reach condition , then flush * * @param lastFlushTime last flush time * @return last flush time */ private long flush(long lastFlushTime) throws InterruptedException { long now = System.currentTimeMillis(); /** * when log buffer siz or flush time reach condition , then flush */ if (logBuffer.size() >= Constants.DEFAULT_LOG_ROWS_NUM || now - lastFlushTime > Constants.DEFAULT_LOG_FLUSH_INTERVAL) { lastFlushTime = now; /** log handle */ logHandler.accept(logBuffer); logBuffer.clear(); } return lastFlushTime; } ....... }
4、将调度系统的监控接入到prometheus和grafana中
incubator-dolphinscheduler 只提供了一些如下的简单实时监控,尤其缺少对任务的监控。
联童在此基础上,引入了prometheus和grafana。
使用prometheus和grafana 不但可以监控到调度系统任务的总体运行,也可以监控到单个任务的运行耗时曲线等。
5、对incubator-dolphinscheduler 的性能优化
1、项目中工作流实例以及任务实例在查询时由于查询了不需要的longtext字段导致查询分页性能下降。
2、工作流实例查询时,增加了默认查询时间范围为一天以及查询时间范围的选择,减少查询数据量。
3、任务实例查询时,增加了默认查询时间范围为一天以及查询时间范围的选择,减少查询数据量。
4、其他一些查询上的索引性能优化等。
6、对incubator-dolphinscheduler 的用户体验的优化
1、 任务在下线时,增加确认框提示,防止用户误操作导致任务下线。
2、工作流定义列表、工作流实例列表、任务实例列表,项目列表中增加相应id的展示
3、任务实例查询,增加更多的输入框,方便快速查找到对应的实例。
4、增加快捷按钮,能直接跳转到指定的工作流实例下,跳转时,能带上工作流名称作为搜索条件自动查询出指定的列表。
5、项目首页增加失败工作流实例统计
6、项目首页增加用户搜索输入框,可以查询自己用户的运行统计数据。
7、钉钉告警时,允许配置钉钉告警时@到指定的用户。
8、datax 任务中功能的一些扩展。
9、任务被动依赖时,之前没有考虑任务还没的及运行的情况。
private DependResult calculateResultForTasks(DependentItem dependentItem, List<DateInterval> dateIntervals) { DependResult result = DependResult.FAILED; for (DateInterval dateInterval : dateIntervals) { ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), dateInterval); if (processInstance == null) { //笔者新补充代码的 ProcessDefinition processDefinition = processService.findProcessDefineById(dependentItem.getDefinitionId()); if(null != processDefinition && processDefinition.getFlag().equals(Flag.YES)){ logger.info("processDefinition is not run ,DependResult calculateResultForTasks wait processDefinition id:{} to run...",dependentItem.getDefinitionId()); return DependResult.WAITING; } logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}", dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime()); return DependResult.FAILED; } // need to check workflow for updates, so get all task and check the task state if (dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)) { result = dependResultByProcessInstance(processInstance); } else { result = getDependTaskResult(dependentItem.getDepTasks(), processInstance, dateInterval.getStartTime(), dateInterval.getEndTime()); } if (result != DependResult.SUCCESS) { break; } } return result; }
10、 在工作流DAG页面增加上线、下线、定时、运行等按钮,这样避免用户返回到切换到列表中去操作,增强用户体验。
11、工作流可以修改所属用户
12、任务依赖查看页面中,增加快捷按钮,可以跳转到依赖的工作流实例页面和任务实例页面,方便点击查看被依赖的任务的运行情况。
13、工作流定义页面中,可以按照工作流上线状态和定时上线状态进行检索。
14、其他的一些优化改进,比如改进任务流里失败的任务手动重跑成功后,后面的依赖 还是不能正常重跑,必须要 把整个工作流中每一个任务节点都重新运行一次等,此处不列举了。
7、增加被任务被依赖的查看
四、联童对于开源社区的拥抱和回馈
联童虽然是一家新兴起的母婴童公司,但是在成立的初始,就秉承着以技术来驱动母婴童产业的发展,公司拥有一个非常好的技术团队,也一直在拥抱开源社区,目前已经引入了incubator-dolphinscheduler、prometheus、grafana 、hadoop、spark、flink、hive、presto......等很多开源项目来支撑公司的技术驱动。在未来,联童也一定回不断的去回馈开源社区,去提供更多的Pull requests,贡献自己的一份力量。
【备注】本篇文章 摘选自【联童直通车】公众号: https://mp.weixin.qq.com/s?__biz=MzI5NDM3MzU0MQ==&mid=2247483996&idx=1&sn=cdb180446fa215705b21e09bdfb632fd&chksm=ec629396db151a80ee6c8b7f860cf4061b72e2bf0f1f099ef281bc914d8f5bcd60e70c54c2da&scene=126&&sessionid=1614058343&version=3.1.1.3006&platform=win#rd
欢迎加入APachec dolphinscheduler社区
https://github.com/apache/incubator-dolphinscheduler (请记得fork和star)
订阅邮件列表
用自己的邮箱向dev-subscribe@dolphinscheduler.apache.org发送一封邮件,主题和内容任意。
接收确认邮件并回复。完成步骤1后,将收到一封来自dev-help@dolphinscheduler.apache.org的确认邮件(如未收到,请确认邮件是否被自动归入垃圾邮件、推广邮件、订阅邮件等文件夹)。然后直接回复该邮件,或点击邮件里的链接快捷回复即可,主题和内容任意。
接收欢迎邮件。完成以上步骤后,会收到一封主题为WELCOME to dev@dolphinscheduler.apache.org的欢迎邮件,至此已成功订阅Apache DolphinScheduler(Incubating)的邮件列表。