在实践中,性能影响几乎与您省略了partitionBy子句相同.所有记录将被洗牌到一个分区,在本地排序并逐个顺序迭代.

差异仅在于总共创建的分区数.让我们举例说明使用包含10个分区和1000个记录的简单数据集的示例:

df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))

如果您定义没有partition by子句的框架

w_unpart = Window.orderBy(f.col("index").asc())

并使用滞后

df_lag_unpart = df.withColumn(

"diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")

)

总共只有一个分区:

df_lag_unpart.rdd.glom().map(len).collect()

[1000]

与具有虚拟索引的帧定义相比(与您的代码相比简化了一点:

w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())

将使用等于spark.sql.shuffle.partitions的分区数:

spark.conf.set("spark.sql.shuffle.partitions", 11)

df_lag_part = df.withColumn(

"diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")

)

df_lag_part.rdd.glom().count()

11

只有一个非空分区:

df_lag_part.rdd.glom().filter(lambda x: x).count()

1

遗憾的是,没有通用的解决方案可以用来解决PySpark中的这个问题.这只是实现的固有机制与分布式处理模型相结合.

由于索引列是顺序的,因此您可以生成每个块具有固定数量记录的人工分区键:

rec_per_block = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))

df_with_block = df.withColumn(

"block", (f.col("index") / rec_per_block).cast("int")

)

并用它来定义框架规范:

w_with_block = Window.partitionBy("block").orderBy("index")

df_lag_with_block = df_with_block.withColumn(

"diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")

)

这将使用预期的分区数:

df_lag_with_block.rdd.glom().count()

11

大致统一的数据分布(我们无法避免哈希冲突):

df_lag_with_block.rdd.glom().map(len).collect()

[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]

但是在块边界上有许多空白:

df_lag_with_block.where(f.col("diffs_col1").isNull()).count()

12

由于边界易于计算:

from itertools import chain

boundary_idxs = sorted(chain.from_iterable(

# Here we depend on sequential identifiers

# This could be generalized to any monotonically increasing

# id by taking min and max per block

(idx - 1, idx) for idx in

df_lag_with_block.groupBy("block").min("index")

.drop("block").rdd.flatMap(lambda x: x)

.collect()))[2:] # The first boundary doesn't carry useful inf.

你总是可以选择:

missing = df_with_block.where(f.col("index").isin(boundary_idxs))

并分别填写:

# We use window without partitions here. Since number of records

# will be small this won't be a performance issue

# but will generate "Moving all data to a single partition" warning

missing_with_lag = missing.withColumn(

"diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")

).select("index", f.col("diffs_col1").alias("diffs_fill"))

并加入:

combined = (df_lag_with_block

.join(missing_with_lag, ["index"], "leftouter")

.withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))

获得理想的结果:

mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(

combined["diffs_col1"] != df_lag_unpart["diffs_col1"]

)

assert mismatched.count() == 0

