pyspark运行优化

  • pyspark工作原理
  • 工作中的联想
    • 案例借鉴
  • 总结

pyspark工作原理


在Driver端,通过Py4j实现在Python中调用Java的方法,即将用户写的PySpark程序”映射”到JVM中,例如,用户在PySpark中实例化一个Python的SparkContext对象,最终会在JVM中实例化Scala的SparkContext对象;在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码,虽然里面可能包含有用户定义的Python函数或Lambda表达式,Py4j并不能实现在Java里调用Python的方法,为了能在Executor端运行用户定义的Python函数或Lambda表达式,则需要为每个Task单独启一个Python进程,通过socket通信方式将Python函数或Lambda表达式发给Python进程执行。语言层面的交互总体流程如下图所示,实线表示方法调用,虚线表示结果返回。

  1. driver: pyspark脚本和sparkContext的jvm使用py4j相互调用;
  2. 由于driver帮忙把spark算子封装好了,执行计划也生成了字节码,一般情况下不需要python进程参与;
  3. 仅当需要运行UDF(含lambda表达式形式)时,将它委托给python进程处理,此时JVM和python进程使用socket通信。

工作中的联想

pyspark可以把很多常见的运算封装到JVM中,但是显然对于很多定制化工作,需要写好代码封装到JVM中,实现UDF的调用,加速数据的处理工作。

案例借鉴

首先我们需要用scala重写一下UDF:

object UdfUtils extends java.io.Serializable {case class Idfa(idfa: String, idfv: String) {private def coalesce(V: String, defV: String) =if (V == null) defV else Voverride def toString: String = coalesce(idfa, "-1") + "#" + coalesce(idfv, "-1")}def str2idfa(txt: String): Option[String] = {try {val decodeTxt: Array[Byte] = Base64.getDecoder.decode(txt)// TODO 省略一些处理逻辑val str = "after_some_time"val gson = new Gson()val reader = new JsonReader(new StringReader(str))reader.setLenient(true)val idfaType: Type = new TypeToken[Idfa]() {}.getTypeSome(gson.fromJson(reader, idfaType).toString)}catch {case e: Throwable =>println(txt)e.printStackTrace()None}}// 关键是这里把普通函数转成UDF:def str2idfaUDF: UserDefinedFunction = udf(str2idfa _)

然后在pyspark脚本里调用jar包中的UDF:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from pytoolkit import TDWSQLProvider, TDWUtil, TDWProvider
from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, LongType, StringType, StructField, IntegerType
from pyspark.sql.functions import udf, struct, array
from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
from pyspark.sql.functions import coldef str2idfa(col):_str2idfa = sc._jvm.com.tencent.kandian.utils.UdfUtils.str2idfaUDF()return Column(_str2idfa.apply(_to_seq(sc, [col], _to_java_column)))spark = SparkSession.builder.appName(app_name).getOrCreate()
sc = spark.sparkContext
if __name__ == '__main__':in_provider = TDWSQLProvider(spark, user=user, passwd=passwd, db=db_name)in_df = in_provider.table('t_dw_dcxxxx', ['p_2019042100'])  # 分区数组print(in_df.columns)in_df.createOrReplaceTempView("t1")out_t1 = in_df.select(col('uin'), str2idfa(col("value"))) # 直接使用scala的udf,节省43%时间,减少两个transformprint(out_t1.columns)print(out_t1.take(10))

其中_jvm变量是sparkContext中JVMView对象的名字,此外sc中还有_gateway变量以连接JVM中的GatawayServer。
提交时,在tesla上的配置spark-confjar包路径:

spark.driver.extraClassPath=pipe-udf-1.0-SNAPSHOT-jar-with-dependencies.jar
spark.executor.extraClassPath=pipe-udf-1.0-SNAPSHOT-jar-with-dependencies.jar

总结

  • 在pyspark中尽量使用spark算子和spark-sql,同时尽量将UDF(含lambda表达式形式)封装到一个地方减少JVM和python脚本的交互。
  • 可以把UDF部分用scala重写打包成jar包,其他部分则保持python脚本以获得不用编译随时修改的灵活性,以兼顾性能和开发效率

pyspark运行加速方法思考(一)相关推荐

  1. OpenCV resize函数源码解析——加速方法

    相信大家应该经常会用到OpenCV中的函数resize(),当我们想放大或者缩小图像的时候,会用到这个函数进行图像缩放,其中最核心的便是对图像的像素进行插值处理. 这里的插值interpolation ...

