Overview
The Spark action runs a Spark job. The workflow job waits for the Spark job to complete before continuing to the next action.
To run a Spark job, the spark action must be configured with the resource-manager, name-node, and Spark master XML elements, along with the other required elements. Command-line arguments and optional Spark configuration are set in the spark-opts element.
A spark action can be configured to delete or create HDFS directories before the Spark job starts.
Oozie EL expressions may be used in the inline configuration. Property values specified in the configuration element override values specified in the job-xml file.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:1.0">
    ...
    <action name="[NODE-NAME]">
        <spark xmlns="uri:oozie:spark-action:1.0">
            <resource-manager>[RESOURCE-MANAGER]</resource-manager>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
                <delete path="[PATH]"/>
                ...
                <mkdir path="[PATH]"/>
                ...
            </prepare>
            <job-xml>[SPARK SETTINGS FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <master>[SPARK MASTER URL]</master>
            <mode>[SPARK MODE]</mode>
            <name>[SPARK JOB NAME]</name>
            <class>[SPARK MAIN CLASS]</class>
            <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
            <spark-opts>[SPARK-OPTIONS]</spark-opts>
            <arg>[ARG-VALUE]</arg>
            ...
            <arg>[ARG-VALUE]</arg>
            ...
        </spark>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
-
prepare element
Deletes or creates HDFS paths before the Spark job starts, for example:
<delete path="hdfs://xxxx/a"/>
<mkdir path="hdfs://xxxx"/>
-
class element
The main class of the Spark application.
-
spark-opts element
Optional Spark options and configuration (for example --conf settings) passed to the Spark job.
Hands-on (oozie-5.0.0-cdh6.0.1)
1. Prepare the job.properties and workflow.xml files
-
job.properties
# NN Yarn
nameNode=hdfs://NN:8020
resourceManager=node1:8032
# spark
master=yarn
deployMode=cluster
queueName=root.default
appName=Spark-Wordcount
class=com.sls.sonar.tmp.ColumnValueCount
jar=${nameNode}/tmp/spark2/statistics-1.0-SNAPSHOT-jar-with-dependencies.jar
sparkOpts=--queue root.thequeue --conf spark.yarn.historyServer.address=http://node03.localdomain:18088 --conf spark.eventLog.dir=${nameNode}/user/spark/applicationHistory --conf spark.eventLog.enabled=true
commandLineArg1=namespace:tableName
# oozie
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/tmp/spark/workflow.xml
-
workflow.xml
<workflow-app xmlns="uri:oozie:workflow:1.0" name="SparkWordCount">
    <start to="spark-node"/>
    <action name="spark-node">
        <spark xmlns="uri:oozie:spark-action:1.0">
            <resource-manager>${resourceManager}</resource-manager>
            <name-node>${nameNode}</name-node>
            <master>${master}</master>
            <mode>${deployMode}</mode>
            <name>${appName}</name>
            <class>${class}</class>
            <jar>${jar}</jar>
            <spark-opts>${sparkOpts}</spark-opts>
            <arg>${commandLineArg1}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
2. Upload workflow.xml and the application jar to the HDFS paths referenced in job.properties
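A minimal sketch of this step, assuming the files were prepared locally under /home/datauser/oozie_example (the local paths are assumptions); the HDFS targets match oozie.wf.application.path and the jar path defined in job.properties above:
# create the target directories, then upload the workflow definition and the application jar
hdfs dfs -mkdir -p /tmp/spark /tmp/spark2
hdfs dfs -put -f /home/datauser/oozie_example/workflow.xml /tmp/spark/workflow.xml
hdfs dfs -put -f /home/datauser/oozie_example/statistics-1.0-SNAPSHOT-jar-with-dependencies.jar /tmp/spark2/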
3. Submit the workflow job
oozie job --oozie http://localhost:11000/oozie/ -config /home/datauser/oozie_example/job.properties -run
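To check on the job after submission (a usage sketch; <job-id> stands for the id printed by the -run command):
oozie job -oozie http://localhost:11000/oozie/ -info <job-id>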
The workflow can also be built in the Oozie Editor UI:
-
Open the Oozie Editor
Query => Scheduler => Workflow
-
Action icon bar
At the front of the icon bar select ACTIONS (the default is DOCUMENTS).
-
Drag the icon of the action you want to schedule to the desired position.
-
Configure the parameters of that action.
Spark on Yarn
To make Spark jobs queryable in the Spark History Server, specify the following three parameters, either with --conf in the spark-opts element or via oozie.service.SparkConfigurationService.spark.configurations in oozie-site.xml:
-
spark.yarn.historyServer.address=http://spark-host:18088
-
spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
-
spark.eventLog.enabled=true
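For example, passed through the spark-opts element (a sketch mirroring the sparkOpts value used in job.properties above; replace the hostnames and ports with your own):
<spark-opts>--conf spark.yarn.historyServer.address=http://node03.localdomain:18088 --conf spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory --conf spark.eventLog.enabled=true</spark-opts>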
Spark action XML schema (spark-action 1.0):
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:spark="uri:oozie:spark-action:1.0" elementFormDefault="qualified"
targetNamespace="uri:oozie:spark-action:1.0">
<xs:include schemaLocation="oozie-common-1.0.xsd"/>
<xs:element name="spark" type="spark:ACTION"/>
<xs:complexType name="ACTION">
<xs:sequence>
<xs:choice>
<xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="resource-manager" type="xs:string" minOccurs="0" maxOccurs="1"/>
</xs:choice>
<xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
<xs:element name="launcher" type="spark:LAUNCHER" minOccurs="0" maxOccurs="1"/>
<xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
<xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
</xs:schema>