PySpark | SparkSQL入门 | DataFrame入门
文章目录
- 一、快速入门
- 1. 什么是SparkSQL
- 2. 为什么要学习SparkSQL
- 3. SparkSQL特点
- 二、SparkSQL概述
- 1. SparkSQL和Hive的异同
- 2. SparkSQL的数据抽象
- 3. DataFrame数据抽象
- 4. SparkSession对象
- 6. SparkSQL HelloWorld
- 三、DataFrame入门和操作
- 1. DataFrame的组成
- 2. DataFrame的代码构建
- 2.1 基于RDD的方式1
- 2.2 基于RDD的方式2
- 2.3 基于RDD的方式3
- 2.4 基于pandas的DataFrame
- 2.5 读取外部数据
- 3. DataFrame的入门操作
- 3.1 DSL风格下的API总结
- 4. 词频统计案例
- 5. 电影评分数据分析
- 6. SparkSQL Shuffle 分区数目
- 7. SparkSQL 数据清洗API
- 7.1 数据去重:dropDuplicates
- 7.2 缺失值处理——dropna
- 7.3 缺失值填充——fillna
- 8. DataFrame数据写出
- 9. DataFrame 通过JDBC读写数据库
- 10. 总结
传送门:
- 视频地址:黑马程序员Spark全套视频教程
- 1.PySpark基础入门(一)
- 2.PySpark基础入门(二)
- 3.PySpark核心编程(一)
- 4.PySpark核心编程(二)
- 5.PySaprk——SparkSQL学习(一)
- 6.PySaprk——SparkSQL学习(二)
- 7.Spark综合案例——零售业务统计分析
- 8. Spark3新特性及核心概念(背)
一、快速入门
1. 什么是SparkSQL
SparkSQL是Spark的一个模块,用于处理海量结构化数据。
限定:结构化数据处理。
RDD算子可以处理结构化数据,非结构化数据,半结构化数据
2. 为什么要学习SparkSQL
SparkSQL是非常成熟的海量结构化数据处理框架。学习SparkSQL主要在2个点:
- SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等
- 企业大面积在使用SparkSQL处理业务数据
- 离线开发
- 数仓搭建
- 科学计算
- 数据分析
3. SparkSQL特点
- 融合性
SQL可以无缝集成在代码中,随时用SQL处理数据 - 统一数据访问
一套标准API可读写不同数据源 - Hive兼容
可以使用SparkSQL直接计算并生成Hive数据表 - 标准化连接
支持标准化JDBC\ODBC连接,方便和各种数据库进行数据交互。
二、SparkSQL概述
1. SparkSQL和Hive的异同
相同点:
- Hive和Spark 均是:“分布式SQL计算引擎”,均是构建大规模结构化数据计算的绝佳利器。
不同点:
2. SparkSQL的数据抽象
SparkSQL的DataFrame底层借鉴了Pandas的DataFrame,是二维表数据结构、分布式集合(分区)。SparkSQL现在使用的有2类数据抽象对象:
- DataSet对象,可用于Java、Scala语言
DataSet支持泛型特性(Python语言没有泛型特性),可以让Java、Scala语言更好的利用到 - DataFrame对象,可用于Java、Scala、Python、R语言
我们以Python开发SparkSQL,主要使用的就是DataFrame对象作为核心数据结构。
3. DataFrame数据抽象
DataFrame和RDD都是:弹性的、分布式的数据集。只是,DataFrame存储的数据结构“限定”为:二维表结构化数据而RDD可以存储的数据则没有任何限制,想处理什么就处理什么。
4. SparkSession对象
在RDD阶段,程序的执行入口对象是: SparkContext对象。在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
SparkSession对象可以:
- 用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession对象。
6. SparkSQL HelloWorld
#!usr/bin/env python
# -*- coding:utf-8 -*-"""SparkSession入口对象作为SQL的编程入口SparkContext入口对象作为RDD的编程入口
"""# SparkSession对象的导包,SparkSQL 中的入口对象是SparkSession对象
from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建SparkSession入口对象# appName 设置程序名称, config设置一些常用属性spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()# 1.通过SparkSession对象获取SparkContext对象sc = spark.sparkContext# SparkSQL的HelloWord# 读取数据df = spark.read.csv('../data/input/stu_score.txt', sep=',', header=False)# 设置列名df2 = df.toDF('id', 'name', 'score')df2.printSchema() # 打印表结构df2.show() # 打印表内容df2.createTempView('score')# SQL风格spark.sql("""SELECT * FROM score WHERE name='语文' LIMIT 5""").show()# DSL风格df2.where("name='语文'").limit(5).show()
root|-- id: string (nullable = true)|-- name: string (nullable = true)|-- score: string (nullable = true)+---+----+-----+
| id|name|score|
+---+----+-----+
| 1|语文| 99|
| 2|语文| 99|
| 3|语文| 99|
| 4|语文| 99|
| 5|语文| 99|
| 6|语文| 99|
| 7|语文| 99|
| 8|语文| 99|
| 9|语文| 99|
| 10|语文| 99|
| 11|语文| 99|
| 12|语文| 99|
| 13|语文| 99|
| 14|语文| 99|
| 15|语文| 99|
| 16|语文| 99|
| 17|语文| 99|
| 18|语文| 99|
| 19|语文| 99|
| 20|语文| 99|
+---+----+-----+
only showing top 20 rows+---+----+-----+
| id|name|score|
+---+----+-----+
| 1|语文| 99|
| 2|语文| 99|
| 3|语文| 99|
| 4|语文| 99|
| 5|语文| 99|
+---+----+-----++---+----+-----+
| id|name|score|
+---+----+-----+
| 1|语文| 99|
| 2|语文| 99|
| 3|语文| 99|
| 4|语文| 99|
| 5|语文| 99|
+---+----+-----+
三、DataFrame入门和操作
1. DataFrame的组成
DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:
- 行
- 列
- 表结构描述
基于这个前提,DataFrame的组成如下:
在结构层面:
- StructType对象描述整个DataFrame的表结构
- StructField对象描述一个列的信息
在数据层面
- Row对象记录一行数据
- Column对象记录一列数据并包含列的信息
如图,在表结构层面,DataFrame的表结构由StructType对象来描述,如下图:
一个StructField记录:列名、列类型、列是否为空。多个StructField组成一个StructType对象。一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空。
2. DataFrame的代码构建
2.1 基于RDD的方式1
DataFrame对象可以从RDD转换而来,都是分布式数据集,就是转换一下内部存储的结构,转换为二维表结构。通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame。这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。
#!usr/bin/env python
# -*- coding:utf-8 -*-
# TODO:基于RDD的方式构建DataFrame对象
from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.基于RDD转换成DataFramerdd = sc.textFile('../data/input/sql/people.txt'). \map(lambda x: x.split(',')). \map(lambda x: (x[0], int(x[1])))# 2. 构建DataFrame对象# 参数1:被转换的RDD;参数2指定列名,通过list的形式指定,按照顺序依次提供字符串名称df = spark.createDataFrame(rdd, schema=['name', 'age'])# 打印DataFrame表结构df.printSchema()# 打印DataFrame数据# 参数1表示展示多少条数据,默认不传的话是20# 参数2表示是否对列进行截断,如果列的数据长度超过20个字符串程度,后续的内容不显示,以...代替# 如果给False,表示全部显示(不截断),默认是Truedf.show()# 将DF对象转换为临时视图表,可供sql语句查询df.createOrReplaceTempView('people')spark.sql("SELECT * FROM people WHERE age < 30").show()
root|-- name: string (nullable = true)|-- age: long (nullable = true)+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---++-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Justin| 19|
+-------+---+
2.2 基于RDD的方式2
通过StructType对象来定义DataFrame的“表结构”转换RDD。
#!usr/bin/env python
# -*- coding:utf-8 -*-
"""基于RDD的方式构建DataFrame对象,通过StructType对象来定义DataFrame的“表结构”转换RDD
"""from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerTypeif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.基于RDD转换成DataFramerdd = sc.textFile('../data/input/sql/people.txt'). \map(lambda x: x.split(',')). \map(lambda x: (x[0], int(x[1])))# 2. 构建表结构描述对象——StructType对象schema = StructType().\add("name", StringType(), nullable=True). \add("age", IntegerType(), nullable=False)# 3.基于StructType对象去构建RDD到DF的转换df = spark.createDataFrame(rdd, schema=schema)df.printSchema()df.show()
root|-- name: string (nullable = true)|-- age: integer (nullable = false)+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
2.3 基于RDD的方式3
使用RDD的toDF方法转换RDD。
#!usr/bin/env python
# -*- coding:utf-8 -*-
"""基于RDD的方式构建DataFrame对象,使用RDD的toDF方法转换RDD
"""from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerTypeif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.基于RDD转换成DataFramerdd = sc.textFile('../data/input/sql/people.txt'). \map(lambda x: x.split(',')). \map(lambda x: (x[0], int(x[1])))# TODO:toDF的方式构建DataFramedf1 = rdd.toDF(['name','age'])df1.printSchema()df1.show()# TODO:通过表结构描述对象——StructType对象的方式构建DataFrameschema = StructType().\add("name", StringType(), nullable=True). \add("age", IntegerType(), nullable=False)df2 = rdd.toDF(schema=schema)df2.printSchema()df2.show()
root|-- name: string (nullable = true)|-- age: long (nullable = true)+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+root|-- name: string (nullable = true)|-- age: integer (nullable = false)+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
2.4 基于pandas的DataFrame
将Pandas的DataFrame对象,转变为分布式的SparkSQL的DataFrame对象。
#!usr/bin/env python
# -*- coding:utf-8 -*-
"""基于pandas的DataFrame构建SparkSQL的DataFrame
"""from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# TODO:基于pandas的DataFrame构建SparkSQL的DataFramepdf = pd.DataFrame({'id': [1, 2, 3],'name': ['张大仙', '王晓晓', '吕不韦'],'age': [11, 12, 13]})df = spark.createDataFrame(pdf)df.printSchema()df.show()
root|-- id: long (nullable = true)|-- name: string (nullable = true)|-- age: long (nullable = true)+---+------+---+
| id| name|age|
+---+------+---+
| 1|张大仙| 11|
| 2|王晓晓| 12|
| 3|吕不韦| 13|
+---+------+---+
2.5 读取外部数据
通过SparkSQL的统一API进行数据读取构建DataFrame。
统一API示例代码:
sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......").option("K", "V") # option可选.schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT").load("被读取文件的路径, 支持本地文件系统和HDFS")
读取text数据源
读取text数据源,使用format(“text”)读取文本数据。读取数据的特点是:将一整行只作为一个列读取,列名默认称之为value,类型是String#!usr/bin/env python # -*- coding:utf-8 -*- """DataFrame的代码构建 - 读取外部数据通过SparkSQL的统一API进行数据读取构建DataFrame """from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 构建StructType对象schema = StructType().add('data', StringType(), nullable=True)# read读取外部数据df = spark.read.format('text'). \schema(schema=schema). \load('../data/input/sql/people.txt')df.printSchema()df.show()
root|-- data: string (nullable = true)+-----------+ | data| +-----------+ |Michael, 29| | Andy, 30| | Justin, 19| +-----------+
读取json数据源
使用format(“json”)读取json数据#!usr/bin/env python # -*- coding:utf-8 -*- """DataFrame的代码构建 - 读取外部数据通过SparkSQL的统一API进行数据读取构建DataFrame """from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# JSON数据自带schema信息df = spark.read.format('json').load('../data/input/sql/people.json')df.printSchema()df.show()
root|-- age: long (nullable = true)|-- name: string (nullable = true)+----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
读取csv数据源
使用format(“csv”)读取csv数据#!usr/bin/env python # -*- coding:utf-8 -*- """DataFrame的代码构建 - 读取外部数据通过SparkSQL的统一API进行数据读取构建DataFrame """from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 读取CSV文件df = spark.read.format('csv'). \option('sep', ';'). \ option('header', True). \option('encoding', 'utf-8'). \schema('name STRING, age INT, job STRING'). \load('../data/input/sql/people.csv')df.printSchema()df.show()
root|-- name: string (nullable = true)|-- age: integer (nullable = true)|-- job: string (nullable = true)+-----+----+---------+ | name| age| job| +-----+----+---------+ |Jorge| 30|Developer| | Bob| 32|Developer| | Ani| 11|Developer| | Lily| 11| Manager| | Put| 11|Developer| |Alice| 9| Manager| |Alice| 9| Manager| |Alice| 9| Manager| |Alice| 9| Manager| |Alice|null| Manager| |Alice| 9| null| +-----+----+---------+
读取parquet数据源
使用format(“parquet”)读取parquet数据
parquet: 是Spark中常用的一种列式存储文件格式。parquet对比普通的文本文件的区别:- parquet 内置schema (列名\ 列类型\ 是否为空)
- 存储是以列作为存储格式
- 存储是序列化存储在文件中的(有压缩属性体积小)
#!usr/bin/env python # -*- coding:utf-8 -*- """DataFrame的代码构建 - 读取外部数据通过SparkSQL的统一API进行数据读取构建DataFrame """from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 读取parquet文件df = spark.read.format('parquet').\load('../data/input/sql/users.parquet')df.printSchema()df.show()
root|-- name: string (nullable = true)|-- favorite_color: string (nullable = true)|-- favorite_numbers: array (nullable = true)| |-- element: integer (containsNull = true)+------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
Parquet文件不能直接打开查看,如果想要查看内容,可以在PyCharm中安装如下插件来查看:
3. DataFrame的入门操作
DataFrame支持两种风格进行编程,分别是:
DSL风格
DSL语法风格,可称之为:领域特定语言。其实就是指DataFrame的特有API。DSL风格意思就是以调用API的方式来处理Data。比如:df.where().limit()
#!usr/bin/env python # -*- coding:utf-8 -*- """DSL入门 """from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.读取txt数据df = spark.read.format('csv'). \schema('id INT,subject STRING,score INT'). \load('../data/input/sql/stu_score.txt')# TODO:select API演示# 获取column对象id_column = df['id']subject_column = df['subject']df.select(['id', 'subject']).show()df.select('id', 'subject').show()df.select([id_column, subject_column]).show()df.select(id_column, subject_column).show()# TODO:filter API演示df.filter('score < 99').show()df.filter(df['score'] < 99).show()# TODO:groupBy API# groupBy API的返回值是GroupedData对象,是一个有分组关系的数据结构,调用聚合方法后返回值是DataFramedf.groupBy('subject').count().show()df.groupBy(df['subject']).count().show()
SQL风格
SQL风格就是使用SQL语句处理DataFrame的数据,比如:spark.sql(“SELECT * FROM xxx)
。DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:
- 全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:
global_temp
- 临时表:只在当前SparkSession中可用
#!usr/bin/env python # -*- coding:utf-8 -*- """SQL入门 """from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.读取txt数据df = spark.read.format('csv'). \schema('id INT,subject STRING,score INT'). \load('../data/input/sql/stu_score.txt')# 2.注册成临时表df.createTempView('score') # 注册临时视图(表)df.createOrReplaceTempView('score_2') # 注册或替换临时视图(表)df.createGlobalTempView('score_3') # 注册全局临时视图,全局临时视图在使用的时候,需要在浅看带上global——temp前缀# 3.通过SparkSession对象的spark API完成aql语句的执行spark.sql("SELECT subject,COUNT(*) AS cnt FROM score GROUP BY subject").show()spark.sql("SELECT subject,COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()spark.sql("SELECT subject,COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()
+-------+---+ |subject|cnt| +-------+---+ | 英语| 30| | 语文| 30| | 数学| 30| +-------+---++-------+---+ |subject|cnt| +-------+---+ | 英语| 30| | 语文| 30| | 数学| 30| +-------+---++-------+---+ |subject|cnt| +-------+---+ | 英语| 30| | 语文| 30| | 数学| 30| +-------+---+
- 全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:
3.1 DSL风格下的API总结
- show方法:DataFrame的API,展示DataFrame中的数据, 默认展示20条
- printSchema方法:DataFrame的API,打印输出df的schema信息
- select方法:DataFrame的API,选择DataFrame中的指定列(通过传入参数进行指定)
- filter和where方法:DataFrame的API,过滤DataFrame内的数据,返回一个过滤后的DataFrame
- groupby分组:DataFrame的API,按照指定的列进行数据的分组, 返回值是GroupedData对象,后续可以使用min、max、avg、sum、count方法进行分组聚合
- agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合
- alias: 它是Column对象的API, 可以针对一个列 进行改名
- withColumnRenamed: 它是DataFrame的API,可以对DF中的列进行改名, 一次改一个列,改多个列 可以链式调用
- orderBy: DataFrame的API,进行排序,参数1是被排序的列,参数2是 升序(True) 或 降序 False
- first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象
Row对象 就是一个数组, 你可以通过row[‘列名’] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等)
4. 词频统计案例
PySpark提供了一个包: pyspark.sql.functions
。这个包里面提供了 一系列的计算函数供SparkSQL使用。这些功能函数,返回值多数都是Column对象。
#!usr/bin/env python
# -*- coding:utf-8 -*-
"""WordCount
"""from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# TODO 1:SQL风格进行处理rdd = sc.textFile('../data/input/words.txt'). \flatMap(lambda x: x.split(' ')). \map(lambda x: [x])# 使用RDD的toDF方法,将RDD转换为DataFramedf = rdd.toDF(['word'])# 注册DF为临时视图df.createTempView('words')spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()# TODO 2: DSL风格# 读取text数据源,将一整行只作为一列读取df = spark.read.format('text'). \load('../data/input/words.txt')# withColumn方法# 方法功能:对已存在的列进行操作,返回一个新的列,如果名字与老列相同,那么替换;否则,作为新列存在df2 = df.withColumn('value', F.explode(F.split(df['value'], ' ')))df2.groupBy('value'). \count(). \withColumnRenamed('value', 'word'). \withColumnRenamed('count', 'cnt'). \orderBy('cnt', ascending=False). \show()
+------+---+
| word|cnt|
+------+---+
| hello| 3|
| spark| 1|
| flink| 1|
|hadoop| 1|
+------+---++------+---+
| word|cnt|
+------+---+
| hello| 3|
| spark| 1|
|hadoop| 1|
| flink| 1|
+------+---+
5. 电影评分数据分析
MovieLens数据集包含多个用户对多部电影的评级数据,也包括电影元数据信息和用户属性信息。
下载地址:
http://files.grouplens.org/datasets/movielens/介绍
下面以ml-100k数据集为例进行介绍,下载u.data
文件。由943个用户对1682个电影的10000条评分组成。每个用户至少评分20部电影。用户和电影从1号开始连续编号。数据是随机排序的。
需求:
- 查询用户平均分
- 查询电影平均分
- 查询大于平均分的电影的数量
- 查询高分电影中(>3打分次数最多的用户,并求出此人打的平均分
- 查询每个用户的平均打分,最低打分,最高打分
- 查询被评分超过100次的电影的平均分排名TOP10
代码:
#!usr/bin/env python # -*- coding:utf-8 -*- """电影评分数据分析1. 查询用户平均分2. 查询电影平均分3. 查询大于平均分的电影的数量4. 查询高分电影中(>3打分次数最多的用户,并求出此人打的平均分5. 查询每个用户的平均打分,最低打分,最高打分6. 查询被评分超过100次的电影的平均分排名TOP10 """from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pd from pyspark.sql import functions as Fif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.读取数据集schema = StructType().add('user_id', StringType(), nullable=True). \add('movie_id', IntegerType(), nullable=True). \add('rank', IntegerType(), nullable=True). \add('ts', StringType(), nullable=True)df = spark.read.format('csv'). \option('sep', '\t'). \option('header', False). \option('encoding', 'utf-8'). \schema(schema=schema). \load('../data/input/sql/u.data')# TODO 1:用户平均分计算# DSL风格df.groupBy('user_id'). \avg('rank'). \withColumnRenamed('avg(rank)', 'avg_rank'). \withColumn('avg_rank', F.round('avg_rank', 2)). \orderBy('avg_rank', ascending=False). \show()# # SQL风格# df.createTempView('user')# spark.sql("""# SELECT user_id, ROUND(AVG(rank),2) AS avg_rank FROM user GROUP BY user_id ORDER BY avg_rank DESC# """).show()# TODO 2:电影平均分计算# DSL风格df.groupBy('movie_id'). \avg('rank'). \withColumnRenamed('avg(rank)', 'avg_rank'). \withColumn('avg_rank', F.round('avg_rank', 2)). \orderBy('avg_rank', ascending=False). \show()# # SQL风格# df.createTempView('movie')# spark.sql("""# SELECT movie_id, ROUND(AVG(rank),2) AS avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC# """).show()# TODO 3:查询大于平均分的电影的数量# DSL风格print("大于平均分电影的数量:", df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())# TODO 4:查询高分电影中(>3),打分次数最多的用户,并求出此人打的平均分user_id = df.where('rank > 3'). \groupBy('user_id'). \count(). \withColumnRenamed('count', 'cnt'). \orderBy('cnt', ascending=False). \limit(1). \first()['user_id']# 计算这个人的打分平均分df.filter(df['user_id'] == user_id). \select(F.round(F.avg('rank'), 2)).show()# TODO 5:查询每个用户的平均打分,最低打分,最高打分df.groupBy('user_id'). \agg(F.round(F.avg('rank'), 2).alias('avg_rank'),F.min('rank').alias('min_rank'),F.max('rank').alias('max_rank'),).show()# TODO 6: 查询被评分超过100次的电影,同时平均分排名TOP10df.groupBy('movie_id'). \agg(F.count('movie_id').alias('cnt'),F.round(F.avg('rank'), 2).alias('avg_rank')).where('cnt > 100'). \orderBy('avg_rank', ascending=False). \limit(10). \show()
+-------+--------+ |user_id|avg_rank| +-------+--------+ | 849| 4.87| | 688| 4.83| | 507| 4.72| | 628| 4.7| | 928| 4.69| | 118| 4.66| | 907| 4.57| | 686| 4.56| | 427| 4.55| | 565| 4.54| | 469| 4.53| | 850| 4.53| | 225| 4.52| | 330| 4.5| | 477| 4.46| | 636| 4.45| | 242| 4.45| | 583| 4.44| | 252| 4.43| | 767| 4.43| +-------+--------+ only showing top 20 rows+--------+--------+ |movie_id|avg_rank| +--------+--------+ | 1653| 5.0| | 1122| 5.0| | 1467| 5.0| | 1201| 5.0| | 1189| 5.0| | 1293| 5.0| | 1599| 5.0| | 1536| 5.0| | 814| 5.0| | 1500| 5.0| | 1449| 4.63| | 1398| 4.5| | 1594| 4.5| | 119| 4.5| | 1642| 4.5| | 408| 4.49| | 169| 4.47| | 318| 4.47| | 483| 4.46| | 64| 4.45| +--------+--------+ only showing top 20 rows大于平均分电影的数量: 55375 +-------------------+ |round(avg(rank), 2)| +-------------------+ | 3.86| +-------------------++-------+--------+--------+--------+ |user_id|avg_rank|min_rank|max_rank| +-------+--------+--------+--------+ | 296| 4.18| 1| 5| | 467| 3.68| 2| 5| | 691| 4.22| 1| 5| | 675| 3.71| 1| 5| | 829| 3.55| 1| 5| | 125| 3.44| 1| 5| | 451| 2.73| 1| 5| | 800| 3.75| 2| 5| | 853| 2.98| 1| 5| | 666| 3.67| 2| 5| | 870| 3.45| 1| 5| | 919| 3.47| 1| 5| | 926| 3.3| 1| 5| | 7| 3.97| 1| 5| | 124| 3.5| 1| 5| | 51| 3.57| 1| 5| | 447| 3.6| 1| 5| | 591| 3.65| 2| 5| | 307| 3.79| 1| 5| | 475| 3.6| 1| 5| +-------+--------+--------+--------+ only showing top 20 rows+--------+---+--------+ |movie_id|cnt|avg_rank| +--------+---+--------+ | 408|112| 4.49| | 318|298| 4.47| | 169|118| 4.47| | 483|243| 4.46| | 64|283| 4.45| | 12|267| 4.39| | 603|209| 4.39| | 50|583| 4.36| | 178|125| 4.34| | 357|264| 4.29| +--------+---+--------+
6. SparkSQL Shuffle 分区数目
spark.sql.shuffle.partitions
参数指的是,在sql计算中,shuffle算子阶段默认的分区数是200个。 对于集群模式来说, 200个默认也算比较合适。如果在local下运行,200个很多,在调度上会带来额外的损耗。 所以在local下建议修改比较低,比如2\4\10均可。这个参数和Spark RDD中设置并行度的参数是相互独立的。
7. SparkSQL 数据清洗API
在大数据整个生产过程中,需要先对数据进行数据清洗,将杂乱无章的数据整理为符合后面处理要求的规整数据。以people.csv
数据为例,进行如下操作
7.1 数据去重:dropDuplicates
功能:对DF的数据进行去重,如果重复数据有多条,取第—条
#!usr/bin/env python
# -*- coding:utf-8 -*-from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
import timeif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \config("spark.sql.shuffle.partitions", 2). \getOrCreate()sc = spark.sparkContext# 1.读取数据df = spark.read.format('csv'). \option('sep', ';'). \option('header', True). \load('../data/input/sql/people.csv')# TODO 1:数据清洗——数据去重:drop_duplicatesdf.drop_duplicates().show()# API同样可以针对字段进行去重df.drop_duplicates(['age', 'job']).show()
+-----+----+---------+
| name| age| job|
+-----+----+---------+
| Bob| 32|Developer|
| Lily| 11| Manager|
|Alice| 9| null|
|Jorge| 30|Developer|
| Ani| 11|Developer|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice|null| Manager|
+-----+----+---------++-----+----+---------+
| name| age| job|
+-----+----+---------+
|Alice|null| Manager|
| Ani| 11|Developer|
| Lily| 11| Manager|
|Jorge| 30|Developer|
| Bob| 32|Developer|
|Alice| 9| null|
|Alice| 9| Manager|
+-----+----+---------+
7.2 缺失值处理——dropna
功能:如果数据中包含null,通过dropna来进行判断,符合条件就删除这一行数据。
#!usr/bin/env python
# -*- coding:utf-8 -*-from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
import timeif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \config("spark.sql.shuffle.partitions", 2). \getOrCreate()sc = spark.sparkContext# 1.读取数据df = spark.read.format('csv'). \option('sep', ';'). \option('header', True). \load('../data/input/sql/people.csv')# TODO 1:数据去重:drop_duplicatesdf.drop_duplicates().show()# API同样可以针对字段进行去重df.drop_duplicates(['age', 'job']).show()# TODO 2:缺失值处理# 无参数使用,只要有null,就删除一整行数据df.dropna().show()# thresh=3表示最少满足3个有效列,不满足就删除当前数据df.dropna(thresh=3).show()# subset用于设置要判断哪些列df.dropna(thresh=2, subset=['name', 'age']).show()
+-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
+-----+---+---------++-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
+-----+---+---------++-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| null|
+-----+---+---------+
7.3 缺失值填充——fillna
功能:根据参数的规则,来进行null的替换
# coding:utf8
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext"""读取数据"""df = spark.read.format("csv").\option("sep", ";").\option("header", True).\load("../data/input/sql/people.csv")# TODO 3:缺失值填充:fillna# 缺失值处理也可以完成对缺失值进行填充# DataFrame的 fillna 对缺失的列进行填充df.fillna("loss").show()# 指定列进行填充df.fillna("N/A", subset=['job']).show()# 设定一个字典, 对所有的列 提供填充规则df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()
+-----+----+---------+
| name| age| job|
+-----+----+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice|loss| Manager|
|Alice| 9| loss|
+-----+----+---------++-----+----+---------+
| name| age| job|
+-----+----+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice|null| Manager|
|Alice| 9| N/A|
+-----+----+---------++-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 1| Manager|
|Alice| 9| worker|
+-----+---+---------+
8. DataFrame数据写出
SparkSQL统一API写出DataFrame数据。
统一API语法:
#!usr/bin/env python
# -*- coding:utf-8 -*-from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \config("spark.sql.shuffle.partitions", 2). \getOrCreate()sc = spark.sparkContext# 1.读取电影数据集schema = StructType().add('user_id', StringType(), nullable=True). \add('movie_id', IntegerType(), nullable=True). \add('rank', IntegerType(), nullable=True). \add('ts', StringType(), nullable=True)df = spark.read.format('csv'). \option('sep', '\t'). \option('header', False). \option('encoding', 'utf-8'). \schema(schema=schema). \load('../data/input/sql/u.data')# write text,只能写出一个列的数据,需要将df转换成单列df# concat_ws函数表示将指定列拼接成一列df.select(F.concat_ws('---', 'user_id', 'movie_id', 'rank', 'ts')). \write. \mode('overwrite'). \format('text'). \save('../data/output/sql/text')# write csvdf.write.mode('overwrite'). \format('csv'). \option('sep', ';'). \option('header', True). \save('../data/output/sql/csv')# write jsondf.write.mode('overwrite'). \format('json'). \save('../data/output/sql/json')# write parquetdf.write.mode('overwrite'). \format('parquet'). \save('../data/output/sql/parquet')
将虚拟机的代码同步到本机,就可得到如图:
9. DataFrame 通过JDBC读写数据库
写出:
JDBC写出,会自动创建表的。因为DataFrame中有表结构信息, StructType记录的各个字段的名称类型和是否运行为空。
读取:
10. 总结
- DataFrame 在结构层面上由StructField组成列描述,由StructType构造表描述。在数据层面上,Column对象记录列数据,Row对象记录行数据
- DataFrame可以从RDD转换、Pandas DF转换、读取文件、读取JDBC等方法构建
spark.read.format()
和df.write.format()
是DataFrame读取和写出的统一化标准API- SparkSQL默认在Shuffle阶段200个分区,可以修改参数获得最好性能
- dropDuplicates可以去重、dropna可以删除缺失值、fillna可以填充缺失值
- SparkSQL支持JDBC读写,可用标准API对数据库进行读写操作
PySpark | SparkSQL入门 | DataFrame入门相关推荐
- Pandas DataFrame入门教程(图解版)
Pandas DataFrame入门教程(图解版) DataFrame 是 Pandas 的重要数据结构之一,也是在使用 Pandas 进行数据分析过程中最常用的结构之一,可以这么说,掌握了 Data ...
- Android自动化测试-从入门到入门(5)AdapterView的测试
在之前的文章中,我们简单介绍了Espresso的使用.通过onView()方法我们可以快速定位到界面上我们需要测试的目标元素.整体来说,onView()比较适用于UI比较简单的情况,在不需要过于复杂的 ...
- Android视频录制从不入门到入门系列教程(一)————简介
一.WHY Android SDK提供了MediaRecorder帮助开发者进行视频的录制,不过这个类很鸡肋,实际项目中应该很少用到它,最大的原因我觉得莫过于其输出的视频分辨率太有限了,满足不了项目的 ...
- 【新手上路】语法入门算法入门题单
作者:王清楚 链接:[新手上路]语法入门&算法入门题单_ACM竞赛_ACM/CSP/ICPC/CCPC/比赛经验/题解/资讯_牛客竞赛OJ_牛客网 来源:牛客网 介绍:本题单分为语法入门和算法 ...
- 数论基础,从入门到入门
数论基础,从入门到入门 文章目录 数论基础,从入门到入门 一.常用算法 1.素数筛法 2.线性筛 3.快速幂 4.矩阵快速幂 5.辗转相除法/欧几里得算法 6.扩展欧几里得 7.中国剩余定理/CRT ...
- ActiveMQ入门-amq入门
ActiveMQ是什么 ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线. ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS ...
- BIOS从快速入门到入门1------浅谈BIOS
眨眼间,从一个刚毕业的萌新小白,到现在的BIOS工程师,做了快2年半的x86服务器BIOS,勉强可以算是入门了.踩了很多坑,加了N多班,其中的辛酸和快乐,可能只有自己知道. 先不谈BIOS,我就个人经 ...
- GLOG从入门到入门
GLOG从入门到入门 1 C++日志需求 任何可用的工程代码,都无法保证不出bug.因此,日志系统对于工程代码非常重要.对于像耳东小白这样的初级程序员(感谢某高级程序员大佬赐予"初级程序员& ...
- pyspark操作 rdd dataframe,pyspark.sql.functions详解 行列变换
官网文档可以参考:https://spark.apache.org/docs/latest/api/python/index.html dataframe读写 生成以逗号分隔的数据 stringCSV ...
最新文章
- 预测 “疯狂三月” 冠军的办法,我只告诉你!
- vs自定义安装包的制作
- Please install [clang](http://clang.llvm.org/) or check configuration `clang.executable`
- 吴恩达机器学习之多变量线性回归实现部分
- mysql 参照完整性规则_MySQL存储引擎你们知道多少?
- 知识点026-rsync命令的使用
- 牛客 小a与星际探索 bfs
- DbVisualizer配置神通数据库驱动
- python中numpy.minimum函数
- html设置请求头host,Http请求头Host字段作用
- 计算机辅助设计和辅助制造简称,计算机辅助设计与制造
- 序号 html ol自动缩进,ol自定义序号样式的方法
- 联想Y40加装固态硬盘
- 前后端分离 Spring Boot + Vue 开发网易云、QQ音乐(附源码)!
- 通过bitset库实现sha256
- 浙江大学计算机学院 金小刚,金小刚(浙江大学CADCG国家重点实验室教授)_百度百科...
- Jmeter dubbo插件测试dubbo接口
- 宇宙探險.....如鹏不是游戏。
- Kubernetes的学习笔记总结之k8s集群安装部署
- 跟我一起写 Makefile-陈浩
热门文章
- Java版数据结构之单向链表 新增,有序新增的两种方式,修改和删除(CRUD)
- 做个程序员到底好不好
- 一支口红用了5年_一支口红多久该扔掉?保质期过了还能用吗?
- 04 vue3 scss 组件component kepp-alive缓存子组件 nextTick
- 各种Hash函数和代码
- Linux面板体验与推荐 AMH、AppNode、WDAP、宝塔
- 1.2 Unity3D 的注册
- 免费领7天腾讯视频VIP/优酷会员!
- fanyibishe
- “感动中国”2012年度人物颁奖词