zoukankan      html  css  js  c++  java
  • 教你如何在Spark Scala/Java应用中调用Python脚本

    摘要:本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同。

    本文分享自华为云社区《【Spark】如何在Spark Scala/Java应用中调用Python脚本》,作者: 小兔子615 。

    1.PythonRunner

    对于运行与 JVM 上的程序(即Scala、Java程序),Spark 提供了 PythonRunner 类。只需要调用PythonRunner 的main方法,就可以在Scala或Java程序中调用Python脚本。在实现上,PythonRunner 基于py4j ,通过构造GatewayServer实例让python程序通过本地网络socket来与JVM通信。

    // Launch a Py4J gateway server for the process to connect to; this will let it see our
        // Java system properties and such
        val localhost = InetAddress.getLoopbackAddress()
        val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder()
          .authToken(secret)
          .javaPort(0)
          .javaAddress(localhost)
          .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
          .build()
        val thread = new Thread(new Runnable() {
          override def run(): Unit = Utils.logUncaughtExceptions {
            gatewayServer.start()
          }
        })
        thread.setName("py4j-gateway-init")
        thread.setDaemon(true)
        thread.start()
     
        // Wait until the gateway server has started, so that we know which port is it bound to.
        // `gatewayServer.start()` will start a new thread and run the server code there, after
        // initializing the socket, so the thread started above will end as soon as the server is
        // ready to serve connections.
        thread.join()

    在启动GatewayServer后,再通过ProcessBuilder构造子进程执行Python脚本,等待Python脚本执行完成后,根据exitCode判断是否执行成功,若执行失败则抛出异常,最后关闭gatewayServer。

     // Launch Python process
        val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
        try {
          val process = builder.start()
     
          new RedirectThread(process.getInputStream, System.out, "redirect output").start()
     
          val exitCode = process.waitFor()
          if (exitCode != 0) {
            throw new SparkUserAppException(exitCode)
          }
        } finally {
          gatewayServer.shutdown()
        }

    2.调用方法

    2.1 调用代码

    PythonRunner的main方法中需要传入三个参数:

    • pythonFile:执行的python脚本
    • pyFiles:需要添加到PYTHONPATH的其他python脚本
    • otherArgs:传入python脚本的参数数组
    val pythonFile = args(0)
        val pyFiles = args(1)
        val otherArgs = args.slice(2, args.length)

    具体样例代码如下,scala样例代码:

    package com.huawei.bigdata.spark.examples
     
    import org.apache.spark.deploy.PythonRunner
    import org.apache.spark.sql.SparkSession
     
    object RunPythonExample {
      def main(args: Array[String]) {
        val pyFilePath = args(0)
        val pyFiles = args(1)
        val spark = SparkSession
          .builder()
          .appName("RunPythonExample")
          .getOrCreate()
     
        runPython(pyFilePath, pyFiles)
     
        spark.stop()
      }
     
      def runPython(pyFilePath: String, pyFiles :String) : Unit = {
        val inputPath = "-i /input"
        val outputPath = "-o /output"
        PythonRunner.main(Array(pyFilePath, pyFiles, inputPath, outputPath))
      }
    }

    python样例代码:

    #!/usr/bin/env python
    # coding: utf-8
    import sys
    import argparse
     
    argparser = argparse.ArgumentParser(description="ParserMainEntrance")
    argparser.add_argument('--input', '-i', help="input path", default=list(), required=True)
    argparser.add_argument('--output', '-o', help="output path", default=list(), required=True)
    arglist = argparser.parse_args()
     
    def getTargetPath(input_path, output_path):
        try:
            print("input path: {}".format(input_path))
            print("output path: {}".format(output_path))
            return True
        except Exception as ex:
            print("error with: {}".format(ex))
            return False
     
    if __name__ == "__main__":
        ret = getTargetPath(arglist.input, arglist.output)
        if ret:
            sys.exit(0)
        else:
            sys.exit(1)

    2.2 运行命令

    ​ 执行python脚本需要设置pythonExec,即执行python脚本所使用的执行环境。默认情况下,使用的执行器为python(Spark 2.4 及以下)或 python3 (Spark 3.0 及以上)。

    //Spark 2.4.5
        val sparkConf = new SparkConf()
        val secret = Utils.createSecret(sparkConf)
        val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
          .orElse(sparkConf.get(PYSPARK_PYTHON))
          .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
          .orElse(sys.env.get("PYSPARK_PYTHON"))
          .getOrElse("python")
     
        //Spark 3.1.1
        val sparkConf = new SparkConf()
        val secret = Utils.createSecret(sparkConf)
        val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
          .orElse(sparkConf.get(PYSPARK_PYTHON))
          .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
          .orElse(sys.env.get("PYSPARK_PYTHON"))
          .getOrElse("python3")

    如果要手动指定pythonExec,需要在执行前设置环境变量(无法通过spark-defaults传入)。在cluster模式下,可以通过 --conf “spark.executorEnv.PYSPARK_PYTHON=python3” --conf “spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3” 设置。driver端还可以通过export PYSPARK_PYTHON=python3 设置环境变量。
    ​ 若需要上传pyhton包,可以通过 --archive python.tar.gz 的方式上传。

    ​ 为了使应用能够获取到py脚本文件,还需要在启动命令中添加 --file pythonFile.py 将python脚本上传到 yarn 上。

    ​ 运行命令参考如下:

    spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3" /usr/local/test.jar test.py test.py
    如果需要使用其他python环境,而非节点上已安装的,可以通过 --archives 上传python压缩包,再通过环境变量指定pythonExec,例如:
    spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExam
     

    点击关注,第一时间了解华为云新鲜技术~

  • 相关阅读:
    Oracle 按一行里某个字段里的值分割成多行进行展示
    Property or method "openPageOffice" is not defined on the instance but referenced during render. Make sure that this property is reactive, either in the data option, or for class-based components, by
    SpringBoot 项目启动 Failed to convert value of type 'java.lang.String' to required type 'cn.com.goldenwater.dcproj.dao.TacPageOfficePblmListDao';
    Maven 设置阿里镜像
    JS 日期格式化,留作参考
    JS 过滤数组里对象的某个属性
    原生JS实现简单富文本编辑器2
    Chrome控制台使用详解
    android权限(permission)大全
    不借助第三方网站四步实现手机网站转安卓APP
  • 原文地址:https://www.cnblogs.com/huaweiyun/p/15524796.html
Copyright © 2011-2022 走看看