而且,它们都是有用的

> Image by nickgesell from Pixabay

每天都在生成太多数据。

尽管有时我们可以使用Rapids或Parallelization等工具来管理大数据,但如果您使用的是TB级数据,Spark是一个很好的工具。

在上一篇关于Spark的文章中,我解释了如何使用PySpark RDD和Dataframe。

尽管这篇文章解释了如何使用RDD和基本的Dataframe操作,但是我在使用PySpark Dataframes时错过了很多东西。

只有当我需要更多功能时,我才阅读并提出多种解决方案来做一件事情。

如何在Spark中创建新列?

现在,这听起来微不足道,但请相信我,事实并非如此。 您可能想要处理这么多数据,所以我很确定您最终将在工作流中使用大多数这些列创建过程。 有时使用Pandas功能,有时使用基于RDD的分区,有时使用成熟的python生态系统。

这篇文章将是关于"在Pyspark Dataframe中创建新列的多种方法"。

如果您安装了PySpark,则可以跳过下面的"入门"部分。

Spark入门

我知道很多人不会在系统中安装Spark来尝试和学习。 但是安装Spark本身就是一件令人头疼的事情。

由于我们想了解它是如何工作的以及如何使用它,因此建议您在此处与社区版一起在线使用Databricks上的Spark。 不用担心,它是免费的,尽管资源较少,但是对于我们来说,出于学习目的,它现在就适用。

一旦注册并登录,将显示以下屏幕。

您可以在此处启动新笔记本。

选择Python笔记本,并为笔记本命名。

启动新笔记本并尝试执行任何命令后,笔记本将询问您是否要启动新群集。 做吧

下一步将检查sparkcontext是否存在。 要检查sparkcontext是否存在,您必须运行以下命令:

sc

这意味着我们已经设置了可以运行Spark的笔记本。

数据

在这里,我将处理Movielens ml-100k.zip数据集。 1000位用户观看1700部电影时获得100,000个评分。 在此压缩文件夹中,我们将专门使用的文件是评估文件。 该文件名保留为" u.data"

如果要上载此数据或任何数据,可以单击左侧的"数据"选项卡,然后使用提供的GUI添加数据。

然后,我们可以使用以下命令加载数据:

ratings = spark.read.load("/FileStore/tables/u.data",format="csv", sep="", inferSchema="true", header="false")ratings = ratings.toDF(*['user_id', 'movie_id', 'rating', 'unix_timestamp'])

外观如下:

ratings.show()

好的,现在我们准备开始我们感兴趣的部分。 如何在PySpark Dataframe中创建一个新列?

1.使用Spark本机函数

> Photo by Andrew James on Unsplash

在PySpark DataFrame中创建新列的最pysparkish方法是使用内置函数。 这是创建新列的最高效的编程方式,因此,这是我想进行某些列操作时首先要去的地方。

我们可以将.withcolumn与PySpark SQL函数一起使用来创建新列。 本质上,您可以找到已经使用Spark函数实现的String函数,Date函数和Math函数。 我们可以将spark函数导入为:

import pyspark.sql.functions as F

我们的第一个函数F.col函数使我们可以访问列。 因此,如果我们想将一栏乘以2,可以将F.col用作:

ratings_with_scale10 = ratings.withColumn("ScaledRating", 2*F.col("rating"))ratings_with_scale10.show()

我们还可以使用数学函数,例如F.exp函数:

ratings_with_exp = ratings.withColumn("expRating", 2*F.exp("rating"))ratings_with_exp.show()

此模块中提供了许多其他功能,足以满足大多数简单的用例。 您可以在此处查看功能列表。

2. Spark UDF

> Photo by Divide By Zero on Unsplash

有时我们想对一列或多列做复杂的事情。 可以将其视为对PySpark数据框到单列或多列的映射操作。 尽管Spark SQL函数确实解决了许多有关创建列的用例,但只要我想使用更成熟的Python功能时,我都会使用Spark UDF。

要使用Spark UDF,我们需要使用F.udf函数将常规的python函数转换为Spark UDF。 我们还需要指定函数的返回类型。 在此示例中,返回类型为StringType()

import pyspark.sql.functions as Ffrom pyspark.sql.types import *  def somefunc(value): if value < 3:   return 'low' else:   return 'high'#convert to a UDF Function by passing in the function and return type of function udfsomefunc = F.udf(somefunc, StringType())ratings_with_high_low = ratings.withColumn("high_low", udfsomefunc("rating"))ratings_with_high_low.show()

3.使用RDD

> Photo by Ryan Quintal on Unsplash

有时,Spark UDF和SQL函数对于特定用例而言都是不够的。 您可能想利用Spark RDD获得的更好的分区。 或者,您可能想在Spark RDD中使用组函数。 您可以使用此方法,主要是在需要访问python函数内部spark数据框中的所有列时。

无论如何,我发现使用RDD创建新列的这种方式对于有经验的RDD(这是Spark生态系统的基本组成部分)的人们非常有用。

