在即将发布的 Apache Spark™ 3.2 版本中 pandas API 将会成为其中的一部分。Pandas 是一个强大、灵活的库,并已迅速发展成为标准的数据科学库之一。现在,pandas 的用户将能够在他们现有的 Spark 集群上利用 pandas API。

几年前,我们启动了 Koalas 这个开源项目,它在 Spark 之上实现了 Pandas DataFrame API,并被数据科学家广泛采用。最近,Koalas 作为 Project Zen 的一部分被正式合并到 PySpark 中,具体参见 SPIP: Support pandas API layer on PySpark,也可以参见 Data + AI Summit 2021 中的 Project Zen: Making Data Science Easier in PySpark 议题分享。

在即将发布的 Spark 3.2 版本中,pandas 用户仅需要修改一行就可以以分布式的方式使用现有工作负载:

from pandas import read_csv
from pyspark.pandas import read_csv
pdf = read_csv("data.csv")
修改为
from pyspark.pandas import read_csv
pdf = read_csv("data.csv")

本文总结了 Spark 3.2 上的 Pandas API 支持,并重点介绍了值得注意的特性、变化和路线图。

更好的扩展性

众所周知,pandas 的一个限制是只能单机处理,它不能随数据量线性伸缩。例如,如果 pandas 试图读取的数据集大于一台机器的可用内存,则会因内存不足而失败:

Spark 上的 pandas API 克服了这个限制,使用户能够通过利用 Spark 来处理大型数据集:

如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:过往记忆大数据

Spark 上的 pandas API 也可以很好地扩展到大型节点集群。下图显示了使用部分规模的集群分析 15TB 大小的 Parquet 数据集时的性能。集群中的每台机器都有 8 个 vCPU 和 61 GiBs 内存。

在此测试中,Pandas API 在 Spark 上的分布式执行几乎呈线性扩展。当集群中的机器数量增加一倍时,运行时间减少一半。与单台机器相比,加速也很显着。例如,在标准偏差基准(Standard deviation benchmark)测试中,由 256 台机器组成的集群可以在大致相同的时间内处理比单台机器多 250 倍的数据(每台机器有 8 个 vCPU 和 61 GiB 内存):

Single machine Cluster of 256 machines
Parquet Dataset 60GB 60GB x 250 (15TB)
Elapsed time (sec) of Standard deviation 12s 10s

优化单机性能

由于 Spark 引擎中的优化,Spark 上的 pandas API 甚至在单台机器上的性能都优于 pandas。下图展示了在一台机器(具有 96 个 vCPU 和 384 GiBs 内存)上运行 Spark 和单独调用 pandas 分析 130GB 的 CSV 数据集的性能对比。

多线程和 Spark SQL Catalyst Optimizer 都有助于优化性能。例如,Join count 操作在整个阶段代码生成时快 4 倍:没有代码生成时为 5.9 秒,代码生成时为 1.6 秒。

Spark 在链式操作(chaining operations)中具有特别显着的优势。Catalyst 查询优化器可以识别过滤器以明智地过滤数据并可以应用基于磁盘的连接(disk-based joins),而 Pandas 倾向于每一步将所有数据加载到内存中。

考虑两个过滤数据进行 JOIN,然后计算数据集的平均值的查询,Spark 上的 Pandas API 在 4.5 秒内成功,而 Pandas 由于 OOM(内存不足)错误而失败,如下所示:

以可视化的方式交互式操作数据

pandas 默认使用 matplotlib,它提供静态绘图图表。例如,下面的代码生成一个静态图表:

# Area
pandas.DataFrame(np.random.rand(100, 4), columns=list("abcd")).plot.area()

相反,Spark 上的 Pandas API 默认使用 plotly,它提供交互式图表。例如,它允许用户交互地放大和缩小。根据图的类型,Spark 上的 pandas API 在生成交互式图表时会自动确定在内部执行计算的最佳方式:

# Area
pandas.DataFrame(np.random.rand(100, 4), columns=list("abcd")).plot.area()

利用 Spark 中的统一分析功能

pandas 是为 Python 数据科学的批处理而设计的,而 Spark 是为统一分析而设计的,包括 SQL、流处理和机器学习。为了填补它们之间的空白,Spark 上的 Pandas API 为高级用户提供了许多不同的方式来利用 Spark 引擎,例如:

用户可以使用 Spark 优化后的 SQL 引擎直接通过 SQL 查询数据,如下图:

>>> import pandas as pd
>>> import pyspark.pandas as ps
>>> pdf = pd.DataFrame({"a": [1, 3, 5]})  # pandas DataFrame
>>> sdf = spark.createDataFrame(pdf)  # PySpark DataFrame
>>> psdf = sdf.to_pandas_on_spark()  # pandas-on-Spark DataFrame
>>> # Query via SQL
... ps.sql("SELECT count(*) as num FROM {psdf}")

它还支持字符串插值语法(string interpolation syntax)以自然地与 Python 对象交互:

>>> pred = range(4)
>>> # String interpolation with Python instances
... ps.sql("SELECT * from {psdf} WHERE a IN {pred}")

Spark 上的 pandas API 也支持流处理:

>>> def func(sdf, _):
...     # pandas-on-Spark DataFrame
...     psdf = sdf.to_pandas_on_spark()
...     psdf.describe()
...
>>> spark.readStream.format(
...     "kafka").load().writeStream.foreachBatch(func).start()

用户可以轻松调用 Spark 中可扩展的机器学习库:

>>> from pyspark.ml.feature import StringIndexer
>>> sdf = psdf.to_spark()  # PySpark DataFrame
>>> indexer = StringIndexer(
...     inputCol="category", outputCol="categoryIndex")
>>> indexed = indexer.fit(sdf).transform(sdf)
>>> indexed.show()

