从这篇文章开始,我们专栏开始一个新的读 paper 系列。

今天要介绍的 paper 是 Towards Scalable Dataframe Systems,目前还是预印本。作者 Devin Petersohn 来自 Riselab,该实验室的前身是大名鼎鼎的 APMLab,诞生了 Apache Spark、Apache Mesos 等一系列著名开源项目。

个人觉得这篇 paper 蛮有意义的,第一次(据我所知)试图在学术上对 DataFrame 做定义,给了很好的理论指导意义。

这篇文章我不会拘泥于原 paper,我会加入自己的理解。本篇文章会大致分三部分:

  1. 什么是真正的 DataFrame?
  2. 为什么现在的所谓 DataFrame 系统,典型的如 Spark DataFrame,有可能正在杀死 DataFrame 的原本含义。
  3. 从 Mars DataFrame 的角度来看这个问题。

什么是真正的 DataFrame?

起源

最早的 "DataFrame" (开始被称作 "data frame"),来源于贝尔实验室开发的 S 语言。"data frame" 在 1990 年就发布了,书《S 语言统计模型》第3章里详述了它的概念,书里着重强调了 dataframe 的矩阵起源。

书中描述 DataFrame 看上去很像矩阵,且支持类似矩阵的操作;同时又很像关系表。

R 语言,作为 S 语言的开源版本,于 2000 年发布了第一个稳定版本,并且实现了 dataframe。pandas 于 2009 年被开发,Python 中于是也有了 DataFrame 的概念。这些 DataFrame 都同宗同源,有着相同的语义和数据模型。

DataFrame 数据模型

DataFrame 的需求来源于把数据看成矩阵和表。但是,矩阵中只包含一种数据类型,未免过于受限;同时,关系表要求数据必须要首先定义 schema。对于 DataFrame 来说,它的列类型可以在运行时推断,并不需要提前知晓,也不要求所有列都是一个类型。因此,DataFrame 可以理解成是关系系统、矩阵、甚至是电子表格程序(典型如 Excel)的合体。

跟关系系统相比,DataFrame 有几个特别有意思的属性,让 DataFrame 因此独一无二。

保证顺序,行列对称

首先,无论在行还是列方向上,DataFrame 都是有顺序的;且行和列都是一等公民,不会区分对待。

拿 pandas 举例子,当创建了一个 DataFrame 后,无论行和列上数据都是有顺序的,因此,在行和列上都可以使用位置来选择数据。

In [1]: import pandas as pd                                                     In [2]: import numpy as np                                                      In [3]: df = pd.DataFrame(np.random.rand(5, 4))                                 In [4]: df
Out[4]: 0         1         2         3
0  0.736385  0.271232  0.940270  0.926548
1  0.319533  0.891928  0.471176  0.583895
2  0.440825  0.500724  0.402782  0.109702
3  0.300279  0.483571  0.639299  0.778849
4  0.341113  0.813870  0.054731  0.059262In [5]: df.iat[2, 2]  # 第二行第二列元素
Out[5]: 0.40278182653648853

因为行和列的对称关系,因此聚合函数在两个方向上都可以计算,只需指定 axis 即可。

In [6]: df.sum()  # 默认 axis == 0,在行方向上做聚合,因此结果是4个元素
Out[6]:
0    2.138135
1    2.961325
2    2.508257
3    2.458257
dtype: float64In [7]: df.sum(axis=1)  # axis == 1,在列方向上做聚合,因此是5个元素
Out[7]:
0    2.874434
1    2.266533
2    1.454032
3    2.201998
4    1.268976
dtype: float64

如果熟悉 numpy(数值计算库,包含多维数组和矩阵的定义),可以看到这个特性非常熟悉,从而可以看出 DataFrame 的矩阵本质。

丰富的 API

DataFrame 的 API 非常丰富,横跨关系(如 filter、join)、线性代数(如 transpose、dot)以及类似电子表格(如 pivot)的操作。

还是以 pandas 为例,一个 DataFrame 可以做转置操作,让行和列对调。

