摘要:本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同。

本文分享自华为云社区《【Spark】如何在Spark Scala/Java应用中调用Python脚本》,作者: 小兔子615 。

1.PythonRunner

对于运行与 JVM 上的程序(即Scala、Java程序),Spark 提供了 PythonRunner 类。只需要调用PythonRunner 的main方法,就可以在Scala或Java程序中调用Python脚本。在实现上,PythonRunner 基于py4j ,通过构造GatewayServer实例让python程序通过本地网络socket来与JVM通信。

// Launch a Py4J gateway server for the process to connect to; this will let it see our// Java system properties and suchval localhost = InetAddress.getLoopbackAddress()val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder().authToken(secret).javaPort(0).javaAddress(localhost).callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret).build()val thread = new Thread(new Runnable() {override def run(): Unit = Utils.logUncaughtExceptions {gatewayServer.start()}})thread.setName("py4j-gateway-init")thread.setDaemon(true)thread.start()// Wait until the gateway server has started, so that we know which port is it bound to.// `gatewayServer.start()` will start a new thread and run the server code there, after// initializing the socket, so the thread started above will end as soon as the server is// ready to serve connections.thread.join()

在启动GatewayServer后,再通过ProcessBuilder构造子进程执行Python脚本,等待Python脚本执行完成后,根据exitCode判断是否执行成功,若执行失败则抛出异常,最后关闭gatewayServer。

    // Launch Python processval builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)try {val process = builder.start()new RedirectThread(process.getInputStream, System.out, "redirect output").start()val exitCode = process.waitFor()if (exitCode != 0) {throw new SparkUserAppException(exitCode)}} finally {gatewayServer.shutdown()}

2.调用方法

2.1 调用代码

PythonRunner的main方法中需要传入三个参数:

  • pythonFile:执行的python脚本
  • pyFiles:需要添加到PYTHONPATH的其他python脚本
  • otherArgs:传入python脚本的参数数组
    val pythonFile = args(0)val pyFiles = args(1)val otherArgs = args.slice(2, args.length)

具体样例代码如下,scala样例代码:

package com.huawei.bigdata.spark.examplesimport org.apache.spark.deploy.PythonRunner
import org.apache.spark.sql.SparkSessionobject RunPythonExample {def main(args: Array[String]) {val pyFilePath = args(0)val pyFiles = args(1)val spark = SparkSession.builder().appName("RunPythonExample").getOrCreate()runPython(pyFilePath, pyFiles)spark.stop()}def runPython(pyFilePath: String, pyFiles :String) : Unit = {val inputPath = "-i /input"val outputPath = "-o /output"PythonRunner.main(Array(pyFilePath, pyFiles, inputPath, outputPath))}
}

python样例代码:

#!/usr/bin/env python
# coding: utf-8
import sys
import argparseargparser = argparse.ArgumentParser(description="ParserMainEntrance")
argparser.add_argument('--input', '-i', help="input path", default=list(), required=True)
argparser.add_argument('--output', '-o', help="output path", default=list(), required=True)
arglist = argparser.parse_args()def getTargetPath(input_path, output_path):try:print("input path: {}".format(input_path))print("output path: {}".format(output_path))return Trueexcept Exception as ex:print("error with: {}".format(ex))return Falseif __name__ == "__main__":ret = getTargetPath(arglist.input, arglist.output)if ret:sys.exit(0)else:sys.exit(1)

2.2 运行命令

执行python脚本需要设置pythonExec,即执行python脚本所使用的执行环境。默认情况下,使用的执行器为python(Spark 2.4 及以下)或 python3 (Spark 3.0 及以上)。

    //Spark 2.4.5val sparkConf = new SparkConf()val secret = Utils.createSecret(sparkConf)val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON).orElse(sparkConf.get(PYSPARK_PYTHON)).orElse(sys.env.get("PYSPARK_DRIVER_PYTHON")).orElse(sys.env.get("PYSPARK_PYTHON")).getOrElse("python")//Spark 3.1.1val sparkConf = new SparkConf()val secret = Utils.createSecret(sparkConf)val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON).orElse(sparkConf.get(PYSPARK_PYTHON)).orElse(sys.env.get("PYSPARK_DRIVER_PYTHON")).orElse(sys.env.get("PYSPARK_PYTHON")).getOrElse("python3")

如果要手动指定pythonExec,需要在执行前设置环境变量(无法通过spark-defaults传入)。在cluster模式下,可以通过 --conf “spark.executorEnv.PYSPARK_PYTHON=python3” --conf “spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3” 设置。driver端还可以通过export PYSPARK_PYTHON=python3 设置环境变量。
​ 若需要上传pyhton包,可以通过 --archive python.tar.gz 的方式上传。

为了使应用能够获取到py脚本文件,还需要在启动命令中添加 --file pythonFile.py 将python脚本上传到 yarn 上。

​ 运行命令参考如下:

spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3" /usr/local/test.jar test.py test.py

如果需要使用其他python环境,而非节点上已安装的,可以通过 --archives 上传python压缩包,再通过环境变量指定pythonExec,例如:

spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --archives /usr/local/python.tar.gz#myPython --conf "spark.executorEnv.PYSPARK_PYTHON=myPython/bin/python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=myPython/bin/python3" /usr/local/test.jar test.py test.py

点击关注,第一时间了解华为云新鲜技术~

在Spark Scala/Java应用中调用Python脚本,会么?相关推荐

  1. 教你如何在Spark Scala/Java应用中调用Python脚本

    摘要:本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同. 本文分享自华为云社区<[Spark]如何在Spark Scala/ ...

  2. Mac笔记本中是用Idea开发工具在Java项目中调用python脚本遇到的环境变量问题解决...

    问题描述: mac笔记本本身会自带几个python版本,比如python2.7版本,我没有改动mac默认的python版本,只是安装了python3.7版本. 使用Pycharm开发Python项目没 ...

  3. java执行python脚本_使用Runtime.getRuntime().exec()在java中调用python脚本

    举例有一个Python脚本叫test.py,现在想要在Java里调用这个脚本.假定这个test.py里面使用了拓展的包,使得pythoninterpreter之类内嵌的编译器无法使用,那么只能采用ja ...

  4. 【Python】如何在Excel中调用Python脚本,实现数据自动化处理

    这次我们会介绍如何使用xlwings将Python和Excel两大数据工具进行集成,更便捷地处理日常工作. 说起Excel,那绝对是数据处理领域王者般的存在,尽管已经诞生三十多年了,现在全球仍有7.5 ...

  5. 如何在Excel中调用Python脚本,实现数据自动化处理!

    大家好, 这次我们会介绍如何使用xlwings将Python和Excel两大数据工具进行集成,更便捷地处理日常工作. 说起Excel,那绝对是数据处理领域王者般的存在,尽管已经诞生三十多年了,现在全球 ...

  6. 如何在Excel中调用Python脚本,实现数据自动化处理

    这次我们会介绍如何使用xlwings将Python和Excel两大数据工具进行集成,更便捷地处理日常工作. 说起Excel,那绝对是数据处理领域王者般的存在,尽管已经诞生三十多年了,现在全球仍有7.5 ...

  7. 在Excel中调用Python脚本,实现数据自动化处理

    说起Excel,那绝对是数据处理领域王者般的存在,尽管已经诞生三十多年了,现在全球仍有7.5亿忠实用户,而作为网红语言的Python,也仅仅只有700万的开发人员. Excel是全世界最流行的编程语言 ...

  8. c++中调用python脚本提示 error LNK2001: 无法解析的外部符号 __imp_Py_Initialize等错误的解决方法

    c++中调用python脚本提示 error LNK2001: 无法解析的外部符号 __imp_Py_Initialize等错误的解决方法 时间:2017-05-09 12:32:06阅读:234评论 ...

  9. excel调用python编程-如何在excel中调用python脚本

    如何在excel中调用python脚本 发布时间:2020-07-03 14:15:28 来源:亿速云 阅读:155 如何在excel中调用python脚本?针对这个问题,这篇文章详细介绍了相对应的分 ...

最新文章

  1. 做技术到底可以做到哪种地步-技术为什么越走越窄 (转)
  2. 中科院自动化所招AI算法实习生!
  3. CLEARTEXT communication to xxx not permitted by network security policy
  4. iOS逆向之深入解析如何使用Theos开发插件
  5. 如何给Wordpress安装插件
  6. Python下基于requests及BeautifulSoup构建网络爬虫
  7. 数据库系统原理选择题
  8. 多生产者_【并发那些事】生产者消费者问题
  9. verilog中的initial块、always块详细解释
  10. Ubuntu16.04安装卸载MongoDB
  11. centos7 firewalld
  12. 如何正确获取安卓外置SD卡的路径
  13. Netty5+Jboss(Marshalling)完成对象序列化传输
  14. Matlab imfilter函数
  15. 理解Mybatis一级缓存,以及如何真正使用到一级缓存
  16. 英文文献检索网站(转)
  17. try {}里有一个return语句,那么紧跟在这个try后的finally {}里的code会不会被执行,什么时候被执行,在return前还是后
  18. [软件工程] 面向对象设计
  19. UUID太长怎么办?快来试试NanoId(Java版本)
  20. [jvm-sandbox] 多个agent并用

热门文章

  1. 1:1 人脸比对 开源_在开源周宣布青年:1月13日至17日
  2. es6 实例:Web 服务的客户端
  3. gitlab nginx php解析,GitLab-webhook-PHP 详解 GitLab Webhooks 自动部署应用服务器
  4. Python笔记(6) 数字
  5. python repair修复功能_详解Python修复遥感影像条带的两种方式
  6. c语言二分法查找一个数_算法竞赛小专题系列(1):二分法、三分法
  7. lisp读取天正轴号_第2天:Python 基础语法
  8. cefSharp 开发随笔
  9. [ubuntu]deb软件源
  10. Color Cube – 国产的优秀配色取色工具