zoukankan      html  css  js  c++  java
  • Oozie分布式任务的工作流——Spark篇

    Spark是现在应用最广泛的分布式计算框架,oozie支持在它的调度中执行spark。在我的日常工作中,一部分工作就是基于oozie维护好每天的spark离线任务,合理的设计工作流并分配适合的参数对于spark的稳定运行十分重要。

    Spark Action

    这个Action允许执行spark任务,需要用户指定job-tracker以及name-node。先看看语法规则:

    语法规则

    <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
        ...
        <action name="[NODE-NAME]">
            <spark xmlns="uri:oozie:spark-action:0.1">
                <job-tracker>[JOB-TRACKER]</job-tracker>
                <name-node>[NAME-NODE]</name-node>
                <prepare>
                   <delete path="[PATH]"/>
                   ...
                   <mkdir path="[PATH]"/>
                   ...
                </prepare>
                <job-xml>[SPARK SETTINGS FILE]</job-xml>
                <configuration>
                    <property>
                        <name>[PROPERTY-NAME]</name>
                        <value>[PROPERTY-VALUE]</value>
                    </property>
                    ...
                </configuration>
                <master>[SPARK MASTER URL]</master>
                <mode>[SPARK MODE]</mode>
                <name>[SPARK JOB NAME]</name>
                <class>[SPARK MAIN CLASS]</class>
                <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
                <spark-opts>[SPARK-OPTIONS]</spark-opts>
                <arg>[ARG-VALUE]</arg>
                    ...
                <arg>[ARG-VALUE]</arg>
                ...
            </spark>
            <ok to="[NODE-NAME]"/>
            <error to="[NODE-NAME]"/>
        </action>
        ...
    </workflow-app>
    

    prepare元素

    它里面可以执行删除文件或者创建目录的操作,比如

    <delete path="hdfs://xxxx/a"/>
    <mkdir path="hdfs://xxxx"/>
    

    一般来说,离线的spark任务最重都会生成一些数据,这些数据可能存储到数据库中,也可能直接存储到hdfs,如果存储到hdfs就涉及到清除目录了。比如你可能在测试环境需要频繁的重复运行spark任务,那么每次都需要清除目录文件,创建新的目录才行。

    job-xml

    spark 任务的参数也可以放在job-xml所在的xml中。

    confugration

    这里面的配置的参数将会传递给spark任务。

    master

    spark运行的模式,表示spark连接的集群管理器。默认可以使spark的独立集群(spark://host:port)或者是mesos(mesos://host:port)或者是yarn(yarn),以及本地模式local

    mode

    因为spark任务也可以看做主节点和工作节点模式,主节点就是驱动程序。这个驱动程序既可以运行在提交任务的机器,也可以放在集群中运行。

    这个参数就是用来设置,驱动程序是以客户端的形式运行在本地机器,还是以集群模式运行在集群中。

    name

    spark应用的名字

    class

    spark应用的主函数

    jar

    spark应用的jar包

    spark-opts

    提交给驱动程序的参数。比如--conf key=value或者是在oozie-site.xml中配置的oozie.service.SparkConfiguationService.spark.configurations

    arg

    这个参数是用来提交给spark应用的参数

    例子

    官网给出的例子:

    <workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
        ...
        <action name="myfirstsparkjob">
            <spark xmlns="uri:oozie:spark-action:0.1">
                <job-tracker>foo:8021</job-tracker>
                <name-node>bar:8020</name-node>
                <prepare>
                    <delete path="${jobOutput}"/>
                </prepare>
                <configuration>
                    <property>
                        <name>mapred.compress.map.output</name>
                        <value>true</value>
                    </property>
                </configuration>
                <master>local[*]</master>
                <mode>client<mode>
                <name>Spark Example</name>
                <class>org.apache.spark.examples.mllib.JavaALS</class>
                <jar>/lib/spark-examples_2.10-1.1.0.jar</jar>
                <spark-opts>--executor-memory 20G --num-executors 50</spark-opts>
                <arg>inputpath=hdfs://localhost/input/file.txt</arg>
                <arg>value=2</arg>
            </spark>
            <ok to="myotherjob"/>
            <error to="errorcleanup"/>
        </action>
        ...
    </workflow-app>
    

    我自己工作时的例子:

    <action name="NODE1">
    	<spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>NODE1</name>
            <class>com.test.NODE1_Task</class>
            <jar>/lib/dw.jar</jar>
            <spark-opts>--executor-memory 1G --num-executors 6 --executor-cores 1 --conf spark.storage.memoryFraction=0.8</spark-opts>
            <arg>参数1</arg>
            <arg>参数2</arg>
            <arg>参数3</arg>
        </spark>
    </action>
    

    日志

    spark action日志会重定向到oozie的mapr启动程序的stdout/stderr中。

    通过oozie的web控制条,可以看到spark的日志。

    spark on yarn

    如果想要把spark运行在yarn上,需要按照下面的步骤执行:

    • 在spark action中加载spark-assembly包
    • 指定master为yarn-client或者yarn-master

    为了确保spark工作在spark历史服务器中可以查到,需要保证在--conf中或者oozie.service.SparkConfiturationService.spark.configrations中设置下面的三个参数:

    • spark.yarn.historyServer.address=http://spark-host:18088
    • spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
    • spark.eventLog.enabled=true

    spark action的schema

    <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
               xmlns:spark="uri:oozie:spark-action:0.1" elementFormDefault="qualified"
               targetNamespace="uri:oozie:spark-action:0.1">    <xs:element name="spark" type="spark:ACTION"/>
        <xs:complexType name="ACTION">
            <xs:sequence>
                <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
                <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
                <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
                <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
                <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
                <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
                <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
                <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
                <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
                <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
                <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
                <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            </xs:sequence>
        </xs:complexType>
        <xs:complexType name="CONFIGURATION">
            <xs:sequence>
                <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                            <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
                            <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
            </xs:sequence>
        </xs:complexType>
        <xs:complexType name="PREPARE">
            <xs:sequence>
                <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
                <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
            </xs:sequence>
        </xs:complexType>
        <xs:complexType name="DELETE">
            <xs:attribute name="path" type="xs:string" use="required"/>
        </xs:complexType>
        <xs:complexType name="MKDIR">
            <xs:attribute name="path" type="xs:string" use="required"/>
        </xs:complexType>
    </xs:schema>
    
  • 相关阅读:
    VS2008 环境中完美搭建 Qt 4.7.4 静态编译的调试与发布 Inchroy's Blog 博客频道 CSDN.NET
    编写可丢弃的代码
    c++ using namespace std; 海明威 博客园
    解决MySQL server has gone away
    nginx upstream 调度策略
    (2006, 'MySQL server has gone away') 错误解决 dba007的空间 51CTO技术博客
    Linux IO模型漫谈(2) 轩脉刃 博客园
    redis源码笔记 initServer 刘浩de技术博客 博客园
    MySQLdb批量插入数据
    词库的扩充百度百科的抓取你知道这些热词吗? rabbit9898 ITeye技术网站
  • 原文地址:https://www.cnblogs.com/xing901022/p/6216456.html
Copyright © 2011-2022 走看看