In [8]: df.transpose()
Out[8]: 0         1         2         3         4
0  0.736385  0.319533  0.440825  0.300279  0.341113
1  0.271232  0.891928  0.500724  0.483571  0.813870
2  0.940270  0.471176  0.402782  0.639299  0.054731
3  0.926548  0.583895  0.109702  0.778849  0.059262

直观的语法,适合交互式分析

用户可以对 DataFrame 数据不断进行探索,查询结果可以被后续的结果复用,可以非常方便地用编程的方式组合非常复杂的操作,很适合交互式的分析。

列中允许异构数据

DataFrame 的类型系统允许一列中有异构数据的存在,比如,一个 int 列中允许有 string 类型数据存在,它可能是脏数据。这点看出 DataFrame 非常灵活。

In [10]: df2 = df.copy()                                                        In [11]: df2.iloc[0, 0] = 'a'                                                   In [12]: df2
Out[12]: 0         1         2         3
0         a  0.271232  0.940270  0.926548
1  0.319533  0.891928  0.471176  0.583895
2  0.440825  0.500724  0.402782  0.109702
3  0.300279  0.483571  0.639299  0.778849
4  0.341113  0.813870  0.054731  0.059262

数据模型

现在我们可以对什么是真正的 DataFrame 正式下定义:

DataFrame 由二维混合类型的数组、行标签、列标签、以及类型(types 或者 domains)组成。在每列上,这个类型是可选的,可以在运行时推断。从行上看,可以把 DataFrame 看做行标签到行的映射,且行之间保证顺序;从列上看,可以看做列类型到列标签到列的映射,同样,列间同样保证顺序。

行标签和列标签的存在,让选择数据时非常方便。

In [13]: df.index = pd.date_range('2020-4-15', periods=5)                       In [14]: df.columns = ['c1', 'c2', 'c3', 'c4']                                  In [15]: df
Out[15]: c1        c2        c3        c4
2020-04-15  0.736385  0.271232  0.940270  0.926548
2020-04-16  0.319533  0.891928  0.471176  0.583895
2020-04-17  0.440825  0.500724  0.402782  0.109702
2020-04-18  0.300279  0.483571  0.639299  0.778849
2020-04-19  0.341113  0.813870  0.054731  0.059262In [16]: df.loc['2020-4-16': '2020-4-18', 'c2': 'c3']  # 注意这里的切片是闭区间
Out[16]: c2        c3
2020-04-16  0.891928  0.471176
2020-04-17  0.500724  0.402782
2020-04-18  0.483571  0.639299

这里的 indexcolumns 就分别是行和列标签。我们可以很容易选择一段时间(行上选择)和几列(列上选择)数据。当然这些建立在数据是按顺序存储的基础上。

按顺序存储的特性让 DataFrame 非常适合用来做统计方面的工作。

In [17]: df3 = df.shift(1)  # 把 df 的数据整体下移一格,行列索引保持不变                                                      In [18]: df3
Out[18]: c1        c2        c3        c4
2020-04-15       NaN       NaN       NaN       NaN
2020-04-16  0.736385  0.271232  0.940270  0.926548
2020-04-17  0.319533  0.891928  0.471176  0.583895
2020-04-18  0.440825  0.500724  0.402782  0.109702
2020-04-19  0.300279  0.483571  0.639299  0.778849In [19]: df - df3  # 数据减法会自动按标签对齐,因此这一步可以用来计算环比
Out[19]: c1        c2        c3        c4
2020-04-15       NaN       NaN       NaN       NaN
2020-04-16 -0.416852  0.620697 -0.469093 -0.342653
2020-04-17  0.121293 -0.391205 -0.068395 -0.474194
2020-04-18 -0.140546 -0.017152  0.236517  0.669148
2020-04-19  0.040834  0.330299 -0.584568 -0.719587In [21]: (df - df3).bfill()  # 第一行的空数据按下一行填充
Out[21]: c1        c2        c3        c4
2020-04-15 -0.416852  0.620697 -0.469093 -0.342653
2020-04-16 -0.416852  0.620697 -0.469093 -0.342653
2020-04-17  0.121293 -0.391205 -0.068395 -0.474194
2020-04-18 -0.140546 -0.017152  0.236517  0.669148
2020-04-19  0.040834  0.330299 -0.584568 -0.719587