  2. 成功解决:Win系统下的Tensorflow使用CPU而不使用GPU运行加速

    成功解决:Win系统下的Tensorflow使用CPU而不使用GPU运行加速 目录 解决问题 解决思路 解决方法 解决问题 Win系统下的Tensorflow使用CPU而不使用GPU运行加速 解决思路 ...

  3. 混合精度训练、分布式训练等训练加速方法

    以Pytorch为例 混合精度训练 Pytorch自动混合精度(AMP)训练 Pytorch自动混合精度(AMP)介绍与使用 1. 理论基础 pytorch从1.6版本开始,已经内置了torch.cu ...

  4. 真正的宽带上网加速方法

    真正的宽带加速方法!!!!!!!!!!! (修正版) 我自己使用的宽带是 : 3MB ADSL/Cable Modem 理想下载速度是 375 (KB/sec) 经过破解后我到 http://www. ...

  5. java 下载加速_一种基于Java的大文件下载加速方法与流程

    本发明涉及java/多线程技术领域,涉及一种加速文件下载装置,具体提供一种基于java的大文件下载加速方法. 背景技术: 现有的常用下载方式是基于浏览器的单线程下载.这种单线程下载的方式,是通过htt ...

  6. ae怎么设置gpu渲染_AECS6 显卡GPU加速方法

    显卡 GPU 加速方法 显卡 GPU 加速在预览高清素材和加入特效的时候,可以加快预览速度,做到比较好的即 时预览.会有人问开启 GPU 为什么会加速呢?简单来说,就是显卡自带了内存,会自行去进 行运 ...

  7. 滑动窗加速方法——程明明bing算法【论文翻译】

    BING: Binarized Normed Gradients for Objectness Estimation at 300fps Ming-Ming Cheng, Ziming Zhang, ...

  8. pySpark 运行时出现 Permission denied: (权限问题)

    pySpark 运行时出现 Permission denied: (权限问题) 我们在本地测试pyspark代码,远程连接服务器上的HDFS时可能会出现权限问题. 有两种解决方法: 1. 更改DHFS ...

  9. Linux后台任务运行的方法

    Linux后台任务运行的方法 最近在远程服务器跑算法模型,远程服务器的工具我用的是MobaXterm,推荐推荐,很好用~ 言归正传,跑算法模型要好久,因为服务器性能不是很好,我跑一次要好几天,那就难免 ...

最新文章

  1. C++ OJ 中多行数据输入(大小写转换、通过移位运算实现2的n次方、多组输入,每行输入数量不一样)
  2. 黄金三月,掌握这些,在六十万iOS程序员中脱颖而出
  3. ubuntu|利用system来新建文件夹
  4. pycharm如何修改背景成护眼色和字体大小
  5. 微软.NET各技术应用前景 针对vs.net2010
  6. kubernetes视频教程笔记 (37)-部署 EFK 平台 日志收集 展示
  7. tp3.2 缓存cache
  8. 客户说发货慢怎么回复_给客户发完报价没回复,怎么办?
  9. cannon linux驱动下载
  10. html用记事本打字显示问号,电脑记事本问号怎么办
  11. 电子设计教程19:晶体管负反馈放大电路的原理设计
  12. 图书条码mysql数据库_中琅条码软件如何连接MySQL批量生成128码
  13. 08-CSS属性:定位属性
  14. numpy loadtxt错误ValueError: Wrong number of columns at line ***
  15. 520,冰河亲自整理的Git命令汇总升级版,悄悄努力,然后惊艳所有人(升级版)
  16. 涨知识!芯片是怎么做出来的,今天终于看懂了
  17. 操作系统原理学习笔记(基础概念与进程)
  18. 如何用技术手段将图片背景变为透明?
  19. Junit4 Test Suit使用
  20. java求100的阶乘

热门文章

  1. 微信支付结算费率怎么降低至0.2~0.35操作方法
  2. GitHub使用之路
  3. Python下的自然语言处理利器-LTP语言技术平台 pyltp 学习手札
  4. 代码随想录01 | 704二分查找和27移除元素
  5. python读取20万数据Excel文件+拆分数据
  6. 安装系统遇到MBR的硬盘,EFI系统只能安装GPT硬盘
  7. PHP 图片处理类(水印、透明度、缩放、相框、锐化、旋转、翻转、剪切、反色)...
  8. 简单 html 分页标签
  9. mysql8.0.20忘记密码_mysql8.0版本忘记密码修改密码
  10. Windows中文键盘消失