zoukankan      html  css  js  c++  java
  • Oozie分布式工作流——从理论和实践分析使用节点间的参数传递

    Oozie支持Java Action,因此可以自定义很多的功能。本篇就从理论和实践两方面介绍下Java Action的妙用,另外还涉及到oozie中action之间的参数传递。

    本文大致分为以下几个部分:

    • Java Action教程文档
    • 自定义Java Action实践
    • 从源码的角度讲解Java Action与Shell Action的参数传递。

    如果你即将或者想要使用oozie,那么本篇的文章将会为你提供很多参考的价值。

    Java Action文档

    java action会自动执行提供的java classpublic static void main方法, 并且会在hadoop集群启动一个单独的map-reduce的map任务来执行的。因此,如果你自定义了一个java程序,它会提交到集群的某一个节点执行,不会每个节点都执行一遍。

    workflow任务会等待java程序执行完继续执行下一个action。当java类正确执行退出后,将会进入ok控制流;当发生异常时,将会进入error控制流。Java程序绝对不能使用System.exit(int n)将会导致action进入error控制流。

    在action的配置中,也支持EL表达式。并且使用<capture-output>也可以把数据输出出来,然后后面的action就可以基于EL表达式使用了。

    语法规则

    <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
        ...
        <action name="[NODE-NAME]">
            <java>
                <job-tracker>[JOB-TRACKER]</job-tracker>
                <name-node>[NAME-NODE]</name-node>
                <prepare>
                   <delete path="[PATH]"/>
                   ...
                   <mkdir path="[PATH]"/>
                   ...
                </prepare>
                <job-xml>[JOB-XML]</job-xml>
                <configuration>
                    <property>
                        <name>[PROPERTY-NAME]</name>
                        <value>[PROPERTY-VALUE]</value>
                    </property>
                    ...
                </configuration>
                <main-class>[MAIN-CLASS]</main-class>
    			<java-opts>[JAVA-STARTUP-OPTS]</java-opts>
    			<arg>ARGUMENT</arg>
                ...
                <file>[FILE-PATH]</file>
                ...
                <archive>[FILE-PATH]</archive>
                ...
                <capture-output />
            </java>
            <ok to="[NODE-NAME]"/>
            <error to="[NODE-NAME]"/>
        </action>
        ...
    </workflow-app>
    

    prepare元素,支持创建或者删除指定的文件内容。在delete时,支持通配的方式指定特定的路径。java-opts以及java-opt参数提供了执行java应用时分配的JVM。

    举个例子:

    <workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
        ...
        <action name="myfirstjavajob">
            <java>
                <job-tracker>foo:8021</job-tracker>
                <name-node>bar:8020</name-node>
                <prepare>
                    <delete path="${jobOutput}"/>
                </prepare>
                <configuration>
                    <property>
                        <name>mapred.queue.name</name>
                        <value>default</value>
                    </property>
                </configuration>
                <main-class>org.apache.oozie.MyFirstMainClass</main-class>
                <java-opts>-Dblah</java-opts>
    			<arg>argument1</arg>
    			<arg>argument2</arg>
            </java>
            <ok to="myotherjob"/>
            <error to="errorcleanup"/>
        </action>
        ...
    </workflow-app>
    

    覆盖Main方法

    oozie中的很多action都支持这个功能,在configure中指定classpath下的一个类方法,它会覆盖当前action的main方法。这在不想重新编译jar包,而想替换程序时,非常有用。

    自定义Java action程序以及部署

    Java程序可以任意定义,比如写一个最简单的hellword,然后打包成lib。

    然后需要定义oozie脚本:

    <action name="java-7cbb">
        <java>
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.job.queue.name</name>
                        <value>default</value>
                    </property>
                </configuration>
                <main-class>a.b.c.Main</main-class>
                <arg>arg1</arg>
                <arg>arg2</arg>
                <file>/oozie/lib/ojdbc7.jar#ojdbc7.jar</file>
                <capture-output/>
            </java>
            <ok to="end"/>
            <error to="Kill"/>
        </action>
    

    其中几个比较重要的属性,千万不能拉下:

    • 1 需要指定Map-reduce的队列:mapred.job.queue.name
    • 2 指定Main class<main-class>
    • 3 如果依赖其他的jar,需要添加<file>
    • 4 如果想要捕获输出,需要设置<capture-output>

    如果使用HUE图形化配置,就比较简单了:

    点击右上角的齿轮,配置其他的属性信息:

    基于源码分析参数传递

    先从表象来说一下shell action如何传递参数:

    你只需要定义一个普通的shell,在里面使用echo把属性输出出来即可,后面的action自动就可以基于EL表达式使用。

    test='test123'
    echo "test=$test"
    

    这样后面的action就可以直接使用了:

    ${wf:actionData('action-name').test}或者${wf:actionData('action-name')['test']}
    

    很简单是吧!

    在Java里面就没这么容易了:

    无论是 System.out.println() 还是 logger.info/error,都无法捕获到数据
    

    上网找了一篇文章,备受启发

    从中抄了一段代码:

    private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
    ...
    String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
            if (oozieProp != null) {
                File propFile = new File(oozieProp);
                Properties props = new Properties();
                props.setProperty(propKey0, propVal0);
                props.setProperty(propKey1, propVal1);
                OutputStream os = new FileOutputStream(propFile);
                props.store(os, "");
                os.close();
            } else
                throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES + " System property not defined");
    

    果然就好用了....

    为了理解其中的缘由,我们来看看代码。首先在shell action中发现一句话:

    <<< Invocation of Main class completed <<<
    
    
    Oozie Launcher, capturing output data:
    =======================
    

    于是全局搜索,果然找到对应的代码,在org.apache.oozie.action.hadoop.LuancherMapper.java中,line275开始:

    if (errorMessage == null) {
        handleActionData();
        if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
            System.out.println();
            System.out.println("Oozie Launcher, capturing output data:");
            System.out.println("=======================");
            System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));
            System.out.println();
            System.out.println("=======================");
            System.out.println();
        }
        。。。
    }
    

    这里的actionData其实就是个普通的MAP

    private Map<String,String> actionData;
    
    public LauncherMapper() {
        actionData = new HashMap<String,String>();
    }
    

    Map里面保存了很多属性值,其中就包括我们想要捕获的输出内容:

        static final String ACTION_PREFIX = "oozie.action.";
        static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";
    
        ...
        String outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS);
        if (outputProp != null) {
            File actionOutputData = new File(outputProp);
            if (actionOutputData.exists()) {
                int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
                actionData.put(ACTION_DATA_OUTPUT_PROPS,
                    getLocalFileContentStr(actionOutputData, "Output", maxOutputData));
            }
        }
        ....
        
        public static String getLocalFileContentStr(File file, String type, int maxLen) throws LauncherException, IOException {
            StringBuffer sb = new StringBuffer();
            FileReader reader = new FileReader(file);
            char[] buffer = new char[2048];
            int read;
            int count = 0;
            while ((read = reader.read(buffer)) > -1) {
                count += read;
                if (maxLen > -1 && count > maxLen) {
                    throw new LauncherException(type + " data exceeds its limit ["+ maxLen + "]");
                }
                sb.append(buffer, 0, read);
            }
            reader.close();
            return sb.toString();
        }
    

    可以看到其实就是从oozie.action.output.properties指定的目录里面去读内容,然后输出出来,后面的action就可以用了。这就是为什么上面抄的那段代码可以使用的原因。

    那么问题是,shell为什么直接echo就行,java里面却要这么费劲?

    别急,先来看看java action的启动逻辑:

        public static void main(String[] args) throws Exception {
            run(JavaMain.class, args);
        }
    
        @Override
        protected void run(String[] args) throws Exception {
            ...
            Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
            ...
            Method mainMethod = klass.getMethod("main", String[].class);
            try {
                mainMethod.invoke(null, (Object) args);
            } catch(InvocationTargetException ex) {
                // Get rid of the InvocationTargetException and wrap the Throwable
                throw new JavaMainException(ex.getCause());
            }
        }
    

    它什么也没做,就是启动了目标类的main方法而已。

    再来看看shell:

        private int execute(Configuration actionConf) throws Exception {
            ...
            //判断是否要捕获输出
            boolean captureOutput = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false);
    
            //执行命令
            Process p = builder.start();
    
            //处理进程
            Thread[] thrArray = handleShellOutput(p, captureOutput);
    
            ...
            return exitValue;
        }
        protected Thread[] handleShellOutput(Process p, boolean captureOutput)
                throws IOException {
            BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
            BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream()));
    
            // 捕获标准输出
            OutputWriteThread thrStdout = new OutputWriteThread(input, true, captureOutput);
            thrStdout.setDaemon(true);
            thrStdout.start();
    
            OutputWriteThread thrStderr = new OutputWriteThread(error, false, false);
            thrStderr.setDaemon(true);
            thrStderr.start();
    
            return new Thread[]{ thrStdout, thrStderr };
        }
        
        class OutputWriteThread extends Thread {
            ...
    
            @Override
            public void run() {
                String line;
                BufferedWriter os = null;
    
                //读取数据保存在目标文件中
    
                try {
                    if (needCaptured) {
                        File file = new File(System.getProperty(LauncherMapper.ACTION_PREFIX + LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
                        os = new BufferedWriter(new FileWriter(file));
                    }
                    while ((line = reader.readLine()) != null) {
                        if (isStdout) { // For stdout
                            // 1. Writing to LM STDOUT
                            System.out.println("Stdoutput " + line);
                            // 2. Writing for capture output
                            if (os != null) {
                                if (Shell.WINDOWS) {
                                    line = line.replace("\u", "\\u");
                                }
                                os.write(line);
                                os.newLine();
                            }
                        }
                        else {
                            System.err.println(line); // 1. Writing to LM STDERR
                        }
                    }
                }
                catch (IOException e) {
                    ...
                }finally {
                    ...
                }
            }
        }
    

    这样就很清晰了,shell自动帮我们把输出的内容写入了oozie.action.output.properties文件中。而在java中则需要用户自己来定义写入的过程。

    后续将会介绍一下oozie中比较高级的用法——EL表达式

  • 相关阅读:
    setsid
    dup
    信号量
    linux标准输入输出
    linux守护进程范例
    c++字符串操作
    浏览器缓存
    bfc
    苹果手机自制铃声
    vue-cli 源码解读
  • 原文地址:https://www.cnblogs.com/xing901022/p/6501448.html
Copyright © 2011-2022 走看看