zoukankan      html  css  js  c++  java
  • oozie 完整流程实例

    Oozie概述

      Oozie是一个基于Hadoop工作流引擎,也可以称为调度器,它以xml的形式写调度流程,可以调度mr,pig,hive,shell,jar,spark等等。在实际工作中,遇到对数据进行一连串的操作的时候很实用,不需要自己写一些处理代码了,只需要定义好各个action,然后把他们串在一个工作流里面就可以自动执行了。对于大数据的分析工作非常有用.

     Oozie有几个主要概念:

      workflow :工作流 ,顺序执行流程节点,支持fork(分支多个节点),join(合并多个节点为一个)。

      coordinator :多个workflow可以组成一个coordinator,可以把前几个workflow的输出作为后一个workflow的输入,也可以定义workflow的触发条件,来做定时触发。

      bundle: 是对一堆coordinator的抽象, 可绑定多个coordinator。

      job.properties:定义环境变量。

    oozie安装: 略

    oozie格式:

    1.workflow:

    <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
      ...
        <start to="[NODE-NAME]"/>
       <action name="[NODE-NAME]">
              ....
         <ok to="[NODE-NAME]"/> <error to="[NODE-NAME]"/> </action> 
       <kill name="[NODE-NAME]"> <message>[MESSAGE-TO-LOG]</message> </kill>  
       <end name="[NODE-NAME]"/>
    ...

    </workflow-app>
    2.coordinator.xml
    <coordinator-app name="[NAME]" frequency="[FREQUENCY]"
                        start="[DATETIME]" end="[DATETIME]" timezone="[TIMEZONE]"
                        xmlns="uri:oozie:coordinator:0.1">   
    #frequency:执行频率,小于五分钟要修改配置 start,end:开始与结束时间,若想跟北京时间一样也要修改配置文件,并修改时间格式
    
          <controls>
            <timeout>[TIME_PERIOD]</timeout>
            <concurrency>[CONCURRENCY]</concurrency>
            <execution>[EXECUTION_STRATEGY]</execution>
          </controls>
    .
          <datasets>    
            <include>[SHARED_DATASETS]</include>
            ...
    .
            <!-- Synchronous datasets --> #---数据生成目录
            <dataset name="[NAME]" frequency="[FREQUENCY]"
                     initial-instance="[DATETIME]" timezone="[TIMEZONE]">
              <uri-template>[URI_TEMPLATE]</uri-template>
            </dataset>
            ...
    .
          </datasets>
    .
          <input-events>    #----定义了数据触发条件
            <data-in name="[NAME]" dataset="[DATASET]">
              <instance>[INSTANCE]</instance>
              ...
            </data-in>
            ...
            <data-in name="[NAME]" dataset="[DATASET]">
              <start-instance>[INSTANCE]</start-instance>
              <end-instance>[INSTANCE]</end-instance>
            </data-in>
            ...
          </input-events>
          <output-events>
             <data-out name="[NAME]" dataset="[DATASET]">
               <instance>[INSTANCE]</instance>
             </data-out>
             ...
          </output-events>
          <action>
            <workflow>
              <app-path>[WF-APPLICATION-PATH]</app-path>    #---workflow.xml所在hdfs目录
              <configuration>
                <property>    #----定义传给workflow的参数
                  <name>[PROPERTY-NAME]</name>
                  <value>[PROPERTY-VALUE]</value>
                </property>
                ...
             </configuration>
           </workflow>
          </action>
       </coordinator-app>

    官网给出的例子:

    <coordinator-app name="hello-coord" frequency="${coord:days(1)}"
                        start="2009-01-02T08:00Z" end="2009-01-02T08:00Z"
                        timezone="America/Los_Angeles"
                        xmlns="uri:oozie:coordinator:0.1">
          <datasets>
            <dataset name="logs" frequency="${coord:days(1)}"
                     initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
              <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}/data</uri-template>
            </dataset>
            <dataset name="siteAccessStats" frequency="${coord:days(1)}"
                     initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
              <uri-template>hdfs://bar:8020/app/stats/${YEAR}/${MONTH}/${DAY}/data</uri-template>
            </dataset>
          </datasets>
          <input-events>    
            <data-in name="input" dataset="logs">
              <instance>2009-01-02T08:00Z</instance>
            </data-in>
          </input-events>
          <output-events>
             <data-out name="output" dataset="siteAccessStats">
               <instance>2009-01-02T08:00Z</instance>
             </data-out>
          </output-events>
          <action>
            <workflow>
              <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>    
              <configuration>
                <property>   
                  <name>wfInput</name>
                  <value>${coord:dataIn('input')}</value>
                </property>
                <property>
                  <name>wfOutput</name>
                  <value>${coord:dataOut('output')}</value>
                </property>
             </configuration>
           </workflow>
          </action>
       </coordinator-app>

    3.bundle.xml

    语法:

    <bundle-app name=[NAME]  xmlns='uri:oozie:bundle:0.1'> 
      <controls>
           <kick-off-time>[DATETIME]</kick-off-time>    #运行时间
      </controls>
       <coordinator name=[NAME] >
           <app-path>[COORD-APPLICATION-PATH]</app-path> # coordinator.xml所在目录
              <configuration>                 #传给coordinator应用的参数
                <property>
                  <name>[PROPERTY-NAME]</name>   
                  <value>[PROPERTY-VALUE]</value>
                </property>
                ...
             </configuration>
       </coordinator>
       ...
    </bundle-app>  

    官网给出的例子(绑定两个coordinator):

    <bundle-app name='APPNAME' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xmlns='uri:oozie:bundle:0.1'> 
      <controls>
           <kick-off-time>${kickOffTime}</kick-off-time>
      </controls>
       <coordinator name='coordJobFromBundle1' >
           <app-path>${appPath}</app-path>
           <configuration>
             <property>
                  <name>startTime1</name>
                  <value>${START_TIME}</value>
              </property>
             <property>
                  <name>endTime1</name>
                  <value>${END_TIME}</value>
              </property>
          </configuration>
       </coordinator>
       <coordinator name='coordJobFromBundle2' >
           <app-path>${appPath2}</app-path>
           <configuration>
             <property>
                  <name>startTime2</name>
                  <value>${START_TIME2}</value>
              </property>
             <property>
                  <name>endTime2</name>
                  <value>${END_TIME2}</value>
              </property>
          </configuration>
       </coordinator>
    </bundle-app>

    4,.job.properties:

    nameNode               hdfs://xxx:8020    hdfs地址
    jobTracker             xxx5:8034          jobTracker 地址
    queueName              default            oozie队列
    examplesRoot            examples           全局目录
    oozie.usr.system.libpath    true           是否加载用户lib库
    oozie.libpath            share/lib/user    用户lib库
    oozie.wf.appication.path   ${nameNode}/user/${user.name}/... oozie流程所在hdfs地址

    workflow:oozie.wf.application.path

    coordinator:oozie.coord.application.path

    bundle:oozie.bundle.application.path

    Oozie使用:

    写一个oozie,有两个是必要的:job.properties 和 workflow.xml(coordinator.xml,bundle.xml)

    如果想让任务可以定时自动运行,那么需要写coordinator.xml。

    如果想绑定多个coordinator.xml,那么需要写bundle.xml。

    Oozie实例:

    我们工作时的(简略版)实例:(本次以spark action为例,其他博客中有写java,shell action实例)

    bundle.xml:

    <bundle-app name='APPNAME' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' 
    xmlns='uri:oozie:bundle:0.2'> 
        <coordinator name='coordJobFromBundle1' >
           <app-path>${appPath}</app-path>   
       </coordinator>
       <coordinator name='coordJobFromBundle2' >
           <app-path>${appPath2}</app-path>
       </coordinator>
     
    </bundle-app>

    coordinator.xml:

    <coordinator-app name="cron-coord" frequency="${coord:minutes(6)}" start="${start}" 
    end="${end}" timezone="Asia/Shanghai" xmlns="uri:oozie:coordinator:0.2">
        <action>
            <workflow>
                <app-path>${workflowAppUri}</app-path>
                <configuration>
                    <property>
                        <name>jobTracker</name>
                        <value>${jobTracker}</value>
                    </property>
                    <property>
                        <name>nameNode</name>
                        <value>${nameNode}</value>
                    </property>
                    <property>
                        <name>queueName</name>
                        <value>${queueName}</value>
                    </property>
                    <property>
                        <name>mainClass</name>
                        <value>com.ocn.itv.rinse.ErrorCollectRinse</value>
                    </property>
                    <property>
                        <name>mainClass2</name>
                        <value>com.ocn.itv.rinse.UserCollectRinse</value>
                    </property>
                    <property>
                        <name>jarName</name>
                        <value>ocn-itv-spark-3.0.3-rc1.jar</value>
                    </property>
                </configuration>
            </workflow>
        </action>
    </coordinator-app>

    workflow.xml:

    <workflow-app  name="spark-example1" xmlns="uri:oozie:workflow:0.5">  
        <start to="forking"/> 
        <fork name="forking">
            <path start="firstparalleljob"/>
            <path start="secondparalleljob"/>
        </fork>    
        <action name="firstparalleljob">
            <spark xmlns="uri:oozie:spark-action:0.2">  
                <job-tracker>${jobTracker}</job-tracker>  
                <name-node>${nameNode}</name-node>
                <configuration>  
                    <property>  
                        <name>mapred.job.queue.name</name>  
                        <value>${queueName}</value>  
                    </property>                  
                </configuration>            
                <master>yarn-cluster</master>
                <mode>cluster</mode>
                <name>Spark Example</name>
                <class>${mainClass}</class>            
                <jar>${jarName}</jar> 
                <spark-opts>${sparkopts}</spark-opts> 
                <arg>${input}</arg>            
            </spark >   
            <ok to="joining"/>
            <error to="fail"/>    
        </action> 
        <action name="secondparalleljob">
             <spark xmlns="uri:oozie:spark-action:0.2">  
                <job-tracker>${jobTracker}</job-tracker>  
                <name-node>${nameNode}</name-node>
                <configuration>  
                    <property>  
                        <name>mapred.job.queue.name</name>  
                        <value>${queueName}</value>  
                    </property>                  
                </configuration>            
                <master>yarn-cluster</master>
                <mode>cluster</mode>
                <name>Spark Example2</name>
                <class>${mainClass2}</class>            
                <jar>${jarName}</jar> 
                <spark-opts>${sparkopts}</spark-opts> 
                <arg>${input}</arg>            
            </spark >  
            <ok to="joining"/>
            <error to="fail"/>    
        </action>   
        <join name="joining" to="end"/>
          <kill name="fail">  
           <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>  
        </kill>  
       <end name="end"/>  
    </workflow-app> 

    job.properties

    nameNode=hdfs://hgdp-001:8020     #hsfs端口地址
    jobTracker=hgdp-001:8032        #resourceManager的端口
    queueName=default            #oozie队列
    input=2017-05-09             #输入参数
    hdfspath=user/root           #自定义目录
    examplesRoot=ocn-itv-oozie      #自定义全局目录
    oozie.use.system.libpath=True    #是否启动系统lib库
    sparkopts=--executor-memory 1G    #参数设置
    start=2017-09-04T00:05+0800    #coordinator任务开始时间
    end=2017-09-04T00:36+0800      #coordinator任务结束时间
    start2=2017-09-01T00:06+0800
    end2=2017-09-04T00:36+0800
    oozie.libpath=${nameNode}/${hdfspath}/${examplesRoot}/lib/          #用户自定义lib库(存放jar包)
    workflowAppUri=${nameNode}/${hdfspath}/${examplesRoot}/wf/spark/fork/
    workflowAppUri2=${nameNode}/${hdfspath}/${examplesRoot}/wf/spark/single/  #coordinator定时调度对应的workflow.xml所在目录
    appPath=${nameNode}/${hdfspath}/${examplesRoot}/cd/single/
    appPath2=${nameNode}/${hdfspath}/${examplesRoot}/cd/single1/        #bundle调用对应的coordinator.xml所在目录
    oozie.bundle.application.path=${nameNode}/${hdfspath}/${examplesRoot}/bd/bd1/    #bundle.xml所在目录
    #一个bundle调用多个coordinator

    最后运行:

      启动任务:oozie job -config job.properties -run -oozie http://xxxx(地址):11000/oozie

     需要注意的地方:

    一.  coordinator中timezone的时区配置

    Cloudera oozie默认时区是UTC,在开发oozie任务时必须在期望执行的时间上减去8小时,不方便。可以修改时区的配置操作。

    1.在oozie的配置文件中添加如下属性:

    <property>

     <name>oozie.processing.timezone</name>

     <value>GMT+0800</value>

    </property>

    2.如果使用了hue,进入Oozie web ui,选择Settings,然后在Timezone里选择CST(Asia/Shanghai)

    3.coordinator中的timeone设置为:timezone="Asia/Shanghai"

    4.修改时间格式,例如:2017-09-05T15:16+0800

    二.oozie.xx.application.path

    oozie.xx.application.path在job.properties里只能有一个。

    workflow:oozie.wf.application.path

    coordinator:oozie.coord.application.path

    bundle:oozie.bundle.application.path

    三.命名及存放位置问题

    其中workflow.xml,coordinator.xml,bundle.xml名字都不可以修改,要放到hdfs目录中,而job.properties名字可以修改,放在本地即可。

    四.关于workflow.xml 中action的问题:

    可以写多个action依次执行,如下示例所示:

    <workflow-app  name="java-example1" xmlns="uri:oozie:workflow:0.5">  
        <start to="java-Action"/>  
        <action name="java-Action">
         ....
            <ok to="java-Action2"/>
            <error to="fail"/>    
        </action> 
        <action name="java-Action2">
           .... 
            <ok to="end"/>
            <error to="fail"/>    
        </action>   
          <kill name="fail">  
           <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>  
        </kill>  
       <end name="end"/>  
    </workflow-app> 

    也可以设置多个任务并发执行,需要添加fork和join节点,fork节点把任务切分成多个并行任务,join则合并多个并行任务。fork和join节点必须是成对出现的。join节点合并的任务,必须是通一个fork出来的子任务才行。示例如下:

    <workflow-app  name="java-example1" xmlns="uri:oozie:workflow:0.5">  
        <start to="forking"/> 
        <fork name="forking">
            <path start="firstparalleljob"/>
            <path start="secondparalleljob"/>
        </fork>    
        <action name="firstparalleljob">
                .....
            <ok to="joining"/>
            <error to="fail"/>    
        </action> 
        <action name="secondparalleljob">
                ....
            <ok to="joining"/>
            <error to="fail"/>    
        </action>   
        <join name="joining" to="end"/>
          <kill name="fail">  
           <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>  
        </kill>  
       <end name="end"/>  
    </workflow-app> 

    五.<workflow-app name="java-example1" xmlns="uri:oozie:workflow:0.5"> 中 Oozie Schema Version 的问题

    如果设置workflow:0.2,那么就不会显示wofkflow的Graph视图,设置为0.5便可显示。如下图所示:

  • 相关阅读:
    (三) 权限控制 --《springboot与shiro整合》
    (二) shiro集成 --《springboot与shiro整合》
    (一) springboot 项目搭建 --《springboot与shiro整合》
    第四章 编码/加密(学习笔记)
    第三章 授权(学习笔记)
    第二章 身份验证(学习笔记)
    获取小程序码java实现
    paypal退款 java实现
    并发下的数据处理和优化
    Casperjs初体验
  • 原文地址:https://www.cnblogs.com/jjSmileEveryDay/p/7472393.html
Copyright © 2011-2022 走看看