从例子看到,正因为数据是按顺序存放的,因此我们可以索引保持不变,整体下移一行,这样,昨天的数据就到了今天的行上,然后拿原数据减去位移后的数据时,因为 DataFrame 会自动按标签做对齐,因此,对于一个日期,相当于用当天的数据减去了前天的数据,这样就可以做类似于环比的操作。这简直太方便了。试想,对于关系系统来说,恐怕需要想办法找一列作为 join 的条件,然后再做减法等等。最后,对于空数据,我们还可以填充上一行(ffill)或者下一行的数据(bfill)。想在关系系统里想达到同样效果,想必是需要大费周章的。

DataFrame 的真正含义正在被杀死

近几年,DataFrame 系统如同雨后春笋般出现,然而,这其中的绝大多数系统只包含了关系表的语义,并不包含我们之前说的矩阵方面的意义,且它们大多也并不保证数据顺序,因此真正 DataFrame 所拥有的统计和机器学习方面的特质也不复存在。这些 “DataFrame” 系统的出现,让 “DataFrame” 这个词本身几乎变得没有意义。数据科学家们为了处理大规模的数据,思维方式不得不作出改变,这其中必然存在风险。

Spark DataFrame 和 Koalas 不是真正的 DataFrame

这些 DataFrame 系统的代表是 Spark DataFrame, Spark 当然是伟大的,它解决了数据规模的问题;同时又首次把 ”DataFrame“ 的概念带到了大数据的领域。但其实它只是 spark.sql的另一种形式(当然 Spark DataFrame 确实在 spark.sql 下)。Spark DataFrame 只包含了关系表的语义,schema 需要确定,数据也并不保证顺序。

那么会有同学说 Koalas 呢?Koalas 提供了 pandas API,用 pandas 的语法就可以在 spark 上分析了。实际上,因为 Koalas 也是将 pandas 的操作转成 Spark DataFrame 来执行,因为 Spark DataFrame 内核本身的特性,注定 Koalas 只是看上去和 pandas 一致。

为了说明这点,我们使用 数据集(Hourly Ridership by Origin-Destination Pairs),只取 2019 年的数据。

对于 pandas,我们按天聚合,并按 30 天滑动窗口来计算平均值。

In [22]: df = pd.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', ...: names=['Date','Hour','Origin','Destination','Trip Count'])             In [23]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot()
Out[23]: <matplotlib.axes._subplots.AxesSubplot at 0x118077d90>

如果是 Koalas,因为它的 API 看上去和 pandas 一致,因此,我们按照 Koalas 的文档做 import 替换。

In [1]: import databricks.koalas as ksIn [2]: df = ks.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', names=['Date','Hour','Origin','Destination','Trip Count'])In [3]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot()

然后令人惊讶的是,结果并不一致。大费周章后才查到,原因是顺序问题,聚合的结果后并不保证排序,因此要得到一样的结果需要在 rolling 前加 sort_index(),确保 groupby 后的结果是排序的。

In [4]: df.groupby('Date').mean()['Trip Count'].sort_index().rolling(30).mean().plot()

默认的排序规则非常重要,这对以时间作为索引的数据尤其关键,而且这让数据科学家更容易观察数据,也更容易复现结果。

所以,在使用 Koalas 时请小心,要时刻关注你的数据在你心中是不是排序的,因为 Koalas 很可能表现地和你想的不一致。

让我们再看 shift,它能工作的一个前提就是数据是排序的,那么在 Koalas 中调用会发生什么呢?