下面的过程利用该功能在Row和pythondict对象之间进行转换。 我们将行对象转换为字典。 按照我们的习惯使用字典,然后将该字典再次转换回行。

import mathfrom pyspark.sql import Rowdef rowwise_function(row): # convert row to dict:   row_dict = row.asDict() # Add a new key in the dictionary with the new column name and value.   row_dict['Newcol'] = math.exp(row_dict['rating']) # convert dict to row:   newrow = Row(**row_dict) # return new row   return newrow# convert ratings dataframe to RDDratings_rdd = ratings.rdd# apply our function to RDD  ratings_rdd_new = ratings_rdd.map(lambda row: rowwise_function(row))# Convert RDD Back to DataFrameratings_new_df = sqlContext.createDataFrame(ratings_rdd_new)ratings_new_df.show()

4.Pandas UDF

> Photo by Pascal Bernardon on Unsplash

Spark版本2.3.1中引入了此功能。 这使您可以在Spark中使用Pands功能。 我通常在需要在Spark数据帧上运行groupby操作或需要创建滚动功能并想使用Pandas滚动功能/窗口功能的情况下使用它。

我们使用它的方式是使用F.pandas_udf装饰器。 我们在这里假设该函数的输入将是一个熊猫数据框。 我们需要从该函数依次返回一个Pandas数据框。

这里唯一的复杂性是我们必须为输出数据框提供一个架构。 我们可以使用以下格式来实现。

# Declare the schema for the output of our functionoutSchema = StructType([StructField('user_id',IntegerType(),True),StructField('movie_id',IntegerType(),True),StructField('rating',IntegerType(),True),StructField('unix_timestamp',IntegerType(),True),StructField('normalized_rating',DoubleType(),True)])# decorate our function with pandas_udf decorator@F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)def subtract_mean(pdf):    # pdf is a pandas.DataFrame    v = pdf.rating    v = v - v.mean()    pdf['normalized_rating'] =v    return pdfrating_groupwise_normalization = ratings.groupby("movie_id").apply(subtract_mean)rating_groupwise_normalization.show()

我们还可以利用它在每个火花节点上训练多个单独的模型。 为此,我们复制数据并为每个复制提供一个键和一些训练参数,例如max_depth等。然后,我们的函数将使用熊猫Dataframe,运行所需的模型,然后返回结果。 结构如下所示。

# 0. Declare the schema for the output of our functionoutSchema = StructType([StructField('replication_id',IntegerType(),True),StructField('RMSE',DoubleType(),True)])# decorate our function with pandas_udf decorator@F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)def run_model(pdf):    # 1. Get hyperparam values    num_trees = pdf.num_trees.values[0]    depth = pdf.depth.values[0]    replication_id = pdf.replication_id.values[0]    # 2. Train test split    Xtrain,Xcv,ytrain,ycv = train_test_split.....    # 3. Create model using the pandas dataframe    clf = RandomForestRegressor(max_depth = depth, num_trees=num_trees,....)    clf.fit(Xtrain,ytrain)    # 4. Evaluate the model    rmse = RMSE(clf.predict(Xcv,ycv)    # 5. return results as pandas DF    res =pd.DataFrame({'replication_id':replication_id,'RMSE':rmse})    return res                results = replicated_data.groupby("replication_id").apply(run_model)

以上只是一个想法,而不是一个有效的代码。 尽管应该稍作修改。

5.使用SQL

对于喜欢SQL的人,甚至可以使用SQL创建列。 为此,我们需要注册一个临时SQL表,然后使用带有附加列的简单选择查询。 一个人也可以用它来进行联接。

ratings.registerTempTable('ratings_table')newDF = sqlContext.sql('select *, 2*rating as newCol from ratings_table')newDF.show()

结论

> Photo by Kelly Sikkema on Unsplash

到此为止(双关语意)

希望我已经很好地介绍了列创建过程,以帮助您解决Spark问题。 如果您需要了解更多Spark基础知识,请查看:

不只是介绍

您可以在GitHub存储库或databricks上已发布的笔记本中找到此文章的所有代码。

另外,如果您想了解有关Spark和Spark DataFrames的更多信息,我想讲一门关于Big Data Essentials的优秀课程,这是Yandex提供的Big Data Specialization的一部分。

感谢您的阅读。 我将来也会写更多对初学者友好的文章。 在Medium中关注我或订阅我的博客以了解有关它们的信息。 与往常一样,我欢迎您提供反馈和建设性的批评,可以在Twitter @mlwhiz上与他们联系。

另外,这是一个小的免责声明-由于共享知识从来都不是一个坏主意,因此本文中可能会有一些与相关资源相关的会员链接。

