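This article analyzes the source code of DolphinScheduler 2.0.0.
To set up a DolphinScheduler 2.0.0 source-code analysis environment, see the official website and my previous article.
Let's start the analysis:
Step 1: in IDEA, start the ApiApplicationServer, MasterServer, and WorkerServer processes.
Step 2: start the front end. Switch to the dolphinscheduler-ui subfolder and run npm run start from cmd.
Step 3: open localhost:8888 in a browser and log in. The default username and password are: dolphinscheduler / dolphinscheduler123
After logging in, create a new project as follows:
Step 4: create a new workflow.
Step 5: switch to the backend MySQL database and look at its current state:
Open the t_ds_task_definition table. The saved information looks like this:
Step 6: bring the newly created workflow online and run it.
After it is submitted, the MasterServer process prints the following log output: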
[INFO] 2021-11-23 11:43:00.387 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1162] - remove task from stand by list, id: 1 name:test_shell
[INFO] 2021-11-23 11:43:00.392 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[291] - process event: State Event :key: null type: TASK_STATE_CHANGE executeStatus: FAILURE task instance id: 1 process instance id: 1 context: null
[INFO] 2021-11-23 11:43:00.397 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[369] - work flow 1 task 1 state:FAILURE
[INFO] 2021-11-23 11:43:00.397 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
[INFO] 2021-11-23 11:43:00.397 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[376] - failure task will be submitted: process id: 1, task instance id: 1 state:FAILURE retry times:0 / 1, interval:30
[INFO] 2021-11-23 11:44:00.038 org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob:[74] - scheduled fire time :Tue Nov 23 11:44:00 CST 2021, fire time :Tue Nov 23 11:44:00 CST 2021, process id :1
[INFO] 2021-11-23 11:44:00.917 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[243] - find command 2, slot:0 :
[INFO] 2021-11-23 11:44:00.918 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[186] - find one command: id: 2, type: SCHEDULER
[INFO] 2021-11-23 11:44:00.936 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[209] - handle command end, command 2 process 2 start...
[INFO] 2021-11-23 11:44:00.951 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
[INFO] 2021-11-23 11:44:00.954 org.apache.dolphinscheduler.service.process.ProcessService:[1093] - start submit task : test_shell, instance id:2, state: RUNNING_EXECUTION
[INFO] 2021-11-23 11:44:00.966 org.apache.dolphinscheduler.service.process.ProcessService:[1106] - end submit task to db successfully:2 test_shell state:SUBMITTED_SUCCESS complete, instance id:2 state: RUNNING_EXECUTION
[INFO] 2021-11-23 11:44:00.967 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[120] - task ready to submit: TaskInstance{id=2, name='test_shell', taskType='SHELL', processInstanceId=2, processInstanceName='null', state=SUBMITTED_SUCCESS, firstSubmitTime=Tue Nov 23 11:44:00 CST 2021, submitTime=Tue Nov 23 11:44:00 CST 2021, startTime=null, endTime=null, host='null', executePath='null', logPath='null', retryTimes=0, alertFlag=NO, processInstance=null, processDefine=null, pid=0, appLink='null', flag=YES, dependency='null', duration=null, maxRetryTimes=1, retryInterval=30, taskInstancePriority=MEDIUM, processInstancePriority=MEDIUM, dependentResult='null', workerGroup='default', environmentCode=-1, environmentConfig='null', executorId=1, executorName='null', delayTime=5, dryRun=0}
[ERROR] 2021-11-23 11:44:00.981 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[334] - tenant not exists,process instance id : 2,task instance id : 2
[INFO] 2021-11-23 11:44:00.993 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[130] - master submit success, task : test_shell
[ERROR] 2021-11-23 11:44:00.993 org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer:[116] - dispatcher task error
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.dispatch(TaskPriorityQueueConsumer.java:131)
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.run(TaskPriorityQueueConsumer.java:100)
[INFO] 2021-11-23 11:44:00.997 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1162] - remove task from stand by list, id: 2 name:test_shell
[INFO] 2021-11-23 11:44:01.000 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[291] - process event: State Event :key: null type: TASK_STATE_CHANGE executeStatus: FAILURE task instance id: 2 process instance id: 2 context: null
[INFO] 2021-11-23 11:44:01.002 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[369] - work flow 2 task 2 state:FAILURE
[INFO] 2021-11-23 11:44:01.003 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
[INFO] 2021-11-23 11:44:01.003 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[376] - failure task will be submitted: process id: 2, task instance id: 2 state:FAILURE retry times:0 / 1, interval:30
One minute later, when the schedule fires again, the following new log output appears:
[INFO] 2021-11-23 11:45:00.079 org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob:[74] - scheduled fire time :Tue Nov 23 11:45:00 CST 2021, fire time :Tue Nov 23 11:45:00 CST 2021, process id :1
[INFO] 2021-11-23 11:45:00.657 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[243] - find command 3, slot:0 :
[INFO] 2021-11-23 11:45:00.657 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[186] - find one command: id: 3, type: SCHEDULER
[INFO] 2021-11-23 11:45:00.748 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[209] - handle command end, command 3 process 3 start...
[INFO] 2021-11-23 11:45:00.767 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
[INFO] 2021-11-23 11:45:00.769 org.apache.dolphinscheduler.service.process.ProcessService:[1093] - start submit task : test_shell, instance id:3, state: RUNNING_EXECUTION
[INFO] 2021-11-23 11:45:00.799 org.apache.dolphinscheduler.service.process.ProcessService:[1106] - end submit task to db successfully:3 test_shell state:SUBMITTED_SUCCESS complete, instance id:3 state: RUNNING_EXECUTION
[INFO] 2021-11-23 11:45:00.800 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[120] - task ready to submit: TaskInstance{id=3, name='test_shell', taskType='SHELL', processInstanceId=3, processInstanceName='null', state=SUBMITTED_SUCCESS, firstSubmitTime=Tue Nov 23 11:45:00 CST 2021, submitTime=Tue Nov 23 11:45:00 CST 2021, startTime=null, endTime=null, host='null', executePath='null', logPath='null', retryTimes=0, alertFlag=NO, processInstance=null, processDefine=null, pid=0, appLink='null', flag=YES, dependency='null', duration=null, maxRetryTimes=1, retryInterval=30, taskInstancePriority=MEDIUM, processInstancePriority=MEDIUM, dependentResult='null', workerGroup='default', environmentCode=-1, environmentConfig='null', executorId=1, executorName='null', delayTime=5, dryRun=0}
[ERROR] 2021-11-23 11:45:00.810 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[334] - tenant not exists,process instance id : 3,task instance id : 3
[INFO] 2021-11-23 11:45:00.821 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[130] - master submit success, task : test_shell
[ERROR] 2021-11-23 11:45:00.821 org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer:[116] - dispatcher task error
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.dispatch(TaskPriorityQueueConsumer.java:131)
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.run(TaskPriorityQueueConsumer.java:100)
[INFO] 2021-11-23 11:45:00.825 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1162] - remove task from stand by list, id: 3 name:test_shell
[INFO] 2021-11-23 11:45:00.827 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[291] - process event: State Event :key: null type: TASK_STATE_CHANGE executeStatus: FAILURE task instance id: 3 process instance id: 3 context: null
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[369] - work flow 3 task 3 state:FAILURE
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[376] - failure task will be submitted: process id: 3, task instance id: 3 state:FAILURE retry times:0 / 1, interval:30
Step 7: using the output from the previous step, locate where in the source code each log line is printed.
Let's start with the first line:
[INFO] 2021-11-23 11:45:00.079 org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob:[74] - scheduled fire time :Tue Nov 23 11:45:00 CST 2021, fire time :Tue Nov 23 11:45:00 CST 2021, process id :1
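Using the part highlighted in red as the keyword, run IDEA's Find in Path over the whole project to find which Java file, class, and method prints this output.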
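Find in Path is essentially a recursive text search over the project. For readers without IDEA at hand, here is a minimal stand-alone equivalent, assuming the DolphinScheduler source tree is on local disk; FindInPath and search() are hypothetical names, not part of DolphinScheduler. Search only for the fixed part of the message (for example "scheduled fire time"), since values such as timestamps and ids are typically filled into {} placeholders at runtime and never appear literally in the source.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not part of DolphinScheduler): a tiny stand-in for IDEA's "Find in Path".
final class FindInPath {

    /** Returns "path:lineNumber" for each line of a .java file under root that contains keyword. */
    static List<String> search(Path root, String keyword) {
        List<Path> javaFiles;
        try (Stream<Path> paths = Files.walk(root)) {
            javaFiles = paths.filter(Files::isRegularFile)
                             .filter(p -> p.toString().endsWith(".java"))
                             .sorted()
                             .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> hits = new ArrayList<>();
        for (Path file : javaFiles) {
            List<String> lines;
            try {
                lines = Files.readAllLines(file, StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            for (int i = 0; i < lines.size(); i++) {
                if (lines.get(i).contains(keyword)) {
                    // 1-based line number, comparable to the [74] printed in the log
                    hits.add(file + ":" + (i + 1));
                }
            }
        }
        return hits;
    }
}
```

Usage would look like FindInPath.search(Paths.get("/path/to/dolphinscheduler"), "scheduled fire time"), where the path is a placeholder for your own checkout.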
Step 1: the scheduler fires first, running the execute() method in src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java.
Now the next line:
[INFO] 2021-11-23 11:45:00.657 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[243] - find command 3, slot:0 :
This comes from the findOneCommand() method in src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java.
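The "slot:0" in this log suggests that each master only picks up the commands that belong to its own slot. Below is a simplified sketch of that idea, assuming a command belongs to the master whose slot equals commandId % masterCount. With a single master, every command lands in slot 0, which matches "find command 3, slot:0". Cmd and CommandSlotSelector are hypothetical names; this is not the actual MasterSchedulerService code.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for one row of the command table.
final class Cmd {
    final int id;
    final String type;

    Cmd(int id, String type) {
        this.id = id;
        this.type = type;
    }
}

// Simplified sketch of slot-based command selection (assumption: slot = id % masterCount).
final class CommandSlotSelector {

    /** Returns the first command in the page whose id maps to this master's slot, if any. */
    static Optional<Cmd> findOneCommand(List<Cmd> page, int masterCount, int mySlot) {
        if (masterCount <= 0) {
            return Optional.empty(); // no registered masters: nothing can be assigned
        }
        for (Cmd c : page) {
            if (c.id % masterCount == mySlot) {
                return Optional.of(c);
            }
        }
        return Optional.empty();
    }
}
```

The modulo split lets several masters poll the same command table without coordinating on every row: each one only claims the ids that hash to its own slot.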
Then the next line:
[INFO] 2021-11-23 11:45:00.657 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[186] - find one command: id: 3, type: SCHEDULER
This comes from the scheduleProcess() method in src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java.
Next log line:
[INFO] 2021-11-23 11:45:00.748 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[209] - handle command end, command 3 process 3 start...
This is still the scheduleProcess() method in src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java.
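Judging from these two log lines, scheduleProcess() takes the command, turns it into a process instance (command 3 becomes process 3; the two ids merely coincide here), and hands that instance to a workflow execution thread. This is why the following logs come from WorkflowExecuteThread. Here is a rough sketch of that handoff, assuming a thread pool runs one workflow per process instance. SchedulerLoopSketch is hypothetical, and its in-memory id sequence only stands in for creating the process instance in the database.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the command -> process instance -> workflow thread handoff.
final class SchedulerLoopSketch {
    private final AtomicInteger processInstanceSeq = new AtomicInteger();
    private final ExecutorService workflowPool;

    SchedulerLoopSketch(ExecutorService workflowPool) {
        this.workflowPool = workflowPool;
    }

    /** Turns one command into a process instance and hands it to the workflow pool. */
    Future<String> scheduleProcess(int commandId) {
        // Stand-in for "handle command": allocate a new process instance id.
        int processInstanceId = processInstanceSeq.incrementAndGet();
        System.out.printf("handle command end, command %d process %d start...%n", commandId, processInstanceId);
        // Stand-in for a workflow execute thread: the workflow runs asynchronously in the pool.
        Callable<String> workflow = () -> "workflow " + processInstanceId + " finished";
        return workflowPool.submit(workflow);
    }
}
```

Because the workflow runs on a pool thread, the scheduler loop can return immediately and look for the next command.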
Then the next line:
[INFO] 2021-11-23 11:45:00.767 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
This comes from the addTaskToStandByList(TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
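WorkflowExecuteThread keeps tasks that are waiting to be submitted in a "stand by" list. In the log, test_shell is added, removed after it has been submitted, and added again once it fails. Below is a minimal sketch of such a list, assuming it is ordered by priority and de-duplicated by task instance id. StandbyTask and StandbyList are hypothetical, and the integer priority (lower value = more urgent) is an assumption, not DolphinScheduler's own priority type.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical minimal task instance: id, name and an integer priority (lower value = more urgent).
final class StandbyTask {
    final int id;
    final String name;
    final int priority;

    StandbyTask(int id, String name, int priority) {
        this.id = id;
        this.name = name;
        this.priority = priority;
    }
}

// Sketch of a "stand by" list: tasks wait here, ordered by priority, until they are submitted.
final class StandbyList {
    private final PriorityQueue<StandbyTask> queue = new PriorityQueue<>(
            Comparator.comparingInt((StandbyTask t) -> t.priority).thenComparingInt(t -> t.id));

    void add(StandbyTask t) {
        // Skip duplicates so the same task instance is never queued twice.
        if (queue.stream().noneMatch(q -> q.id == t.id)) {
            queue.add(t);
            System.out.println("add task to stand by list: " + t.name);
        }
    }

    void remove(StandbyTask t) {
        if (queue.removeIf(q -> q.id == t.id)) {
            System.out.println("remove task from stand by list, id: " + t.id + " name:" + t.name);
        }
    }

    /** The most urgent waiting task, or null if the list is empty. */
    StandbyTask peek() {
        return queue.peek();
    }

    int size() {
        return queue.size();
    }
}
```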
Next log line:
[INFO] 2021-11-23 11:45:00.769 org.apache.dolphinscheduler.service.process.ProcessService:[1093] - start submit task : test_shell, instance id:3, state: RUNNING_EXECUTION
As the logger name ProcessService shows, this comes from the submitTask(TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java.
Then the next log line:
[INFO] 2021-11-23 11:45:00.799 org.apache.dolphinscheduler.service.process.ProcessService:[1106] - end submit task to db successfully:3 test_shell state:SUBMITTED_SUCCESS complete, instance id:3 state: RUNNING_EXECUTION
This also comes from the submitTask(TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java.
Then the next log line:
[INFO] 2021-11-23 11:45:00.800 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[120] - task ready to submit: TaskInstance{id=3, name='test_shell', taskType='SHELL', processInstanceId=3, processInstanceName='null', state=SUBMITTED_SUCCESS, firstSubmitTime=Tue Nov 23 11:45:00 CST 2021, submitTime=Tue Nov 23 11:45:00 CST 2021, startTime=null, endTime=null, host='null', executePath='null', logPath='null', retryTimes=0, alertFlag=NO, processInstance=null, processDefine=null, pid=0, appLink='null', flag=YES, dependency='null', duration=null, maxRetryTimes=1, retryInterval=30, taskInstancePriority=MEDIUM, processInstancePriority=MEDIUM, dependentResult='null', workerGroup='default', environmentCode=-1, environmentConfig='null', executorId=1, executorName='null', delayTime=5, dryRun=0}
This comes from the dispatchTask(TaskInstance taskInstance, ProcessInstance processInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java.
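Judging from the class names in the log, dispatchTask() does not hand the task to a worker directly. It places the task in a priority queue, and TaskPriorityQueueConsumer later takes it out on its own thread. Below is a simplified sketch of that producer/consumer split, assuming the queue is ordered by process-instance priority, then task priority, then task instance id. QueuedTask and DispatchQueueSketch are hypothetical. To keep the example deterministic, the consumer side drains the queue with poll() instead of blocking on take() in a background thread.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical queue entry, loosely modeled on fields visible in the "task ready to submit" log.
final class QueuedTask {
    final int processInstancePriority; // lower value = more urgent (assumption)
    final int taskInstancePriority;    // lower value = more urgent (assumption)
    final int taskInstanceId;
    final String workerGroup;

    QueuedTask(int processInstancePriority, int taskInstancePriority, int taskInstanceId, String workerGroup) {
        this.processInstancePriority = processInstancePriority;
        this.taskInstancePriority = taskInstancePriority;
        this.taskInstanceId = taskInstanceId;
        this.workerGroup = workerGroup;
    }
}

// Sketch of the master-side dispatch queue: producers put, a consumer takes in priority order.
final class DispatchQueueSketch {
    private final PriorityBlockingQueue<QueuedTask> queue = new PriorityBlockingQueue<>(16,
            Comparator.comparingInt((QueuedTask t) -> t.processInstancePriority)
                    .thenComparingInt(t -> t.taskInstancePriority)
                    .thenComparingInt(t -> t.taskInstanceId));

    /** Producer side: the role dispatchTask() appears to play. */
    void submit(QueuedTask t) {
        queue.put(t);
    }

    /** Consumer side: returns task instance ids in the order a consumer would dispatch them. */
    List<Integer> drainInOrder() {
        List<Integer> order = new ArrayList<>();
        QueuedTask t;
        while ((t = queue.poll()) != null) {
            order.add(t.taskInstanceId);
        }
        return order;
    }
}
```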
Next log line:
[ERROR] 2021-11-23 11:45:00.810 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[334] - tenant not exists,process instance id : 3,task instance id : 3
This comes from the verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java. CommonTaskProcessor extends BaseTaskProcessor and inherits this check.
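Why does a method in BaseTaskProcessor.java print under the name CommonTaskProcessor? Presumably the base class creates its logger from the runtime class (getClass()) rather than from a fixed class literal, so the log carries the subclass name. This is an assumption, not verified against the 2.0.0 source. The plain-Java sketch below reproduces only that naming effect; BaseProcessorSketch and CommonProcessorSketch are hypothetical, and the logger is reduced to a name string.

```java
// Hypothetical base class: the "logger name" is taken from getClass(), i.e. the runtime subclass.
abstract class BaseProcessorSketch {
    protected final String loggerName = getClass().getName();

    /** Mimics a tenant check that lives in the base class. Returns true when the tenant is missing. */
    protected boolean verifyTenantIsNull(Object tenant, int taskInstanceId) {
        if (tenant == null) {
            System.out.println("[ERROR] " + loggerName + " - tenant not exists, task instance id : " + taskInstanceId);
            return true;
        }
        return false;
    }
}

// Hypothetical subclass: it adds nothing, yet messages from the inherited check carry its name.
final class CommonProcessorSketch extends BaseProcessorSketch {
}
```

This is why, when tracing a log line back to the source, a missing hit in the named class is worth checking against its superclasses.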
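Reaching this line means an error occurred: the tenant does not exist.
Although it failed, let's keep reading the log and treat this as a failed run, to see what the failure path looks like.
Next comes: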
[INFO] 2021-11-23 11:45:00.821 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[130] - master submit success, task : test_shell
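This is again the dispatchTask(TaskInstance taskInstance, ProcessInstance processInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java. Judging from the log order, "master submit success" only means the master has queued the task for dispatch, not that a worker has accepted it. That is why it is printed right after the tenant error.
Continuing with the log: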
[ERROR] 2021-11-23 11:45:00.821 org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer:[116] - dispatcher task error
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.dispatch(TaskPriorityQueueConsumer.java:131)
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.run(TaskPriorityQueueConsumer.java:100)
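The error is logged by the run() method in src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java. According to the stack trace, the NullPointerException itself is thrown in dispatch() at line 131, which run() calls at line 100.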
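The NullPointerException right after the tenant error is probably not a separate bug but a consequence of it. If building the task's execution context is abandoned when the tenant is missing, the consumer later receives an empty context and dereferences it. This explanation is an assumption based on the log order, not something confirmed in the 2.0.0 source. The sketch only illustrates the pattern and a possible guard; ExecContext and NullContextSketch are hypothetical names.

```java
// Hypothetical stand-in for the execution context a worker would need.
final class ExecContext {
    final String tenantCode;

    ExecContext(String tenantCode) {
        this.tenantCode = tenantCode;
    }

    String toCommand() {
        return "run as " + tenantCode;
    }
}

final class NullContextSketch {

    /** Producer side: returns null when the tenant is missing, mirroring the "tenant not exists" branch. */
    static ExecContext buildContext(String tenantCode) {
        if (tenantCode == null) {
            return null;
        }
        return new ExecContext(tenantCode);
    }

    /** Unguarded consumer: dereferencing a null context throws NullPointerException. */
    static String dispatchUnguarded(ExecContext ctx) {
        return ctx.toCommand();
    }

    /** Guarded consumer: reports the real cause instead of failing with an NPE. */
    static String dispatchGuarded(ExecContext ctx, int taskInstanceId) {
        if (ctx == null) {
            return "skip task " + taskInstanceId + ": no execution context (tenant missing?)";
        }
        return ctx.toCommand();
    }
}
```

In practice, the fix for this run is on the configuration side: assign a valid tenant before running the workflow again.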
Moving on:
[INFO] 2021-11-23 11:45:00.825 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1162] - remove task from stand by list, id: 3 name:test_shell
This comes from the removeTaskFromStandbyList(TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
Next is:
[INFO] 2021-11-23 11:45:00.827 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[291] - process event: State Event :key: null type: TASK_STATE_CHANGE executeStatus: FAILURE task instance id: 3 process instance id: 3 context: null
This comes from the stateEventHandler(StateEvent stateEvent) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
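stateEventHandler() receives a State Event and routes it by type. Here the type is TASK_STATE_CHANGE with executeStatus FAILURE, and the next log line comes from taskFinished(). Below is a trimmed-down sketch of that routing. EventRouterSketch and StateEventSketch are hypothetical, and PROCESS_STATE_CHANGE is included only as an assumed second event type to show the switch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event types: only TASK_STATE_CHANGE appears in the log; PROCESS_STATE_CHANGE is assumed.
enum EventType { TASK_STATE_CHANGE, PROCESS_STATE_CHANGE }

// Hypothetical, trimmed-down state event carrying fields printed in the log.
final class StateEventSketch {
    final EventType type;
    final String executeStatus;
    final int taskInstanceId;

    StateEventSketch(EventType type, String executeStatus, int taskInstanceId) {
        this.type = type;
        this.executeStatus = executeStatus;
        this.taskInstanceId = taskInstanceId;
    }
}

// Sketch of routing a state event by its type; records each action in `handled`.
final class EventRouterSketch {
    final List<String> handled = new ArrayList<>();

    void stateEventHandler(StateEventSketch e) {
        System.out.println("process event: type: " + e.type + " executeStatus: " + e.executeStatus);
        switch (e.type) {
            case TASK_STATE_CHANGE:
                taskStateChangeHandler(e);
                break;
            case PROCESS_STATE_CHANGE:
                handled.add("process state -> " + e.executeStatus);
                break;
            default:
                throw new IllegalArgumentException("unknown event type: " + e.type);
        }
    }

    private void taskStateChangeHandler(StateEventSketch e) {
        // Only finished states (success or failure) move on to the "task finished" logic.
        if ("SUCCESS".equals(e.executeStatus) || "FAILURE".equals(e.executeStatus)) {
            handled.add("taskFinished(" + e.taskInstanceId + ")");
        }
    }
}
```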
Moving on:
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[369] - work flow 3 task 3 state:FAILURE
This comes from the taskFinished(TaskInstance task) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
Next is:
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
This comes from the addTaskToStandByList(TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
Moving on:
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[376] - failure task will be submitted: process id: 3, task instance id: 3 state:FAILURE retry times:0 / 1, interval:30
This again comes from the taskFinished(TaskInstance task) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
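The last three log lines show the retry decision inside taskFinished(). The failed task has used 0 of its 1 retries, so it goes back into the stand by list and will be submitted again after the retry interval of 30. Below is a sketch of that decision, assuming a failed task is retried while retryTimes < maxRetryTimes. RetrySketch is hypothetical, and because the log does not show the unit of the interval, the sketch leaves it open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the retry decision, modeled on "retry times:0 / 1, interval:30".
final class RetrySketch {

    /** A failed task may be retried while it still has retries left (assumption). */
    static boolean canRetry(String state, int retryTimes, int maxRetryTimes) {
        return "FAILURE".equals(state) && retryTimes < maxRetryTimes;
    }

    /** Returns the actions taken when a task finishes, in the order they happen. */
    static List<String> taskFinished(int taskInstanceId, String state, int retryTimes, int maxRetryTimes, int interval) {
        List<String> actions = new ArrayList<>();
        actions.add("task " + taskInstanceId + " state:" + state);
        if (canRetry(state, retryTimes, maxRetryTimes)) {
            // Put the task back into the waiting list; it becomes eligible again after `interval`.
            actions.add("add task to stand by list");
            actions.add("failure task will be submitted: retry times:" + retryTimes + " / " + maxRetryTimes
                    + ", interval:" + interval);
        } else if ("FAILURE".equals(state)) {
            actions.add("no retries left, task stays failed");
        }
        return actions;
    }
}
```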
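That completes the walkthrough of one pass through the code. Note that this log records a failed scheduling run. The failure path is also worth studying; in the next article we will follow a successful run.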