python partition by函数_python – 避免Spark窗口函数中单个分区模式的性能影响相关推荐

  1. python编写自定义函数计算一维numpy数组中与指定目标数值最接近(距离最近)的数值(find closest value in numpy array to a certain value)

    python编写自定义函数计算一维numpy数组中与指定目标数值最接近(距离最近)的数值(find closest value in numpy array to a certain value) 目 ...

  2. 以下哪个不是python的内置函数_以下哪个 Python 内置函数可以返回列表对象中元素个数。...

    [多选题]假设 x=[0,1,2,3],执行哪些语句之后,x 的值为[0, 1, 2]. [多选题]以下哪些对象的分隔符为逗号. [单选题]已知列表 x=[0,1,2,1,4],那么执行语句 del ...

  3. python拟合三元函数_python基础教程之常用内置函数、三元运算、递归

    目录 常用内置函数 abs/round/sum eval/exec enumerate max/min sorted zip map filter 补充:reduce lambda 初识递归 再谈递归 ...

  4. python数据处理常用函数_Python中常用操作字符串的函数与方法总结

    Python中常用操作字符串的函数与方法总结 这篇文章主要介绍了Python中常用操作字符串的函数与方法总结,包括字符串的格式化输出与拼接等基础知识,需要的朋友可以参考下 例如这样一个字符串 Pyth ...

  5. python数字类型转换函数_Python的数据类型转换函数

    玩蛇网这篇文章给大家介绍关于,Python数据类型的转换函数. Python提供了一些可以把某个值从一种数据类型,转换成为另一种数据类型的内置函数和方法.int函数可以将任何可以转换为整型的值转换为整 ...

  6. python重复执行函数_Python threading 单线程 timer重复调用函数

    项目中需要使用定时器,每次都使用构造器函数调用: timer = threading.Timer(timerFlag, upload_position) timer.start() 打印线程后发现,每 ...

  7. python用psf函数_Python 嵌套函数(高级用法)

    Python 嵌套函数(高级用法) 一.嵌套函数(高级用法) 1.嵌套函数 函数的嵌套调用是在"函数调用中再调用其他函数".也就是说:函数嵌套允许在一个函数中调用另外一个函数.如下 ...

  8. python的高级函数_Python的高级函数

    Python的高级函数 1. map函数 map(func, *itertables) 对itertables中的每个元素使用func,当最短的itertable结束时便停止.map可以传入多个ite ...

  9. python self调用函数_Python之self

    python类的函数中,第一个参数必须为实例对象本身,约定俗称记为self.self是实例本身,在外部调用类中的函数时,不需要给self赋值,但需要给其他的参数赋值. 与普通的函数相比,在类中定义的函 ...

最新文章

  1. repmgr 4.3 发布,PostgreSQL 复制与故障转移管理工具
  2. one-hot(独热)、bag of word(词袋)、word-Embedding(词嵌入)浅析
  3. 命令 结构_只需一个命令!从你的U盘里读出更多内容
  4. Handshake failed due to invalid Upgrade header: null 解决方案
  5. PHP程序员最常犯的11个MySQL错误
  6. Experimental Release #2: Multiple Device Support
  7. 郑州大学和中国地质大学计算机学院,2020最新版研究生录取通知书!54所院校,一定有你喜欢的风格!...
  8. VS2017 CUDA编程学习实例1:CUDA实现向量点乘
  9. R数据分析:二分类因变量的混合效应,多水平logistics模型介绍
  10. 服务器gpu芯片排行,GPU云服务器排行榜
  11. 数据采集(三):用XPath爬取腾讯新闻
  12. 三维重建(知识点详细解读、主要流程)
  13. php发出声音,电容也会发出声音!电容啸叫是怎么产生的?如何解决?
  14. 短视频创业,如何在技术上节省100万启动资金?
  15. 什么是 STL 文件
  16. ISO 2675:2021汽车仪表盘,操控台各符号意义,国际标准Road vehicles — Symbols for controls, indicators and tell-tales
  17. Python Selenium破解滑块验证码最新版!
  18. 深度学习21_李宏毅_02_Regression_P2
  19. IE浏览器设置兼容性、清除缓存,重置浏览器、Edge浏览器设置兼容性
  20. 留学日本专业比较: 理工科、文科、与研究

热门文章

  1. kubeadm 部署全过程问题解决记录
  2. Android adb远程调试
  3. ROS2学习(六).ROS概念 - 服务质量设置
  4. git 检查更新文件_Git通过差异比对快速打包待更新文件(SQL)
  5. 【计算机组成原理】存储器简述
  6. 计算机专业实训计划,计算机专业实习计划范文
  7. 深度学习之自编码器(2)Fashion MNIST图片重建实战
  8. linux 文件系统---类型、创建、
  9. python图片二值化处理百度图api_Python+百度AI实现图像处理-附源码
  10. python size和count_python中size和count的区别