导读

昨日推文PySpark环境搭建和简介,今天开始介绍PySpark中的第一个重要组件SQL/DataFrame,实际上从名字便可看出这是关系型数据库SQL和pandas.DataFrame的结合体,功能也几乎恰是这样,所以如果具有良好的SQL基本功和熟练的pandas运用技巧,学习PySpark SQL会感到非常熟悉和舒适。

惯例开局一张图

01 PySpark SQL简介

前文提到,Spark是大数据生态圈中的一个快速分布式计算引擎,支持多种应用场景。例如Spark core中的RDD是最为核心的数据抽象,定位是替代传统的MapReduce计算框架;SQL是基于RDD的一个新的组件,集成了关系型数据库和数仓的主要功能,基本数据抽象是DataFrame,与pandas.DataFrame极为相近,适用于体量中等的数据查询和处理。

那么,在已经有了RDD的基础上,Spark为什么还要推出SQL呢?为此,Spark团队还专门为此发表论文做以介绍,原文可查找《Spark SQL: Relational Data Processing in Spark》一文。这里只节选其中的关键一段:

核心有两层意思,一是为了解决用户从多种数据源(包括结构化、半结构化和非结构化数据)执行数据ETL的需要;二是满足更为高级的数据分析需求,例如机器学习、图处理等。而为了实现这一目的,Spark团队推出SQL组件,一方面满足了多种数据源的处理问题,另一方面也为机器学习提供了全新的数据结构DataFrame(对应ml子模块)。

了解了Spark SQL的起源,那么其功能定位自然也十分清晰:基于DataFrame这一核心数据结构,提供类似数据库和数仓的核心功能,贯穿大部分数据处理流程:从ETL到数据处理到数据挖掘(机器学习)。

注:由于Spark是基于scala语言实现,所以PySpark在变量和函数命名中也普遍采用驼峰命名法(首单词小写,后面单次首字母大写,例如someFunction),而非Python中的蛇形命名(各单词均小写,由下划线连接,例如some_funciton)

02 几个重要的类为了支撑上述功能需求和定位,PySpark中核心的类主要包括以下几个:

  • SparkSession:从名字可以推断出这应该是为后续spark各种操作提供了一个session会话环境,具体来说接收一个SparkContext对象作为输入,建立Spark SQL的主入口。SparkSession之于SQL的地位恰如SparkContext之于Spark的地位一样,都是提供了核心入口点。这里,直白的理解就是SparkContext相当于是Spark软件和集群硬件之间的"驱动",SparkContext就是用来管理和调度这些资源的;而SparkSession则是在SQL端对集群资源的进一步调度和分发。按照惯例,建立SparkSession流程和命名规范如下:

from pyspark import SparkContextfrom pyspark.sql import SparkSessionsc = SparkContext()spark = SparkSession(sc)
  • DataFrame:是PySpark SQL中最为核心的数据结构,实质即为一个二维关系表,定位和功能与pandas.DataFrame以及R语言中的data.frame几乎一致。最大的不同在于pd.DataFrame行和列对象均为pd.Series对象,而这里的DataFrame每一行为一个Row对象,每一列为一个Column对象

  • Row:是DataFrame中每一行的数据抽象

  • Column:DataFrame中每一列的数据抽象

  • types:定义了DataFrame中各列的数据类型,基本与SQL中的数据类型同步,一般用于DataFrame数据创建时指定表结构schema

  • functions:这是PySpark SQL之所以能够实现SQL中的大部分功能的重要原因之一,functions子类提供了几乎SQL中所有的函数,包括数值计算、聚合统计、字符串以及时间函数等4大类,后续将专门予以介绍

  • Window:用于实现窗口函数功能,无论是传统关系型数据库SQL还是数仓Hive中,窗口函数都是一个大杀器,PySpark SQL自然也支持,重点是支持partition、orderby和rowsBetween三类操作,进而完成特定窗口内的聚合统计

注:这里的Window为单独的类,用于建立窗口函数over中的对象;functions子模块中还有window函数,其主要用于对时间类型数据完成重采样操作。

03 DataFrame

