  • [Original] Big Data Basics: Azkaban (1) Introduction and Source Code Analysis

    Azkaban 3.45

    I Introduction

    1 Official Website

    https://azkaban.github.io/

    Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.

    Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.

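    Azkaban was developed at LinkedIn to solve the problem of job dependencies in Hadoop environments. The LinkedIn team had many jobs, including ETL jobs and data analysis jobs, that had to run in a specific order.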

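    Azkaban started out as a single-server solution and has since evolved into a more robust one (unfortunately, the WebServer in the current version is still a single point of failure).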

    Azkaban consists of 3 key components:

    • Relational Database (MySQL)
    • AzkabanWebServer
    • AzkabanExecutorServer

    Azkaban has 3 core components: MySQL, the WebServer, and the ExecutorServer.

    2 Deployment

    3 Database Tables

    projects: project definitions

    project_flows: flow (workflow) definitions

    execution_flows: flow instances (executions)

    execution_jobs: job instances

    triggers: schedule definitions

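    PS: Much of the data in these tables is encoded. The enc_type column holds the encoding type (the corresponding enum is EncodingType). A value of 2 means gzip, and any other value means no encoding. For type 2, call GZIPUtils.transformBytesToObject to recover the original content.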
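    You may want to inspect these columns by hand, for example after fetching flow_data over JDBC. In that case, a decoder needs only the JDK. The sketch below assumes what is described above: enc_type 2 means a gzip stream, and any other value means plain UTF-8 bytes. The class name EncodedBlobDecoder is hypothetical. Unlike GZIPUtils.transformBytesToObject, it stops at the decoded string and does not parse it any further.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: decodes a blob column (e.g. flow_data) according to its enc_type.
// Assumption: 2 = gzip, any other value = plain UTF-8 bytes.
final class EncodedBlobDecoder {
    static final int ENC_TYPE_GZIP = 2;

    private EncodedBlobDecoder() {}

    // Returns the stored text without parsing it further.
    static String decode(byte[] data, int encType) {
        if (encType != ENC_TYPE_GZIP) {
            return new String(data, StandardCharsets.UTF_8);
        }
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("corrupt gzip blob", e);
        }
    }

    // Test helper: produces a plain gzip stream of the given text.
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

    With a JDBC ResultSet rs positioned on a row of the execution_flows query shown later, EncodedBlobDecoder.decode(rs.getBytes("flow_data"), rs.getInt("enc_type")) would return the stored flow data as text.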

    4 Concepts

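    • Job: the smallest unit of execution and a node in the DAG, i.e. a task

    • Flow: a set of Jobs whose dependencies are declared with the dependencies property, i.e. a workflow

    • Trigger: fires a Flow according to a specified cron expression, i.e. a schedule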
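    Because a Flow is a DAG of Jobs, it has at least one valid execution order, and a cyclic dependency makes the flow invalid. The sketch below shows this with Kahn's topological sort. It is an illustration of the concept, not Azkaban's own flow loader. The FlowDag class and its methods are hypothetical, and each addJob call plays the role of one job and its dependencies list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a Flow: job name -> names of the jobs it depends on.
final class FlowDag {
    private final Map<String, List<String>> dependencies = new LinkedHashMap<>();

    // Registers a job and its parents (like a job's "dependencies" property).
    FlowDag addJob(String job, String... dependsOn) {
        dependencies.put(job, List.of(dependsOn));
        return this;
    }

    // Kahn's algorithm: returns one valid execution order, or throws on a cycle
    // or on a dependency that names an undeclared job.
    List<String> executionOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        Map<String, List<String>> children = new LinkedHashMap<>();
        for (String job : dependencies.keySet()) {
            inDegree.put(job, 0);
            children.put(job, new ArrayList<>());
        }
        for (Map.Entry<String, List<String>> entry : dependencies.entrySet()) {
            for (String parent : entry.getValue()) {
                if (!children.containsKey(parent)) {
                    throw new IllegalStateException("Unknown dependency: " + parent);
                }
                children.get(parent).add(entry.getKey());
                inDegree.merge(entry.getKey(), 1, Integer::sum);
            }
        }
        // Start with jobs that have no parents, then release children as parents finish.
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((job, degree) -> {
            if (degree == 0) {
                ready.add(job);
            }
        });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String job = ready.poll();
            order.add(job);
            for (String child : children.get(job)) {
                if (inDegree.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        if (order.size() != dependencies.size()) {
            throw new IllegalStateException("Flow contains a dependency cycle");
        }
        return order;
    }
}
```

    For example, with extract, transform (depends on extract), load (depends on transform) and report (depends on load and transform), executionOrder() returns [extract, transform, load, report].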

    II Code Analysis

    1 Startup Process

    Web Server

    AzkabanWebServer.main

             launch

                      prepareAndStartServer

                              configureRoutes

                                       TriggerManager.start

                              FlowTriggerService.start

                                       recoverIncompleteTriggerInstances

                                                SELECT %s FROM execution_dependencies WHERE trigger_instance_id in (SELECT trigger_instance_id FROM execution_dependencies WHERE dep_status = %s or dep_status = %s or (dep_status = %s and flow_exec_id = %s))

                              FlowTriggerScheduler.start

    ExecutorManager

             setupExecutors

             loadRunningFlows

    QueueProcessorThread.run

    ExecutingManagerUpdaterThread.run

    Executor Server

    AzkabanExecutorServer.main

             launch

                      AzkabanExecutorServer.start

                              insertExecutorEntryIntoDB
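    The two startup trees meet in the database. Each Executor Server records itself via insertExecutorEntryIntoDB, and the Web Server's ExecutorManager appears to read the registered executors back in setupExecutors. The sketch below replaces the database with an in-memory map. The class ExecutorRegistry, its methods, and the rule that only executors marked active are returned are all assumptions for illustration, not Azkaban's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the executor table touched during startup.
final class ExecutorRegistry {
    record Executor(int id, String host, int port, boolean active) {}

    private final Map<String, Executor> rows = new LinkedHashMap<>();
    private int nextId = 1;

    // Executor Server side (cf. insertExecutorEntryIntoDB): one row per host:port.
    synchronized Executor register(String host, int port) {
        String key = host + ":" + port;
        Executor existing = rows.get(key);
        if (existing != null) {
            return existing; // in this sketch, a restart on the same host:port reuses its row
        }
        Executor created = new Executor(nextId++, host, port, false);
        rows.put(key, created);
        return created;
    }

    // Assumption: a separate step marks an executor active before it may receive flows.
    synchronized void activate(String host, int port) {
        rows.computeIfPresent(host + ":" + port,
            (key, e) -> new Executor(e.id(), e.host(), e.port(), true));
    }

    // Web Server side (cf. setupExecutors): only active executors become dispatch targets.
    synchronized List<Executor> activeExecutors() {
        List<Executor> result = new ArrayList<>();
        for (Executor e : rows.values()) {
            if (e.active()) {
                result.add(e);
            }
        }
        return result;
    }
}
```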

    2 Flow Execution Process

    The Web Server has two entry points:

    ExecuteFlowAction.doAction

    ExecutorServlet.ajaxExecuteFlow

    The Web Server dispatches the flow to an executor:

    ExecutorManager.submitExecutableFlow

             JdbcExecutorLoader.uploadExecutableFlow

                      INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)

             ExecutorLoader.addActiveExecutableReference

                      INSERT INTO active_executing_flows (exec_id, update_time) values (?,?)

             queuedFlows.enqueue

    QueueProcessorThread.run

             processQueuedFlows

                      ExecutorManager.selectExecutorAndDispatchFlow (get from queuedFlows)

                              selectExecutor

                              dispatch

                                       JdbcExecutorLoader.assignExecutor

                                                UPDATE execution_flows SET executor_id=? where exec_id=?

                                       ExecutorApiGateway.callWithExecutable (calls the Executor Server)
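    The dispatch path above is a producer/consumer queue. submitExecutableFlow writes the execution row and enqueues the flow. QueueProcessorThread later takes flows off queuedFlows, picks an executor, records executor_id, and calls that executor. The sketch below models only this hand-off with a java.util.concurrent BlockingQueue. FlowDispatcher is a hypothetical name, and the database writes and the HTTP call are reduced to an in-memory map. The least-loaded selection rule is an assumption standing in for Azkaban's own, more elaborate executor selection.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of submit -> queue -> select executor -> dispatch.
final class FlowDispatcher {
    private final BlockingQueue<Integer> queuedFlows = new LinkedBlockingQueue<>();
    // Executor name -> flows assigned so far; TreeMap keeps tie-breaking deterministic.
    private final Map<String, Integer> loadByExecutor = new TreeMap<>();
    // execId -> executor, playing the role of execution_flows.executor_id.
    private final Map<Integer, String> executorByExecId = new HashMap<>();

    FlowDispatcher(List<String> executors) {
        for (String executor : executors) {
            loadByExecutor.put(executor, 0);
        }
    }

    // Web Server side: Azkaban inserts the execution row first; here we only enqueue.
    void submit(int execId) {
        queuedFlows.add(execId);
    }

    // One pass of the queue processor (the real thread keeps looping).
    void processQueuedFlows() {
        Integer execId;
        while ((execId = queuedFlows.poll()) != null) {
            String executor = selectExecutor();
            executorByExecId.put(execId, executor); // like assignExecutor's UPDATE
            loadByExecutor.merge(executor, 1, Integer::sum);
            // A real dispatcher would now send execId to that Executor Server.
        }
    }

    // Assumption: the least-loaded executor wins; ties go to the alphabetically first name.
    private String selectExecutor() {
        String best = null;
        for (Map.Entry<String, Integer> entry : loadByExecutor.entrySet()) {
            if (best == null || entry.getValue() < loadByExecutor.get(best)) {
                best = entry.getKey();
            }
        }
        if (best == null) {
            throw new IllegalStateException("no executors available");
        }
        return best;
    }

    Map<Integer, String> assignments() {
        return Map.copyOf(executorByExecId);
    }
}
```

    For example, with executors exec-a and exec-b and flows 1, 2 and 3 submitted, one processing pass assigns them to exec-a, exec-b and exec-a respectively.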

    The Executor Server runs the flow:

    ExecutorServlet.doGet

             handleAjaxExecute

                      FlowRunnerManager.submitFlow

                              JdbcExecutorLoader.fetchExecutableFlow

                                     SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?

                              FlowPreparer.setup

                              FlowRunner.run

                                       setupFlowExecution

                                       updateFlow

                                                UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?

                                       runFlow

                                                progressGraph

                                                         runReadyJob

                                                                 runExecutableNode

                                                                          JobRunner.run

                                                                                   uploadExecutableNode

                                                                                            INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)

                                                                                   prepareJob

                                                                                   runJob

                                                                                            Job.run (ProcessJob, JavaJob)
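    The names progressGraph and runReadyJob suggest the core loop of FlowRunner:
    • find the jobs whose parents have all finished;
    • run them through JobRunner;
    • repeat until nothing more can start.

    The single-threaded sketch below shows that idea. GraphRunner, its Status values, and the rule that a job whose parent did not succeed is marked CANCELLED are assumptions. The sketch also keeps running independent branches after a failure, whereas Azkaban's actual behaviour depends on the flow's failure option.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical progressGraph-style loop: repeatedly start every job whose parents are
// all finished, until a full pass makes no progress.
final class GraphRunner {
    enum Status { READY, SUCCEEDED, FAILED, CANCELLED }

    // parents: job -> its dependencies; runJob stands in for JobRunner (true = success).
    static Map<String, Status> run(Map<String, List<String>> parents, Predicate<String> runJob) {
        Map<String, Status> status = new LinkedHashMap<>();
        parents.keySet().forEach(job -> status.put(job, Status.READY));
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (String job : parents.keySet()) {
                if (status.get(job) != Status.READY) {
                    continue;
                }
                List<String> deps = parents.get(job);
                boolean allFinished = deps.stream().allMatch(p -> status.get(p) != Status.READY);
                if (!allFinished) {
                    continue;
                }
                boolean allSucceeded = deps.stream().allMatch(p -> status.get(p) == Status.SUCCEEDED);
                // Assumption: a job with a non-successful parent is cancelled, not run.
                Status result = !allSucceeded ? Status.CANCELLED
                        : runJob.test(job) ? Status.SUCCEEDED : Status.FAILED;
                status.put(job, result);
                progressed = true;
            }
        }
        return status; // jobs caught in a cycle would remain READY
    }
}
```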

    The Web Server polls flow status:

    ExecutingManagerUpdaterThread.run

             getFlowToExecutorMap

             ExecutorApiGateway.callWithExecutionId

             updateExecution
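    The name getFlowToExecutorMap suggests that the updater thread groups running executions by the executor they were dispatched to. That way, each Executor Server can be asked about all of its flows in one call. The sketch below shows only that grouping step. StatusPoller and its fetchStatus callback, which stands in for ExecutorApiGateway.callWithExecutionId, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Hypothetical model of one updater pass: one status request per executor.
final class StatusPoller {
    // runningFlows: execId -> executor name.
    static Map<String, List<Integer>> groupByExecutor(Map<Integer, String> runningFlows) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        runningFlows.forEach((execId, executor) ->
            grouped.computeIfAbsent(executor, k -> new ArrayList<>()).add(execId));
        return grouped;
    }

    // fetchStatus(executor, execIds) stands in for the HTTP call and returns execId -> status.
    static Map<Integer, String> poll(Map<Integer, String> runningFlows,
            BiFunction<String, List<Integer>, Map<Integer, String>> fetchStatus) {
        Map<Integer, String> latest = new TreeMap<>();
        groupByExecutor(runningFlows).forEach((executor, execIds) ->
            latest.putAll(fetchStatus.apply(executor, execIds)));
        return latest; // the caller would then apply these, as updateExecution does
    }
}
```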

    3 Scheduled Execution Process

    TriggerManager.start

             loadTriggers

                      SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM triggers

             TriggerScannerThread.start

                      checkAllTriggers

                              onTriggerTrigger

                                       TriggerAction.doAction

                                                ExecuteFlowAction.doAction
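    In this path a trigger pairs a condition with actions. TriggerScannerThread repeatedly calls checkAllTriggers, and when a trigger's condition is met, onTriggerTrigger runs its actions, such as ExecuteFlowAction. The sketch below reproduces that check-then-fire loop with hypothetical types. To stay self-contained it uses a fixed period in milliseconds instead of a cron expression. It also fires only once after several missed periods, which may differ from Azkaban's behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trigger scanner: each trigger pairs a "due" condition with actions.
final class TriggerScanner {
    // Stand-in for TriggerAction; an ExecuteFlowAction would submit a flow here.
    interface TriggerAction {
        void doAction();
    }

    static final class Trigger {
        final String name;
        final long periodMillis;   // assumption: fixed period instead of a cron expression
        long nextFireTime;         // the condition is "now >= nextFireTime"
        final List<TriggerAction> actions = new ArrayList<>();

        Trigger(String name, long firstFireTime, long periodMillis) {
            if (periodMillis <= 0) {
                throw new IllegalArgumentException("period must be positive");
            }
            this.name = name;
            this.nextFireTime = firstFireTime;
            this.periodMillis = periodMillis;
        }
    }

    private final List<Trigger> triggers = new ArrayList<>();

    void add(Trigger trigger) {
        triggers.add(trigger);
    }

    // One checkAllTriggers pass at time `now` (the real scanner thread loops and sleeps).
    void checkAllTriggers(long now) {
        for (Trigger trigger : triggers) {
            if (now >= trigger.nextFireTime) {
                trigger.actions.forEach(TriggerAction::doAction);
                // Advance past `now`, so missed periods fire only once (a choice of this sketch).
                while (trigger.nextFireTime <= now) {
                    trigger.nextFireTime += trigger.periodMillis;
                }
            }
        }
    }
}
```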

    PS: There is also a completely separate scheduling mechanism, controlled by azkaban.server.schedule.enable_quartz (default false). The following shows how jobs are registered with Quartz:

    ProjectManagerServlet.ajaxHandleUpload

             SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true

             ProjectManager.loadAllProjectFlows

                      SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?

             FlowTriggerScheduler.scheduleAll

                      SELECT MAX(flow_version) FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?

                      SELECT flow_file FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=? AND flow_version=?

                      registerJob

    The following shows how a Quartz job is executed:

    FlowTriggerQuartzJob.execute

             FlowTriggerService.startTrigger

                      TriggerInstanceProcessor.processSucceed

                              TriggerInstanceProcessor.executeFlowAndUpdateExecID

                                       ExecutorManager.submitExecutableFlow

    4 Job Execution Process

    Job is the core interface for tasks, and every concrete job type is an implementation of it:

    Job

             AbstractJob

                      AbstractProcessJob

                              ProcessJob (shell job)

                                       JavaProcessJob (Java job)

                                                JavaJob
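    The sketch below mirrors the hierarchy in simplified form. ProcessJob turns the command property into a process command line, and JavaProcessJob overrides that step to build a java command from properties. JavaJob is left out. The following are assumptions modeled loosely on Azkaban's job types, not its real signatures:
    • the constructors and the commandLine method;
    • the naive whitespace splitting;
    • the property names java.class, classpath, Xms and Xmx, with their defaults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Simplified, hypothetical mirror of Job -> AbstractJob -> AbstractProcessJob -> ProcessJob
// -> JavaProcessJob. The real Azkaban classes have different constructors and more logic.
interface Job {
    String getId();

    void run() throws Exception;
}

abstract class AbstractJob implements Job {
    private final String id;

    AbstractJob(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return id;
    }
}

// Adds the job's key=value properties, shared by all process-based jobs.
abstract class AbstractProcessJob extends AbstractJob {
    protected final Map<String, String> props;

    AbstractProcessJob(String id, Map<String, String> props) {
        super(id);
        this.props = props;
    }
}

// Shell job: runs whatever the "command" property says.
class ProcessJob extends AbstractProcessJob {
    ProcessJob(String id, Map<String, String> props) {
        super(id, props);
    }

    // Naive whitespace split; real quoting rules are more involved.
    List<String> commandLine() {
        String command = Objects.requireNonNull(props.get("command"), "missing 'command' property");
        return List.of(command.trim().split("\\s+"));
    }

    @Override
    public void run() throws Exception {
        Process process = new ProcessBuilder(commandLine()).inheritIO().start();
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IllegalStateException(getId() + " exited with code " + exitCode);
        }
    }
}

// Java job: same process machinery, but the command line comes from Java-specific properties.
class JavaProcessJob extends ProcessJob {
    JavaProcessJob(String id, Map<String, String> props) {
        super(id, props);
    }

    @Override
    List<String> commandLine() {
        List<String> cmd = new ArrayList<>();
        cmd.add("java");
        cmd.add("-Xms" + props.getOrDefault("Xms", "64M"));   // illustrative default
        cmd.add("-Xmx" + props.getOrDefault("Xmx", "256M"));  // illustrative default
        cmd.add("-cp");
        cmd.add(props.getOrDefault("classpath", "."));
        cmd.add(Objects.requireNonNull(props.get("java.class"), "missing 'java.class' property"));
        return cmd;
    }
}
```

    The design point the sketch tries to capture is that run() lives in ProcessJob once. Subclasses such as JavaProcessJob only change how the command line is assembled.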

  • Original article: https://www.cnblogs.com/barneywill/p/9895130.html