In [6]: df.shift(1)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)62         try:
---> 63             return f(*a, **kw)64         except py4j.protocol.Py4JJavaError as e:/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)327                     "An error occurred while calling {0}{1}{2}.n".
--> 328                     format(target_id, ".", name), value)329             else:Py4JJavaError: An error occurred while calling o110.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'isnan(lag(`Date`, 1, NULL) OVER (ORDER BY `__natural_order__` ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING))' due to data type mismatch: argument 1 requires (double or float) type, however, 'lag(`Date`, 1, NULL) OVER (ORDER BY `__natural_order__` ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)' is of timestamp type.;;
'Project [__index_level_0__#41, CASE WHEN (isnull(lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)))) THEN null ELSE lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Date#87, CASE WHEN (isnull(lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as int) ELSE lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Hour#88, CASE WHEN (isnull(lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as string) ELSE lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Origin#89, CASE WHEN (isnull(lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as string) ELSE lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Destination#90, CASE WHEN (isnull(lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as int) ELSE lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Trip Count#91, __natural_order__#50L]
+- Project [__index_level_0__#41, Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, monotonically_increasing_id() AS __natural_order__#50L]+- Project [__index_level_0__#41, Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34]+- Project [Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, _w0#42L, _we0#43, (_we0#43 - 1) AS __index_level_0__#41]+- Window [row_number() windowspecdefinition(_w0#42L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#43], [_w0#42L ASC NULLS FIRST]+- Project [Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, monotonically_increasing_id() AS _w0#42L]+- Project [0#20 AS Date#30, 1#21 AS Hour#31, 2#22 AS Origin#32, 3#23 AS Destination#33, 4#24 AS Trip Count#34]+- Project [_c0#10 AS 0#20, _c1#11 AS 1#21, _c2#12 AS 2#22, _c3#13 AS 3#23, _c4#14 AS 4#24]+- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csvat org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:116)at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:108)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$mapChild$2(TreeNode.scala:306)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$13.apply(TreeNode.scala:356)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)at scala.collection.immutable.List.foreach(List.scala:392)at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)at scala.collection.immutable.List.map(List.scala:296)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:356)at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)at ...  # 后面还有一大段报错信息,此处省略

这个报错可能会让数据科学家们震惊,什么,我就做了个 shift 啊,报错里掺杂着 Java 异常栈和一大堆看不懂的错误。

这里真正的错误和 Date 是时间戳有关,那么我们只取 int 类型的字段做 shift 总可以了吧。

In [10]: df['Hour'].shift(1)
Out[10]: 20/04/20 17:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
20/04/20 17:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

确实可以运行,但却看到一句话,大意是数据会被放到一个分区来执行,这正是因为数据本身之间并不保证顺序,因此只能把数据收集到一起,排序,再调用 shift。这样就不再是一个分布式的程序了,甚至比 pandas 本身更慢。

DataFrame.dot 等矩阵相关的操作在 Koalas 里也不包含,这些操作已经很难用关系代数来表达了。

PyODPS DataFrame

相信用过 MaxCompute(原名 ODPS,阿里云自研的大数据系统),应该会听说过 PyODPS。这个库是我们前几年的产品,PyODPS 里也包含一个 DataFrame,而 PyODPS DataFrame 在执行的时候会被编译到 ODPS SQL 来执行。

提 PyODPS DataFrame 的原因是,我们在几年前就发现,虽然它提供了 pandas-like 的接口,一定程度上让用户能用类似 pandas 的思维解决问题,然而,当用户问我们,如何向后填充数据?如何通过索引获取数据?答案都是不能。原因也是一样的,因为 PyODPS DataFrame 只是将计算代理给不保证有序、只有关系代数算子的引擎来执行。

如果系统本身的数据模型不是真正的 DataFrame 模型,仅仅让接口看起来像是远远不够的。

Mars DataFrame

因此这里要说到 Mars DataFrame,其实我们做 Mars 的初衷和这篇 paper 的想法是一致的,因为现有的系统虽然能很好地解决规模问题,但那些传统数据科学包中好的部分却被人遗忘了,我们希望 Mars 能保留这些库中好的部分,又能解决规模问题,也能充分利用新硬件。

Mars DataFrame 会自动将 DataFrame 分割成很多小的 chunk,每个 chunk 也是一个 DataFrame,而无论是 chunk 间还是 chunk 内的数据,都保证顺序。

图里的示例中,一个行数 380、列数 370 的 DataFrame,被 Mars 分成 3x3 一共 9 个 chunk,根据计算在 CPU 还是 NVIDIA GPU 上进行,用 pandas DataFrame 或者 cuDF DataFrame 来存储数据和执行真正的计算。可以看到,Mars 既会在行上,也会在列上进行分割,这种在行上和列上的对等性,让 DataFrame 的矩阵本质能得以发挥。

在单机真正执行时,根据初始数据的位置,Mars 会自动把数据分散到多核或者多卡执行;对于分布式,会将计算分散到多台机器执行。

Mars DataFrame 保留了行标签、列标签和类型的概念。因此能够想象如同 pandas 一样,可以在比较大的数据集上根据标签进行筛选。

In [1]: import mars.dataframe as md                                             In [2]: import mars.tensor as mtIn [8]: df = md.DataFrame(mt.random.rand(10000, 10, chunk_size=1000), ...:                   index=md.date_range('2020-1-1', periods=10000))       In [9]: df.loc['2020-4-15'].execute()
Out[9]:
0    0.622763
1    0.446635
2    0.007870
3    0.107846
4    0.288893
5    0.219340
6    0.228806
7    0.969435
8    0.033130
9    0.853619
Name: 2020-04-15 00:00:00, dtype: float64

Mars 会保持和 pandas 一致的排序特性,因此对于 groupby 等操作,无需担心结果和所想不一致。

In [6]: import mars.dataframe as md                                             In [7]: df = md.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', n...: ames=['Date','Hour','Origin','Destination','Trip Count'])               In [8]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot() # 结果正确
Out[8]: <matplotlib.axes._subplots.AxesSubplot at 0x11ff8ab90>

对于 shift,不光结果正确,而且执行时能利用多核、多卡和分布式的能力。

In [3]: df.shift(1).head(10).execute()
Out[3]: Date  Hour Origin Destination  Trip Count
0         NaN   NaN    NaN         NaN         NaN
1  2019-01-01   0.0   12TH        16TH         4.0
2  2019-01-01   0.0   12TH        ANTC         1.0
3  2019-01-01   0.0   12TH        BAYF         1.0
4  2019-01-01   0.0   12TH        CIVC         2.0
5  2019-01-01   0.0   12TH        COLM         1.0
6  2019-01-01   0.0   12TH        COLS         1.0
7  2019-01-01   0.0   12TH        CONC         1.0
8  2019-01-01   0.0   12TH        DALY         1.0
9  2019-01-01   0.0   12TH        DELN         2.0

不只是 DataFrame

Mars 还包含 tensor 模块来支持并行和分布式化 numpy,以及 learn 模块来并行和分布式化 scikit-learn,因此可以想象,如 mars.tensor.linalg.svd 可以直接作用在 Mars DataFrame 上,这就赋予了 Mars 超越 DataFrame 本身的语义。

In [1]: import mars.dataframe as md                                             In [2]: import mars.tensor as mt                                                In [3]: df = md.DataFrame(mt.random.rand(10000, 10, chunk_size=1000))           In [5]: mt.linalg.svd(df).execute()

总结

《Towards Scalable DataFrame Systems》赋予了 DataFrame 学术定义。而要做到可扩展的 DataFrame,首先必须是真正的 DataFrame,其次才是可扩展。

在我们看来,Mars 是真正的 DataFrame,它生来目标就是可扩展,而 Mars 又不仅仅是 DataFrame。在我们看来,Mars 在数据科学领域大有可为。

参考

  • Towards Scalable Dataframe Systems:https://arxiv.org/abs/2001.00888
  • Preventing the Death of the DataFrame:https://towardsdatascience.com/preventing-the-death-of-the-dataframe-8bca1c0f83c8

dataframe两个表合并_Spark DataFrame 不是真正的 DataFrame相关推荐

  1. dataframe两个表合并_Spark实战第二版(涵盖Spark3.0)第三章 宏伟的角色dataframe

    关注公众号: 登峰大数据 ,阅读Spark实战第二版(完整中文版),系统学习Spark3.0大数据框架! 如果您觉得作者翻译的内容有帮助,请分享给更多人.您的分享,是作者翻译的动力! 本章涵盖了 使用 ...

  2. dataframe两个表合并_DataFrame踩坑整理(一)

    不得不说,工作中几乎天天跟DataFrame打交道,真的很有总结和记住的必要,话不多说,直接上干货.输入以下语句前别忘记: import pandas as pd import numpy as np ...

  3. dataframe两个表合并_史上代码最少的工作表拆分,仅需5行,不可思议

    工作中,很多小伙伴都会遇到一些需求,将一份Excel文档按照部门进行拆分,每个部门是一个单独的工作表,或者每个部门整理的工作表汇总为一份总的工作表.读者需要注意的是,多个工作表的拆分与合并,始终在一个 ...

  4. dataframe两个表合并_Part25:Pandas基础(Series,DataFrame类的创建、索引、切片、算术方法)...

    一.为什么学习pandas numpy已经可以帮助我们进行数据的处理了,那么学习pandas的目的是什么呢? numpy能够帮助我们处理的是数值型的数据,当然在数据分析中除了数值型的数据还有好多其他类 ...

  5. dataframe两个表合并_R语言读取多个excel文件后合并:rbind/merge/cmd合并

    问题: 该文件夹下的三个文件理论上列名完全一致,但是在用rbind合并时却提示上述错误,(只能理解为列名确实是不一致导致的问题)解决如下: (1)首先,rbind和cbind()也适用于datafra ...

  6. dataframe两个表合并_DAXSQLPython实现报表项目存在串行的财务报表合并

       点击"数字化审计",可以关注哦! 开篇啰嗦话 在前文"Dax实战-多年财务报表项目存在串行的合并"中,我为你介绍了多年财务报表合并过程中问题的起因和解决串 ...

  7. pandas计算dataframe两列数据值相等的行号、取出DataFrame中两列值相等的行号

    pandas计算dataframe两列数据值相等的行号.取出DataFrame中两列值相等的行号 目录 pandas计算dataframe两列数据值相等的行号.取出DataFrame中两列值相等的行号

  8. oracle横向合并两张表,SQL中将两个表合并成一个新表 SQL如何合并两个表并生成一个新表?...

    sql把两个表合并成一个新表的语句应该怎么写SQL SERVER: select * into 表3 from ( select 字段列表1 from 表1 union all select 字段列表 ...

  9. mysql 两张表合并查询_mysql中的分区表和合并表详解(一个常见知识点)

    分区表是mysql5.1之后的新特性,合并表已经存在很长时间了.这篇文章主要介绍这两个概念以及他们基本的操作. 一.合并表 合并表说实话是一种将要被淘汰的技术,但是掌握了合并表的概念再去看分区表就比较 ...

最新文章

  1. spring boot 转xml格式报错解决方法_芋道 Spring Boot MyBatis 入门(一)之 MyBatis + XML...
  2. 【若依(ruoyi)】打印bootstrapTable数据
  3. 【Linux系统编程】线程与进程的比较
  4. 修改mysql锁空间大小
  5. Java关键字——final的用法
  6. DllRegisterServer的调用失败的问题解决方法
  7. 全球嵌入式技术与 IoT 产业回顾与展望 | 技术头条
  8. Ext JS学习第八天 Ext基础之 认识Ext.js 和Ext-more.js
  9. finereport9.0破解版|finereport10.0破解并发数|finereport授权注册|FineBI5.0破解lic
  10. RS232标准DB9接口定义
  11. 高手的思维与打法:灰度思维,黑白决策
  12. jpa findOne()用法
  13. c1能力认证考试训练任务03-web基础与布局
  14. Zimbra部署https证书的操作方法
  15. 大数据世界中的新技术
  16. 函数型数据主成分分析(FPCA)
  17. 前端跨域问题—解决Firefox浏览器显示“已阻止载入混合活动内容”的方法
  18. spring定时任务:@Scheduled
  19. 读书笔记-企业的股权结构
  20. Solidworks模型导入Vrep(CoppeliaSim Edu)

热门文章

  1. 贪心算法——国王游戏
  2. dvcs-ripper安装教程
  3. 信息系统监理师视频教程
  4. HPUX系统启动后主机名为unknown的解决办法
  5. 表单验证-通用表单验证大全/通用表单验证函数收集
  6. 系统集成项目管理(软考中级)第一章信息化知识
  7. Cocos Creator用cc.loader加载预制体资源和删除预制体资源
  8. HTML5期末大作业:旅游网页设计与实现——旅游风景区网站HTML+CSS+JavaScript 景点静态网页设计 学生DW静态网页设计...
  9. 扩展activiti 支持任意属性扩展
  10. 什么是Vue,Vue的作用与原理?