DataFrame是PySpark中核心的数据抽象和定义,理解DataFrame的最佳方式是从以下2个方面:

  • 是面向二维关系表而设计的数据结构,所以SQL中的功能在这里均有所体现

  • 无论是功能定位还是方法接口均与pd.DataFrame极为相似,所以部分功能又是仿照后者设计

换言之,记忆PySpark中的DataFrame只需对比SQL+pd.DataFrame即可。下面对DataFrame对象的主要功能进行介绍:

  • 数据读写及类型转换。

    1)创建DataFrame的方式主要有两大类:

    • 从其他数据类型转换,包括RDD、嵌套list、pd.DataFrame等,主要是通过spark.createDataFrame()接口创建

    • 从文件、数据库中读取创建,文件包括Json、csv等,数据库包括主流关系型数据库MySQL,以及数仓Hive,主要是通过sprak.read属性+相应数据源类型进行读写,例如spark.read.csv()用于读取csv文件,spark.read.jdbc()则可用于读取数据库

2)数据写入。与spark.read属性类似,.write则可用于将DataFrame对象写入相应文件,包括写入csv文件、写入数据库等

3)数据类型转换。DataFrame既然可以通过其他类型数据结构创建,那么自然也可转换为相应类型,常用的转换其实主要还是DataFrame=>rdd和DataFrame=>pd.DataFrame,前者通过属性可直接访问,后者则需相应接口:

df.rdd  # PySpark SQL DataFrame => RDDdf.toPandas()  # PySpark SQL DataFrame => pd.DataFrame
  • select:查看和切片

    这是DataFrame中最为常用的功能之一,用法与SQL中的select关键字类似,可用于提取其中一列或多列,也可经过简单变换后提取。同时,仿照pd.DataFrame中提取单列的做法,SQL中的DataFrame也支持"[]"或"."两种提取方式,但与select查看的最大区别在于select提取后得到的是仍然是一个DataFrame,而[]和.获得则是一个Column对象。例如:

df = spark.createDataFrame([("John", 17), ("Tom", 18)], schema=["name", "age"])df.select('name')  # DataFrame[name: string]df['name']  # Columndf.name  # Column

除了提取单列外,select还支持类似SQL中"*"提取所有列,以及对单列进行简单的运算和变换,具体应用场景可参考pd.DataFrame中赋值新列的用法,例如下述例子中首先通过"*"关键字提取现有的所有列,而后通过df.age+1构造了名字为(age+1)的新列。

df = spark.createDataFrame([("John", 17), ("Tom", 18)], schema=["name", "age"])df.select('*', df.age+1).show()"""+----+---+---------+|name|age|(age + 1)|+----+---+---------+|John| 17|       18|| Tom| 18|       19|+----+---+---------+"""
  • alias:起别名

    熟悉SQL语法的都知道as的用法,实际上as即为alias的简写,这里的alias的功能与as也完全一致,即对一个对象起别名,除了对单列起别名外也支持对整个DataFrame对象起别名

df.select('*', (df.age+1).alias('age1')).show()"""+----+---+----+|name|age|age1|+----+---+----+|John| 17|  18|| Tom| 18|  19|+----+---+----+"""
  • where/filter:条件过滤

    SQL中实现条件过滤的关键字是where,在聚合后的条件中则是having,而这在sql DataFrame中也有类似用法,其中filter和where二者功能是一致的:均可实现指定条件过滤。以下4种写法均可实现特定功能:

df.where(df.age==18).show()df.filter(df.age==18).show()df.where('age=18').show()df.filter('age=18').show()"""+----+---+|name|age|+----+---+| Tom| 18|+----+---+"""

值得指出的是在pandas.DataFrame中类似的用法是query函数,不同的是query()中表达相等的条件符号是"==",而这里filter或where的相等条件判断则是更符合SQL语法中的单等号"="。

  • groupby/groupBy:分组聚合

    分组聚合是数据分析中最为常用的基础操作,其基本用法也与SQL中的group by关键字完全类似,既可直接根据某一字段执行聚合统计,也可根据某一列的简单运算结果进行统计。groupby和groupBy是互为别名的关系,二者功能完全一致。之后所接的聚合函数方式也有两种:直接+聚合函数或者agg()+字典形式聚合函数,这与pandas中的用法几乎完全一致,所以不再赘述,具体可参考Pandas中groupby的这些用法你都知道吗?一文。这里补充groupby的两个特殊用法:

    • groupby+window时间开窗函数时间重采样,对标pandas中的resample

    • groupby+pivot实现数据透视表操作,对标pandas中的pivot_table