(本文翻译自Rahul Agarwal的文章《5 Ways to add a new column in a PySpark Dataframe》,参考:https://towardsdatascience.com/5-ways-to-add-a-new-column-in-a-pyspark-dataframe-4e75c2fd8c08)

list vue 添加数据方法_在PySpark数据框中添加新列的5种方法相关推荐

  1. vb与matlab数据交互时间延时多长,VB中实现延时(等待)的几种方法

    在程序流程中经常要延时一段时间后再继续往下执行,在VB中常用的有以下几种方法(因为Timer控件打乱了程序的流程所以一般不用它): 1.使用Windows API函数Sleep 新建一个工程,添加一个 ...

  2. rails 添加外键_如何在Rails后端中添加功能强大的搜索引擎

    rails 添加外键 by Domenico Angilletta 通过多梅尼科·安吉列塔(Domenico Angilletta) In my experience as a Ruby on Rai ...

  3. java pdf添加透明水印_如何在PDF文件中添加透明水印

    原标题:如何在PDF文件中添加透明水印 有些文件添加水印,但是又不想水印影响文件的使用有时候会设置透明水印,那么PDF怎么设置透明水印呢,应该有很多的小伙伴们都很好奇应该怎么做吧,接下来就为大家分享一 ...

  4. mysql 添加表索引_如何向MySQL表中添加索引?

    如何向MySQL表中添加索引? 我有一个非常大的MySQL表,包含大约15万行数据.目前,当我试着运行SELECT * FROM table WHERE id = '1'; 代码运行良好,因为ID字段 ...

  5. 请描述定时器初值的计算方式_单片机C语言编程中定时器初值计算的两种方法...

    单片机C语言编程中,定时器的初值对于初学者真的是比较不好计算,因此我总结了以下几种方法. 第1种方法: #define FOSC 11059200L //晶振的频率 #define TIMS (655 ...

  6. 如何让自己时刻冷静的方法_怎么让心静下来(让心静下来的5种方法)

    每个人都知道无论你是在学习还是在工作,你都需要让自己冷静下来.只有你冷静下来才能思考问题并认真对待.然而,在生活中,人们的情绪会受到很多因素的影响.有时情绪变得非常烦人,没有办法冷静下来做事.那么解决 ...

  7. python添加重复元素_在Python 3.6中添加迭代时重复元素

    我正在尝试编写一部分代码,该代码从两个不同的列表中获取元素并进行匹配,如下所示,但是由于某种原因,我一直在输出列表中重复获取元素. def assign_tasks(operators, reques ...

  8. pandas中根据两列 或 多列进行条件对比,生成新列【三种方法】

    使用目的: 两列数量对比,收货比期望多,就是标记数量满足,否则就数量不满足 如果数量满足.日期满足,那么总体就标记满足,有一个不满足就总体标记不满足 第一种: .loc赋值 # 这里是先创建一个空列, ...

  9. Java中遍历Set集合的三种方法

    Map集合:链接: Map集合的五种遍历方式及Treemap方法 Set集合:链接: Java中遍历Set集合的三种方法 TreeSet集合:链接: Java深入了解TreeSet,和迭代器遍历方法 ...

  10. 深度学习数据自动编码器_如何学习数据科学编码

    深度学习数据自动编码器 意见 (Opinion) When I first wanted to learn programming, I coded along to a 4 hour long Yo ...

最新文章

  1. 为什么充电电流变化有点大
  2. Linux加载Linux的引导程序,Linux的引导-引导加载程序:LILO和GRUB
  3. sap生产工单报工_【案例】MES系统助力亨通电缆车间生产效率提升25%
  4. 应用程序级别之外使用注册为 allowDefinition='MachineToApplication' 的节是错误的(转载)...
  5. 选择WORD文档中的所有表格宏代码
  6. 投诉索尼中国:欺骗消费!敲诈消费者!
  7. 计算机专业课考研八月份复习,2016考研:计算机专业暑期复习攻略
  8. 如何在HTML中更改文本颜色?
  9. 自然语言处理领域国内外著名会议和期刊
  10. UINO优锘:DMV架构管理可视化,让架构图管理快速升级
  11. 如何用无桥图腾柱功率因数校正控制器实现出色的AC-DC功率转换效率
  12. 【计算机网络实验】笔记(实验一、二)
  13. 华为软开云7--部署
  14. self和[self class]
  15. 电除尘器 matlab,电除尘用高频高压直流叠加高压脉冲电源的制作方法
  16. react 断网提示
  17. Javascript中Window.open参数详解
  18. VMWare建立于W10的共享文件夹
  19. 计算机专业毕业论文摘要,计算机科学与技术专业论文摘要怎么写 计算机科学与技术专业论文摘要范文参考...
  20. 史上最详细的MongoDB操作命令大全

热门文章

  1. mysql如何更新一个表中的某个字段值等于另一个表的某个字段值
  2. Android -- Exif
  3. POJ 2488 A Knight's Journey
  4. API(应用程序编程接口)
  5. 9.使用 curses 函数库来管理基于文本的屏幕
  6. 2.tcpdump(2)
  7. 这是一个定时器,定时执行一次,用在定时发送邮件
  8. Data Science With R In Visual Studio
  9. 创建一个jFinal项目
  10. 《OD学hadoop》第二周0703