下一步

对于下一个 Spark 版本,重点关注以下几个方向:

更多类型提示

Spark 上的 Pandas API 中的代码目前是部分类型化的,它仍然支持静态分析和自动完成。将来,所有代码都将是完全类型化的。

性能提升

Spark 上的 Pandas API 有几个地方,我们可以通过与引擎和 SQL 优化器更密切的交互来进一步提高性能。

稳定性

有几个地方需要修复,特别是与缺失值相关的地方,例如 NaN 和 NA 具有行为差异的极端情况。

此外,在这些情况下,Spark 上的 Pandas API 将遵循并将其行为与最新版本的 Pandas 匹配。

更多 API 覆盖

Spark 上的 Pandas API 达到了 Pandas API 的 83% 覆盖率,并且这个数字还在继续增加,现在目标高达 90%。

即将发布的 Apache Spark 3.2 将内置 Pandas API相关推荐

  1. 微信QQ发布新版,接近一个G?内置4个虚拟引擎,这才叫做“大”更新

    嗨,大家好呀,我是柚妹!如今的手机内存越来越大,但依然很容易出现不够用的情况.归根结底,就是因为现在各个 App 所占用的空间也在日益增长中. 毫无疑问的说,我们正处于 APP 功能大爆炸的时代,几乎 ...

  2. class没有发布到tomcat_总在说SpringBoot内置了tomcat启动,那它的原理你说的清楚吗?

    本文同步Java知音社区,专注于Java 作者:歪头儿在帝都 http://cnblogs.com/sword-successful/p/11383723.html 前言 不得不说SpringBoot ...

  3. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十八):kafka0.10.1 内置性能测试API用法示例...

    消费者测试: ./kafka-consumer-perf-test.sh --zookeeper vm10.60.0.11.com.cn:2181,vm10.60.0.7.com.cn:2181,vm ...

  4. spark sql 上个月_SPARK-SQL内置函数之时间日期类

    一.获取当前时间 1.current_date获取当前日期 2018-04-09 2.current_timestamp/now()获取当前时间 2018-04-09 15:20:49.247 二.从 ...

  5. Apache Spark 3.0 预览版正式发布,多项重大功能发布

    今天早上 06:53(2019年11月08日 06:53) 数砖的 Xingbo Jiang 大佬给社区发了一封邮件,宣布 Apache Spark 3.0预览版正式发布,这个版本主要是为了对即将发布 ...

  6. Apache Spark 2.0预览: 机器学习模型持久化

    在即将发布的Apache Spark 2.0中将会提供机器学习模型持久化能力.机器学习模型持久化(机器学习模型的保存和加载)使得以下三类机器学习场景变得容易: \\ 数据科学家开发ML模型并移交给工程 ...

  7. Apache Spark 2.0: 机器学习模型持久化

    在即将发布的Apache Spark 2.0中将会提供机器学习模型持久化能力.机器学习模型持久化(机器学习模型的保存和加载)使得以下三类机器学习场景变得容易: 数据科学家开发ML模型并移交给工程师团队 ...

  8. apache.camel_在即将发布的Camel 2.21版本中改进了使用Apache Camel和ActiveMQ Artemis处理大型消息的功能...

    apache.camel 从历史上看, Apache ActiveMQ消息代理最初是在大型消息以MB为单位而不是GB的情况下创建的,就像您今天所做的那样. 下一代代理Apache ActiveMQ A ...

  9. 在即将发布的Camel 2.21版本中改进了使用Apache Camel和ActiveMQ Artemis处理大型消息的功能...

    从历史上看, Apache ActiveMQ消息代理最初是在大型消息以MB为单位而不是GB的情况下创建的,就像您今天所做的那样. 下一代代理Apache ActiveMQ Artemis(或仅是Art ...

最新文章

  1. Javascript 装饰器极速指南
  2. Tungsten Fabric SDN — 与 OpenStack 的集成部署
  3. python 编程一日一练-「每日一练」巧用python生成随机数
  4. python处理excel的工具-基于Python的Excel处理工具
  5. ASP.NET MVC 3 Beta 发布了
  6. 实验6 实践课程中的程序
  7. apache camel 相关配置_Web基础配置篇(二): Maven配置及使用
  8. MySQL优化详解(二)——数据库架构和使用优化
  9. 51蜂鸣器播放提示音和音乐(完整代码可附赠Proteus仿真)
  10. 游戏动作3d模型素材推荐 精品 小众
  11. 物联网智能垃圾回收源码 智慧分类回收源码 物联网应用
  12. Python可视化 | 风玫瑰图可视化示例
  13. 微信小程序开发之——开发者工具介绍(1.7)
  14. 硅谷课堂的所有流程图和总结
  15. 3.19字节懂车帝一面
  16. 财经365零基础学投资:用江恩展望下半年股市
  17. STATA regress回归结果分析
  18. 【658. 找到 K 个最接近的元素】
  19. 思科ASA防火墙:控制防火墙不让访问指定网站
  20. saiku 部署运行

热门文章

  1. Qt 实现Unicode字符表情包显示到界面 Emoji
  2. linux登录提示文件 motd 有趣的motd
  3. Android Studio 截屏
  4. Vim - 扩展命令(末行命令)模式
  5. Kotlin泛型上界与扩展函数
  6. 大数据分析师工资待遇
  7. python大数据工程师薪资待遇_2019年就业薪资,凭什么大数据工程师遥遥领先?...
  8. 如何定位前后端Bug
  9. 联邦学习首次被纳入Gartner隐私计算技术成熟度曲线
  10. 为什么国外程序员的创造力比中国程序员强?