# 原始DataFramedf.show()"""+----+---+-------------------+|name|age|               time|+----+---+-------------------+|John| 17|2020-09-06 15:11:00|| Tom| 17|2020-09-06 15:12:00|| Joy| 17|2020-09-06 15:13:00|| Tim| 18|2020-09-06 15:16:00|+----+---+-------------------+"""#  gorupby+pivot实现数据透视表df.groupby(fn.substring('name', 1, 1).alias('firstName')).pivot('age').count().show()"""+---------+---+----+|firstName| 17|  18|+---------+---+----+|        T|  1|   1||        J|  2|null|+---------+---+----+"""#  window函数实现时间重采样df.groupby(fn.window('time', '5 minutes')).count().show()"""+--------------------+-----+|              window|count|+--------------------+-----+|[2020-09-06 15:10...|    3||[2020-09-06 15:15...|    1|+--------------------+-----+"""
  • orderBy/sort:排序

    orderby的用法与SQL中的用法也是完全一致的,都是根据指定字段或字段的简单运算执行排序,sort实现功能与orderby功能一致。接受参数可以是一列或多列(列表形式),并可接受是否升序排序作为参数。常规用法如下:

# 多列排序,默认升序df.sort('name', 'age').show()"""+----+---+-------------------+|name|age|               time|+----+---+-------------------+|John| 17|2020-09-06 15:11:00|| Joy| 17|2020-09-06 15:13:00|| Tim| 18|2020-09-06 15:16:00|| Tom| 17|2020-09-06 15:12:00|+----+---+-------------------+"""#  多列排序,并制定不同排序规则df.sort(['age', 'name'], ascending=[True, False]).show()"""+----+---+-------------------+|name|age|               time|+----+---+-------------------+| Tom| 17|2020-09-06 15:12:00|| Joy| 17|2020-09-06 15:13:00||John| 17|2020-09-06 15:11:00|| Tim| 18|2020-09-06 15:16:00|+----+---+-------------------+"""
  • join:表连接

    这也是一个完全等同于SQL中相应关键字的操作,并支持不同关联条件和不同连接方式,除了常规的SQL中的内连接、左右连接、和全连接外,还支持Hive中的半连接,可以说是兼容了数据库的数仓的表连接操作

  • union/unionAll:表拼接

    功能分别等同于SQL中union和union all,其中前者是去重后拼接,而后者则直接拼接,所以速度更快

  • limit:限制返回记录数

    与SQL中limit关键字功能一致

另外,类似于SQL中count和distinct关键字,DataFrame中也有相同的用法。

以上主要是类比SQL中的关键字用法介绍了DataFrame部分主要操作,而学习DataFrame的另一个主要参照物就是pandas.DataFrame,例如以下操作:

  • dropna:删除空值行

    实际上也可以接收指定列名或阈值,当接收列名时则仅当相应列为空时才删除;当接收阈值参数时,则根据各行空值个数是否达到指定阈值进行删除与否

  • dropDuplicates/drop_duplicates:删除重复行

    二者为同名函数,与pandas中的drop_duplicates函数功能完全一致

  • fillna:空值填充

    与pandas中fillna功能一致,根据特定规则对空值进行填充,也可接收字典参数对各列指定不同填充

  • fill:广义填充

  • drop:删除指定列

最后,再介绍DataFrame的几个通用的常规方法:

  • withColumn:在创建新列或修改已有列时较为常用,接收两个参数,其中第一个参数为函数执行后的列名(若当前已有则执行修改,否则创建新列),第二个参数则为该列取值,可以是常数也可以是根据已有列进行某种运算得到,返回值是一个调整了相应列后的新DataFrame

