  • Oozie Coordinator time-based scheduling

      (This passage is excerpted from http://blog.sina.com.cn/s/blog_e699b42b0102xjqw.html, the post "Oozie 总结" on the 行成于思 blog.)  Oozie provides the concept of a Coordinator, which runs each workflow job as an action, equivalent to one execution node in a workflow definition (you can think of it as a workflow of workflows). This makes it possible to organize multiple workflow jobs into a Coordinator Job, to specify the trigger time and frequency, and to configure datasets, concurrency, and so on. A Coordinator Job carries the semantics of an execution period and frequency set outside the job itself, like adding a coordinator on top of the workflows to manage when those workflow jobs run. Compared with configuring a single task, the Coordinator configuration changes oozie.wf.application.path in job.properties to oozie.coord.application.path, and a coordinator.xml must also be defined, as sketched below.
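      A minimal job.properties sketch of that difference (the host name and paths below are hypothetical, not taken from the original post):

        # hypothetical NameNode address
        nameNode=hdfs://namenode-host:8020
        # for a plain workflow job you would point Oozie at the workflow directory:
        # oozie.wf.application.path=${nameNode}/user/root/apps/my-workflow
        # for a coordinator job, use oozie.coord.application.path instead, pointing at
        # the directory that contains coordinator.xml:
        oozie.coord.application.path=${nameNode}/user/root/apps/my-coordinator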

    coordinator application:

       A coordinator application is a program that triggers actions (normally workflow jobs) when a set of conditions is met. Conditions can be a time frequency, the arrival of new dataset instances, or other external events.

       Types of coordinator application — synchronous: its coordinator actions are created at specified time intervals and are usually parameterized.

     coordinator job:

      To create a coordinator job, a job configuration that resolves all of the coordinator application's parameters must be submitted to the coordinator engine.

      A coordinator job is a running instance of a coordinator application that runs from a start time to an end time; the start time must be earlier than the end time.

      A coordinator job is always in one of the following states: PREP, RUNNING, RUNNINGWITHERROR, PREPSUSPENDED, SUSPENDED, SUSPENDEDWITHERROR, PREPPAUSED, PAUSED, PAUSEDWITHERROR, SUCCEEDED, DONEWITHERROR, KILLED, FAILED.

      The valid coordinator job state transitions are:

        PREP --> PREPSUSPENDED | PREPPAUSED | RUNNING | KILLED
        RUNNING --> RUNNINGWITHERROR | SUSPENDED | PAUSED | SUCCEEDED | KILLED
        RUNNINGWITHERROR --> RUNNING | SUSPENDEDWITHERROR | PAUSEDWITHERROR | DONEWITHERROR | KILLED | FAILED
        PREPSUSPENDED --> PREP | KILLED
        SUSPENDED --> RUNNING | KILLED
        SUSPENDEDWITHERROR --> RUNNINGWITHERROR | KILLED
        PREPPAUSED --> PREP | KILLED
        PAUSED --> SUSPENDED | RUNNING | KILLED
        PAUSEDWITHERROR --> SUSPENDEDWITHERROR | RUNNINGWITHERROR | KILLED
        FAILED | KILLED --> IGNORED
        IGNORED --> RUNNING 

      When a coordinator job is submitted, Oozie parses the coordinator job XML. Oozie then creates a coordinator record in the PREP state and returns a unique job ID; if no pause time has been set, the coordinator also starts running immediately.

    Coordinator Action:

      A coordinator job creates and executes coordinator actions.

      A coordinator action is typically a workflow job that consumes and produces dataset instances.

     Once a coordinator action has been created (this is also called materializing the action), the coordinator action waits until all the inputs it needs for execution are available, or until the wait times out. How long it may wait is bounded by the coordinator's <controls> element, as sketched below.
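     A minimal sketch of such a <controls> block (the values are placeholders; the element itself appears again in the syntax skeleton further down):

        <controls>
          <!-- give up on an action whose inputs never arrive after 60 minutes; -1 means wait indefinitely -->
          <timeout>60</timeout>
          <!-- run at most one materialized action at a time -->
          <concurrency>1</concurrency>
          <!-- work through backlogged actions oldest-first; LIFO and LAST_ONLY are the other strategies -->
          <execution>FIFO</execution>
        </controls>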

      

    Synchronous Coordinator Application definition:

       A synchronous coordinator application is defined by a name, a start time, an end time, the frequency at which its coordinator actions are created, the input events, the output events, and the action control information.

    Syntax:

    <coordinator-app name="[NAME]" frequency="[FREQUENCY]"
                     start="[DATETIME]" end="[DATETIME]" timezone="[TIMEZONE]"
                     xmlns="uri:oozie:coordinator:0.1">
      <!-- frequency: how often coordinator actions are materialized; values under five minutes require a
           server configuration change. start/end: start and end times; to express them in Beijing time the
           server configuration and the time format also have to be changed (see the oozie-site.xml sketch
           after this listing). -->
      <controls>
        <timeout>[TIME_PERIOD]</timeout>
        <concurrency>[CONCURRENCY]</concurrency>
        <execution>[EXECUTION_STRATEGY]</execution>
      </controls>

      <datasets>
        <include>[SHARED_DATASETS]</include>
        ...
        <!-- Synchronous datasets: where the data is produced -->
        <dataset name="[NAME]" frequency="[FREQUENCY]"
                 initial-instance="[DATETIME]" timezone="[TIMEZONE]">
          <uri-template>[URI_TEMPLATE]</uri-template>
        </dataset>
        ...
      </datasets>

      <input-events>
        <!-- data trigger conditions -->
        <data-in name="[NAME]" dataset="[DATASET]">
          <instance>[INSTANCE]</instance>
          ...
        </data-in>
        ...
        <data-in name="[NAME]" dataset="[DATASET]">
          <start-instance>[INSTANCE]</start-instance>
          <end-instance>[INSTANCE]</end-instance>
        </data-in>
        ...
      </input-events>

      <output-events>
        <data-out name="[NAME]" dataset="[DATASET]">
          <instance>[INSTANCE]</instance>
        </data-out>
        ...
      </output-events>

      <action>
        <workflow>
          <!-- HDFS directory that contains workflow.xml -->
          <app-path>[WF-APPLICATION-PATH]</app-path>
          <configuration>
            <!-- parameters passed through to the workflow -->
            <property>
              <name>[PROPERTY-NAME]</name>
              <value>[PROPERTY-VALUE]</value>
            </property>
            ...
          </configuration>
        </workflow>
      </action>
    </coordinator-app>
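    The comment about sub-five-minute frequencies and Beijing time refers to server-side settings in oozie-site.xml. A hedged sketch of what they look like (verify the property names against your Oozie version before relying on them):

        <!-- allow coordinator frequencies faster than five minutes -->
        <property>
          <name>oozie.service.coord.check.maximum.frequency</name>
          <value>false</value>
        </property>
        <!-- run Oozie's processing timezone at GMT+0800 so that start/end can be written with a
             +0800 offset (e.g. 2017-08-28T17:50+0800), as in the job.properties further down -->
        <property>
          <name>oozie.processing.timezone</name>
          <value>GMT+0800</value>
        </property>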

    The example given in the official documentation:

    <coordinator-app name="hello-coord" frequency="${coord:days(1)}"
                        start="2009-01-02T08:00Z" end="2009-01-02T08:00Z"
                        timezone="America/Los_Angeles"
                        xmlns="uri:oozie:coordinator:0.1">
          <datasets>
            <dataset name="logs" frequency="${coord:days(1)}"
                     initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
              <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}/data</uri-template>
            </dataset>
            <dataset name="siteAccessStats" frequency="${coord:days(1)}"
                     initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
              <uri-template>hdfs://bar:8020/app/stats/${YEAR}/${MONTH}/${DAY}/data</uri-template>
            </dataset>
          </datasets>
          <input-events>    
            <data-in name="input" dataset="logs">
              <instance>2009-01-02T08:00Z</instance>
            </data-in>
          </input-events>
          <output-events>
             <data-out name="output" dataset="siteAccessStats">
               <instance>2009-01-02T08:00Z</instance>
             </data-out>
          </output-events>
          <action>
            <workflow>
              <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>    
              <configuration>
                <property>   
                  <name>wfInput</name>
                  <value>${coord:dataIn('input')}</value>
                </property>
                <property>
                  <name>wfOutput</name>
                  <value>${coord:dataOut('output')}</value>
                </property>
             </configuration>
           </workflow>
          </action>
       </coordinator-app>

    What we wrote for our own job at work:

    coordinator.xml

    <coordinator-app name="cron-coord" frequency="${coord:minutes(6)}" start="${start}" end="${end}"
                     timezone="Asia/Shanghai" xmlns="uri:oozie:coordinator:0.2">
      <action>
        <workflow>
          <app-path>${workflowAppUri}</app-path>
          <configuration>
            <property>
              <name>jobTracker</name>
              <value>${jobTracker}</value>
            </property>
            <property>
              <name>nameNode</name>
              <value>${nameNode}</value>
            </property>
            <property>
              <name>queueName</name>
              <value>${queueName}</value>
            </property>
            <property>
              <name>mainClass</name>
              <value>com.ocn.itv.rinse.ErrorCollectRinse</value>
            </property>
          </configuration>
        </workflow>
      </action>
    </coordinator-app>
    workflow.xml

    <workflow-app name="Spark-example1" xmlns="uri:oozie:workflow:0.5">
      <start to="spark-SparkOozieAction"/>
      <action name="spark-SparkOozieAction">
        <spark xmlns="uri:oozie:spark-action:0.1">
          <job-tracker>${jobTracker}</job-tracker>
          <name-node>${nameNode}</name-node>
          <configuration>
            <property>
              <name>mapred.job.queue.name</name>
              <value>${queueName}</value>
            </property>
          </configuration>
          <master>yarn-cluster</master>
          <mode>cluster</mode>
          <name>Spark Example</name>
          <class>${mainClass}</class>
          <jar>ocn-itv-spark-3.0.3-rc1.jar</jar>
          <spark-opts>${sparkopts}</spark-opts>
          <arg>${input}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
      </action>
      <kill name="fail">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
      </kill>
      <end name="end"/>
    </workflow-app>
    job.properties

    nameNode=hdfs://hgdp-001:8020
    jobTracker=hgdp-001:8032
    queueName=default
    # input parameter
    input=2017-05-09
    # start time
    start=2017-08-28T17:50+0800
    # end time
    end=2017-08-28T18:50+0800
    sparkopts=--executor-memory 1G
    # whether to use the system share lib (jars the actions depend on)
    oozie.use.system.libpath=True
    hdfspath=user/root
    # top-level application directory
    examplesRoot=ocn-itv-oozie
    # user lib directory (jars you add yourself)
    oozie.libpath=${nameNode}/${hdfspath}/${examplesRoot}/lib/
    # directory containing workflow.xml
    workflowAppUri=${nameNode}/${hdfspath}/${examplesRoot}/wf/wf1/
    # directory containing coordinator.xml
    oozie.coord.application.path=${nameNode}/${hdfspath}/${examplesRoot}/cd/cd1/

    Finally, run it:

      Start the job:  oozie job -config job.properties -run -oozie http://xxxx:11000/oozie   (replace xxxx with the Oozie server address)
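      Once it is submitted, the same CLI can watch and manage the coordinator. A short sketch (the server URL and $JOB_ID are placeholders; the job ID is what the -run command prints):

        export OOZIE_URL=http://xxxx:11000/oozie   # so -oozie does not have to be repeated on every call
        oozie job -info $JOB_ID      # coordinator status and its materialized actions
        oozie job -suspend $JOB_ID   # pause the coordinator
        oozie job -resume $JOB_ID    # resume it
        oozie job -kill $JOB_ID      # stop it permanently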

    Run result:

  • Original post: https://www.cnblogs.com/jjSmileEveryDay/p/7457385.html