python partition by函数_python – 避免Spark窗口函数中单个分区模式的性能影响
在实践中,性能影响几乎与您省略了partitionBy子句相同.所有记录将被洗牌到一个分区,在本地排序并逐个顺序迭代.
差异仅在于总共创建的分区数.让我们举例说明使用包含10个分区和1000个记录的简单数据集的示例:
df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))
如果您定义没有partition by子句的框架
w_unpart = Window.orderBy(f.col("index").asc())
并使用滞后
df_lag_unpart = df.withColumn(
"diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
)
总共只有一个分区:
df_lag_unpart.rdd.glom().map(len).collect()
[1000]
与具有虚拟索引的帧定义相比(与您的代码相比简化了一点:
w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())
将使用等于spark.sql.shuffle.partitions的分区数:
spark.conf.set("spark.sql.shuffle.partitions", 11)
df_lag_part = df.withColumn(
"diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
)
df_lag_part.rdd.glom().count()
11
只有一个非空分区:
df_lag_part.rdd.glom().filter(lambda x: x).count()
1
遗憾的是,没有通用的解决方案可以用来解决PySpark中的这个问题.这只是实现的固有机制与分布式处理模型相结合.
由于索引列是顺序的,因此您可以生成每个块具有固定数量记录的人工分区键:
rec_per_block = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))
df_with_block = df.withColumn(
"block", (f.col("index") / rec_per_block).cast("int")
)
并用它来定义框架规范:
w_with_block = Window.partitionBy("block").orderBy("index")
df_lag_with_block = df_with_block.withColumn(
"diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")
)
这将使用预期的分区数:
df_lag_with_block.rdd.glom().count()
11
大致统一的数据分布(我们无法避免哈希冲突):
df_lag_with_block.rdd.glom().map(len).collect()
[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]
但是在块边界上有许多空白:
df_lag_with_block.where(f.col("diffs_col1").isNull()).count()
12
由于边界易于计算:
from itertools import chain
boundary_idxs = sorted(chain.from_iterable(
# Here we depend on sequential identifiers
# This could be generalized to any monotonically increasing
# id by taking min and max per block
(idx - 1, idx) for idx in
df_lag_with_block.groupBy("block").min("index")
.drop("block").rdd.flatMap(lambda x: x)
.collect()))[2:] # The first boundary doesn't carry useful inf.
你总是可以选择:
missing = df_with_block.where(f.col("index").isin(boundary_idxs))
并分别填写:
# We use window without partitions here. Since number of records
# will be small this won't be a performance issue
# but will generate "Moving all data to a single partition" warning
missing_with_lag = missing.withColumn(
"diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
).select("index", f.col("diffs_col1").alias("diffs_fill"))
并加入:
combined = (df_lag_with_block
.join(missing_with_lag, ["index"], "leftouter")
.withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))
获得理想的结果:
mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(
combined["diffs_col1"] != df_lag_unpart["diffs_col1"]
)
assert mismatched.count() == 0
python partition by函数_python – 避免Spark窗口函数中单个分区模式的性能影响相关推荐
- python编写自定义函数计算一维numpy数组中与指定目标数值最接近(距离最近)的数值(find closest value in numpy array to a certain value)
python编写自定义函数计算一维numpy数组中与指定目标数值最接近(距离最近)的数值(find closest value in numpy array to a certain value) 目 ...
- 以下哪个不是python的内置函数_以下哪个 Python 内置函数可以返回列表对象中元素个数。...
[多选题]假设 x=[0,1,2,3],执行哪些语句之后,x 的值为[0, 1, 2]. [多选题]以下哪些对象的分隔符为逗号. [单选题]已知列表 x=[0,1,2,1,4],那么执行语句 del ...
- python拟合三元函数_python基础教程之常用内置函数、三元运算、递归
目录 常用内置函数 abs/round/sum eval/exec enumerate max/min sorted zip map filter 补充:reduce lambda 初识递归 再谈递归 ...
- python数据处理常用函数_Python中常用操作字符串的函数与方法总结
Python中常用操作字符串的函数与方法总结 这篇文章主要介绍了Python中常用操作字符串的函数与方法总结,包括字符串的格式化输出与拼接等基础知识,需要的朋友可以参考下 例如这样一个字符串 Pyth ...
- python数字类型转换函数_Python的数据类型转换函数
玩蛇网这篇文章给大家介绍关于,Python数据类型的转换函数. Python提供了一些可以把某个值从一种数据类型,转换成为另一种数据类型的内置函数和方法.int函数可以将任何可以转换为整型的值转换为整 ...
- python重复执行函数_Python threading 单线程 timer重复调用函数
项目中需要使用定时器,每次都使用构造器函数调用: timer = threading.Timer(timerFlag, upload_position) timer.start() 打印线程后发现,每 ...
- python用psf函数_Python 嵌套函数(高级用法)
Python 嵌套函数(高级用法) 一.嵌套函数(高级用法) 1.嵌套函数 函数的嵌套调用是在"函数调用中再调用其他函数".也就是说:函数嵌套允许在一个函数中调用另外一个函数.如下 ...
- python的高级函数_Python的高级函数
Python的高级函数 1. map函数 map(func, *itertables) 对itertables中的每个元素使用func,当最短的itertable结束时便停止.map可以传入多个ite ...
- python self调用函数_Python之self
python类的函数中,第一个参数必须为实例对象本身,约定俗称记为self.self是实例本身,在外部调用类中的函数时,不需要给self赋值,但需要给其他的参数赋值. 与普通的函数相比,在类中定义的函 ...
最新文章
- repmgr 4.3 发布,PostgreSQL 复制与故障转移管理工具
- one-hot(独热)、bag of word(词袋)、word-Embedding(词嵌入)浅析
- 命令 结构_只需一个命令!从你的U盘里读出更多内容
- Handshake failed due to invalid Upgrade header: null 解决方案
- PHP程序员最常犯的11个MySQL错误
- Experimental Release #2: Multiple Device Support
- 郑州大学和中国地质大学计算机学院,2020最新版研究生录取通知书!54所院校,一定有你喜欢的风格!...
- VS2017 CUDA编程学习实例1:CUDA实现向量点乘
- R数据分析:二分类因变量的混合效应,多水平logistics模型介绍
- 服务器gpu芯片排行,GPU云服务器排行榜
- 数据采集(三):用XPath爬取腾讯新闻
- 三维重建(知识点详细解读、主要流程)
- php发出声音,电容也会发出声音!电容啸叫是怎么产生的?如何解决?
- 短视频创业,如何在技术上节省100万启动资金?
- 什么是 STL 文件
- ISO 2675:2021汽车仪表盘,操控台各符号意义,国际标准Road vehicles — Symbols for controls, indicators and tell-tales
- Python Selenium破解滑块验证码最新版!
- 深度学习21_李宏毅_02_Regression_P2
- IE浏览器设置兼容性、清除缓存,重置浏览器、Edge浏览器设置兼容性
- 留学日本专业比较: 理工科、文科、与研究
热门文章
- kubeadm 部署全过程问题解决记录
- Android adb远程调试
- ROS2学习(六).ROS概念 - 服务质量设置
- git 检查更新文件_Git通过差异比对快速打包待更新文件(SQL)
- 【计算机组成原理】存储器简述
- 计算机专业实训计划,计算机专业实习计划范文
- 深度学习之自编码器(2)Fashion MNIST图片重建实战
- linux 文件系统---类型、创建、
- python图片二值化处理百度图api_Python+百度AI实现图像处理-附源码
- python size和count_python中size和count的区别