# 根据age列创建一个名为ageNew的新列df.withColumn('ageNew', df.age+100).show()"""+----+---+-------------------+------+|name|age|               time|ageNew|+----+---+-------------------+------+|John| 17|2020-09-06 15:11:00|   117|| Tom| 17|2020-09-06 15:12:00|   117|| Joy| 17|2020-09-06 15:13:00|   117|| Tim| 18|2020-09-06 15:16:00|   118|+----+---+-------------------+------+"""

注意到,withColumn实现的功能完全可以由select等价实现,二者的区别和联系是:withColumn是在现有DataFrame基础上增加或修改一列,并返回新的DataFrame(包括原有其他列),适用于仅创建或修改单列;而select准确的讲是筛选新列,仅仅是在筛选过程中可以通过添加运算或表达式实现创建多个新列,返回一个筛选新列的DataFrame,而且是筛选多少列就返回多少列,适用于同时创建多列的情况(官方文档建议出于性能考虑和防止内存溢出,在创建多列时首选select)

  • show:将DataFrame显示打印

实际上show是spark中的action算子,即会真正执行计算并返回结果;而前面的很多操作则属于transform,仅加入到DAG中完成逻辑添加,并不实际执行计算

  • take/head/tail/collect:均为提取特定行的操作,也属于action算子

另外,DataFrame还有一个重要操作:在session中注册为虚拟表,而后即可真正像执行SQL查询一样完成相应SQL操作。

df.createOrReplaceTempView('person')  # 将df注册为表名叫person的临时表spark.sql('select * from person').show()  # 通过sql接口在person临时表中执行SQL操作"""+----+---+-------------------+|name|age|               time|+----+---+-------------------+|John| 17|2020-09-06 15:11:00|| Tom| 17|2020-09-06 15:12:00|| Joy| 17|2020-09-06 15:13:00|| Tim| 18|2020-09-06 15:16:00|+----+---+-------------------+"""

04 sql.functions核心API

基于DataFrame可以实现SQL中大部分功能,同时为了进一步实现SQL中的运算操作,spark.sql还提供了几乎所有的SQL中的函数,确实可以实现SQL中的全部功能。按照功能,functions子模块中的功能可以主要分为以下几类:

  • 聚合统计类,也是最为常用的,除了常规的max、min、avg(mean)、count和sum外,还支持窗口函数中的row_number、rank、dense_rank、ntile,以及前文提到的可用于时间重采样的窗口函数window等

  • 数值处理类,主要是一些数学函数,包括sqrt、abs、ceil、floor、sin、log等

  • 字符串类,包括子字符串提取substring、字符串拼接concat、concat_ws、split、strim、lpad等

  • 时间处理类,主要是对timestamp类型数据进行处理,包括year、month、hour提取相应数值,timestamp转换为时间戳、date_format格式化日期、datediff求日期差等

这些函数数量较多,且与SQL中相应函数用法和语法几乎一致,无需全部记忆,仅在需要时查找使用即可。

05 总结

本文较为系统全面的介绍了PySpark中的SQL组件以及其核心数据抽象DataFrame,总体而言:该组件是PySpark中的一个重要且常用的子模块,功能丰富,既继承了Spark core中RDD的基本特点(算子和延迟执行特性),也是Spark.ml机器学习子模块的基础数据结构,其作用自然不言而喻。

与此同时,DataFrame学习成本并不高,大致相当于关系型数据库SQL+pandas.DataFrame的结合体,很多接口和功能都可以触类旁通。

相关阅读:

  • PySpark——开启大数据分析师之路

  • 数据科学系列:sklearn库主要模块简介

  • 数据科学系列:seaborn入门详细教程

  • 数据科学系列:pandas入门详细教程

  • 数据科学系列:matplotlib入门详细教程

  • 数据科学系列:numpy入门详细教程

dataframe groupby_PySpark SQL——SQL和pd.DataFrame的结合体相关推荐

  1. pd.DataFrame系列

    文章目录 如何让dataframe优雅的增加一列? 获取一个dataframe的后三列,0行到100行 读取excel,将结果写入到一个excel的多个sheet 筛选数据啦 DataFrame.gr ...

  2. pd.DataFrame.melt()函数

    对这个函数的理解就是二维变一维,就是逆序数列melt(self, id_vars=None, value_vars=None, var_name=None, value_name='value', c ...

  3. python读取数据库数据类型_Python实现从SQL型数据库读写dataframe型数据的方法【基于pandas】...

    本文实例讲述了Python实现从SQL型数据库读写dataframe型数据的方法.分享给大家供大家参考,具体如下: Python的pandas包对表格化的数据处理能力很强,而SQL数据库的数据就是以表 ...

  4. python读取数据库数据类型_Python中从SQL型数据库读写dataframe型数据

    Python的pandas包对表格化的数据处理能力很强,而SQL数据库的数据就是以表格的形式储存,因此经常将sql数据库里的数据直接读取为dataframe,分析操作以后再将dataframe存到sq ...

  5. pandas.DataFrame的类SQL操作

    前言 pandas的DataFrame是类似于一张表的结构,但是并没有像数据库表那样的SQL操作.虽然如此,它依然可以使用python语言的风格实现SQL中的所有操作. 文章较长,建议点击右侧目录定位 ...

  6. pyspark笔记(RDD,DataFrame和Spark SQL)

    https://github.com/QInzhengk/Math-Model-and-Machine-Learning PySpark RDD和DataFrame 1.SparkSession 介绍 ...

  7. pyspark操作 rdd dataframe,pyspark.sql.functions详解 行列变换

    官网文档可以参考:https://spark.apache.org/docs/latest/api/python/index.html dataframe读写 生成以逗号分隔的数据 stringCSV ...

  8. Spark SQL 1.3.0 DataFrame介绍、使用及提供了些完整的数据写入

     问题导读 1.DataFrame是什么? 2.如何创建DataFrame? 3.如何将普通RDD转变为DataFrame? 4.如何使用DataFrame? 5.在1.3.0中,提供了哪些完整的 ...

  9. spark sql 数据类型转换_spark dataframe 类型转换

    读一张表,对其进行二值化特征转换.可以二值化要求输入类型必须double类型,类型怎么转换呢? 直接利用spark column 就可以进行转换: DataFrame dataset= hive.sq ...

最新文章

  1. 【Qt】错误处理:error: undefined reference to `qMain(int, char**)‘
  2. 独家 | 教你用卷积神经网络对视觉神经元进行建模(附资源)
  3. 凯文·凯利:未来很美好,今天仍是Day1
  4. mysql 表名不加单引号_当表名“ match”没有用单引号引起来时,MySQL引发错误?...
  5. 【Python小脚本】实现王者农药自动刷金币啦~啦啦啦走跟我一起组队~
  6. 学好Python能做什么?有哪些从业方向?
  7. C++trie类的实现(附完整源码)
  8. Ubuntu如何搭建Django与Flup和Nginx环境?
  9. webpack学习笔记1
  10. 大一下学期的自我目标
  11. 拳王虚拟项目公社:自动化的虚拟资源产品,唱歌教程赚地盆满钵满
  12. shell中变量的引用(两种方式:$变量名,${变量名})。
  13. Java jar 包免费下载(全)
  14. 解决方法-SQLserver建表后更改列,显示不允许保存更改。您所做的更改要求删除并重新创建以下表
  15. srt字幕转ass字幕在线工具分享
  16. uilable 上面加子视图图
  17. html5超链接不加下划线,css怎么让超链接不加下划线
  18. 教你一招:Win10切换输入法与Win7一样(Ctrl + 空格)
  19. 定义一个复数类Complex,重载运算符“+”,“ -”,“*”,“/”使之能用于计算两个复数的加减乘除。
  20. 梦幻西游进入游戏显示服务器程序停止工作,win10系统提示“梦幻西游已停止工作”的设置教程...

热门文章

  1. 如何在Linux下使用Gitblit工具创建Git仓库服务
  2. 我的阿里梦——淘宝前端必备技能
  3. POJ 3368 Frequent values 线段树区间合并
  4. MyBatis 注释
  5. 基于SpringBoot的考研管理系统
  6. python 中 numpy 模块的 size,shape, len的用法
  7. JAVA-如何打包成jar包
  8. struts2框架之国际化(参考第二天学习笔记)
  9. How to Enable Trace or Debug for APIs executed as SQL Script Outside of the Applications ?
  10. PHP开发者应了解的24个库