文章目录

  • 一、快速入门
    • 1. 什么是SparkSQL
    • 2. 为什么要学习SparkSQL
    • 3. SparkSQL特点
  • 二、SparkSQL概述
    • 1. SparkSQL和Hive的异同
    • 2. SparkSQL的数据抽象
    • 3. DataFrame数据抽象
    • 4. SparkSession对象
    • 6. SparkSQL HelloWorld
  • 三、DataFrame入门和操作
    • 1. DataFrame的组成
    • 2. DataFrame的代码构建
      • 2.1 基于RDD的方式1
      • 2.2 基于RDD的方式2
      • 2.3 基于RDD的方式3
      • 2.4 基于pandas的DataFrame
      • 2.5 读取外部数据
    • 3. DataFrame的入门操作
      • 3.1 DSL风格下的API总结
    • 4. 词频统计案例
    • 5. 电影评分数据分析
    • 6. SparkSQL Shuffle 分区数目
    • 7. SparkSQL 数据清洗API
      • 7.1 数据去重:dropDuplicates
      • 7.2 缺失值处理——dropna
      • 7.3 缺失值填充——fillna
    • 8. DataFrame数据写出
    • 9. DataFrame 通过JDBC读写数据库
    • 10. 总结

传送门:

  • 视频地址:黑马程序员Spark全套视频教程
  • 1.PySpark基础入门(一)
  • 2.PySpark基础入门(二)
  • 3.PySpark核心编程(一)
  • 4.PySpark核心编程(二)
  • 5.PySaprk——SparkSQL学习(一)
  • 6.PySaprk——SparkSQL学习(二)
  • 7.Spark综合案例——零售业务统计分析
  • 8. Spark3新特性及核心概念(背)

一、快速入门

1. 什么是SparkSQL

  SparkSQL是Spark的一个模块,用于处理海量结构化数据。

限定:结构化数据处理。
RDD算子可以处理结构化数据,非结构化数据,半结构化数据

2. 为什么要学习SparkSQL

  SparkSQL是非常成熟的海量结构化数据处理框架。学习SparkSQL主要在2个点:

  • SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等
  • 企业大面积在使用SparkSQL处理业务数据
    • 离线开发
    • 数仓搭建
    • 科学计算
    • 数据分析

3. SparkSQL特点

  • 融合性
    SQL可以无缝集成在代码中,随时用SQL处理数据
  • 统一数据访问
    一套标准API可读写不同数据源
  • Hive兼容
    可以使用SparkSQL直接计算并生成Hive数据表
  • 标准化连接
    支持标准化JDBC\ODBC连接,方便和各种数据库进行数据交互。

二、SparkSQL概述

1. SparkSQL和Hive的异同

相同点:

  • Hive和Spark 均是:“分布式SQL计算引擎”,均是构建大规模结构化数据计算的绝佳利器。

不同点:

2. SparkSQL的数据抽象

  SparkSQL的DataFrame底层借鉴了Pandas的DataFrame,是二维表数据结构、分布式集合(分区)。SparkSQL现在使用的有2类数据抽象对象:

  • DataSet对象,可用于Java、Scala语言
    DataSet支持泛型特性(Python语言没有泛型特性),可以让Java、Scala语言更好的利用到
  • DataFrame对象,可用于Java、Scala、Python、R语言

我们以Python开发SparkSQL,主要使用的就是DataFrame对象作为核心数据结构。

3. DataFrame数据抽象

  DataFrame和RDD都是:弹性的、分布式的数据集。只是,DataFrame存储的数据结构“限定”为:二维表结构化数据而RDD可以存储的数据则没有任何限制,想处理什么就处理什么。

4. SparkSession对象

  在RDD阶段,程序的执行入口对象是: SparkContext对象。在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象
SparkSession对象可以:

  • 用于SparkSQL编程作为入口对象
  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession对象。

6. SparkSQL HelloWorld

#!usr/bin/env python
# -*- coding:utf-8 -*-"""SparkSession入口对象作为SQL的编程入口SparkContext入口对象作为RDD的编程入口
"""# SparkSession对象的导包,SparkSQL 中的入口对象是SparkSession对象
from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建SparkSession入口对象# appName 设置程序名称, config设置一些常用属性spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()# 1.通过SparkSession对象获取SparkContext对象sc = spark.sparkContext# SparkSQL的HelloWord# 读取数据df = spark.read.csv('../data/input/stu_score.txt', sep=',', header=False)# 设置列名df2 = df.toDF('id', 'name', 'score')df2.printSchema()  # 打印表结构df2.show()  # 打印表内容df2.createTempView('score')# SQL风格spark.sql("""SELECT * FROM score WHERE name='语文' LIMIT 5""").show()# DSL风格df2.where("name='语文'").limit(5).show()
root|-- id: string (nullable = true)|-- name: string (nullable = true)|-- score: string (nullable = true)+---+----+-----+
| id|name|score|
+---+----+-----+
|  1|语文|   99|
|  2|语文|   99|
|  3|语文|   99|
|  4|语文|   99|
|  5|语文|   99|
|  6|语文|   99|
|  7|语文|   99|
|  8|语文|   99|
|  9|语文|   99|
| 10|语文|   99|
| 11|语文|   99|
| 12|语文|   99|
| 13|语文|   99|
| 14|语文|   99|
| 15|语文|   99|
| 16|语文|   99|
| 17|语文|   99|
| 18|语文|   99|
| 19|语文|   99|
| 20|语文|   99|
+---+----+-----+
only showing top 20 rows+---+----+-----+
| id|name|score|
+---+----+-----+
|  1|语文|   99|
|  2|语文|   99|
|  3|语文|   99|
|  4|语文|   99|
|  5|语文|   99|
+---+----+-----++---+----+-----+
| id|name|score|
+---+----+-----+
|  1|语文|   99|
|  2|语文|   99|
|  3|语文|   99|
|  4|语文|   99|
|  5|语文|   99|
+---+----+-----+

三、DataFrame入门和操作

1. DataFrame的组成

  DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:

  • 表结构描述

基于这个前提,DataFrame的组成如下:
在结构层面:

  • StructType对象描述整个DataFrame的表结构
  • StructField对象描述一个列的信息

在数据层面

  • Row对象记录一行数据
  • Column对象记录一列数据并包含列的信息


  如图,在表结构层面,DataFrame的表结构由StructType对象来描述,如下图:

一个StructField记录:列名、列类型、列是否为空。多个StructField组成一个StructType对象。一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空。

2. DataFrame的代码构建

2.1 基于RDD的方式1

  DataFrame对象可以从RDD转换而来,都是分布式数据集,就是转换一下内部存储的结构,转换为二维表结构。通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame。这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。

#!usr/bin/env python
# -*- coding:utf-8 -*-
# TODO:基于RDD的方式构建DataFrame对象
from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.基于RDD转换成DataFramerdd = sc.textFile('../data/input/sql/people.txt'). \map(lambda x: x.split(',')). \map(lambda x: (x[0], int(x[1])))# 2. 构建DataFrame对象# 参数1:被转换的RDD;参数2指定列名,通过list的形式指定,按照顺序依次提供字符串名称df = spark.createDataFrame(rdd, schema=['name', 'age'])# 打印DataFrame表结构df.printSchema()# 打印DataFrame数据# 参数1表示展示多少条数据,默认不传的话是20# 参数2表示是否对列进行截断,如果列的数据长度超过20个字符串程度,后续的内容不显示,以...代替# 如果给False,表示全部显示(不截断),默认是Truedf.show()# 将DF对象转换为临时视图表,可供sql语句查询df.createOrReplaceTempView('people')spark.sql("SELECT * FROM people WHERE age < 30").show()
root|-- name: string (nullable = true)|-- age: long (nullable = true)+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---++-------+---+
|   name|age|
+-------+---+
|Michael| 29|
| Justin| 19|
+-------+---+

2.2 基于RDD的方式2

  通过StructType对象来定义DataFrame的“表结构”转换RDD。

#!usr/bin/env python
# -*- coding:utf-8 -*-
"""基于RDD的方式构建DataFrame对象,通过StructType对象来定义DataFrame的“表结构”转换RDD
"""from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerTypeif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.基于RDD转换成DataFramerdd = sc.textFile('../data/input/sql/people.txt'). \map(lambda x: x.split(',')). \map(lambda x: (x[0], int(x[1])))# 2. 构建表结构描述对象——StructType对象schema = StructType().\add("name", StringType(), nullable=True). \add("age", IntegerType(), nullable=False)# 3.基于StructType对象去构建RDD到DF的转换df = spark.createDataFrame(rdd, schema=schema)df.printSchema()df.show()
root|-- name: string (nullable = true)|-- age: integer (nullable = false)+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

2.3 基于RDD的方式3

  使用RDD的toDF方法转换RDD。

#!usr/bin/env python
# -*- coding:utf-8 -*-
"""基于RDD的方式构建DataFrame对象,使用RDD的toDF方法转换RDD
"""from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerTypeif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.基于RDD转换成DataFramerdd = sc.textFile('../data/input/sql/people.txt'). \map(lambda x: x.split(',')). \map(lambda x: (x[0], int(x[1])))# TODO:toDF的方式构建DataFramedf1 = rdd.toDF(['name','age'])df1.printSchema()df1.show()# TODO:通过表结构描述对象——StructType对象的方式构建DataFrameschema = StructType().\add("name", StringType(), nullable=True). \add("age", IntegerType(), nullable=False)df2 = rdd.toDF(schema=schema)df2.printSchema()df2.show()
root|-- name: string (nullable = true)|-- age: long (nullable = true)+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+root|-- name: string (nullable = true)|-- age: integer (nullable = false)+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

2.4 基于pandas的DataFrame

  将Pandas的DataFrame对象,转变为分布式的SparkSQL的DataFrame对象。

#!usr/bin/env python
# -*- coding:utf-8 -*-
"""基于pandas的DataFrame构建SparkSQL的DataFrame
"""from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# TODO:基于pandas的DataFrame构建SparkSQL的DataFramepdf = pd.DataFrame({'id': [1, 2, 3],'name': ['张大仙', '王晓晓', '吕不韦'],'age': [11, 12, 13]})df = spark.createDataFrame(pdf)df.printSchema()df.show()
root|-- id: long (nullable = true)|-- name: string (nullable = true)|-- age: long (nullable = true)+---+------+---+
| id|  name|age|
+---+------+---+
|  1|张大仙| 11|
|  2|王晓晓| 12|
|  3|吕不韦| 13|
+---+------+---+

2.5 读取外部数据

  通过SparkSQL的统一API进行数据读取构建DataFrame。
统一API示例代码

sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......").option("K", "V") # option可选.schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT").load("被读取文件的路径, 支持本地文件系统和HDFS")
  • 读取text数据源
    读取text数据源,使用format(“text”)读取文本数据。读取数据的特点是:将一整行只作为一个列读取,列名默认称之为value,类型是String

    #!usr/bin/env python
    # -*- coding:utf-8 -*-
    """DataFrame的代码构建 - 读取外部数据通过SparkSQL的统一API进行数据读取构建DataFrame
    """from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 构建StructType对象schema = StructType().add('data', StringType(), nullable=True)# read读取外部数据df = spark.read.format('text'). \schema(schema=schema). \load('../data/input/sql/people.txt')df.printSchema()df.show()
    
    root|-- data: string (nullable = true)+-----------+
    |       data|
    +-----------+
    |Michael, 29|
    |   Andy, 30|
    | Justin, 19|
    +-----------+
    
  • 读取json数据源
    使用format(“json”)读取json数据

    #!usr/bin/env python
    # -*- coding:utf-8 -*-
    """DataFrame的代码构建 - 读取外部数据通过SparkSQL的统一API进行数据读取构建DataFrame
    """from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# JSON数据自带schema信息df = spark.read.format('json').load('../data/input/sql/people.json')df.printSchema()df.show()
    
    root|-- age: long (nullable = true)|-- name: string (nullable = true)+----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    
  • 读取csv数据源
    使用format(“csv”)读取csv数据

    #!usr/bin/env python
    # -*- coding:utf-8 -*-
    """DataFrame的代码构建 - 读取外部数据通过SparkSQL的统一API进行数据读取构建DataFrame
    """from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 读取CSV文件df = spark.read.format('csv'). \option('sep', ';'). \               option('header', True). \option('encoding', 'utf-8'). \schema('name STRING, age INT, job STRING'). \load('../data/input/sql/people.csv')df.printSchema()df.show()
    
    root|-- name: string (nullable = true)|-- age: integer (nullable = true)|-- job: string (nullable = true)+-----+----+---------+
    | name| age|      job|
    +-----+----+---------+
    |Jorge|  30|Developer|
    |  Bob|  32|Developer|
    |  Ani|  11|Developer|
    | Lily|  11|  Manager|
    |  Put|  11|Developer|
    |Alice|   9|  Manager|
    |Alice|   9|  Manager|
    |Alice|   9|  Manager|
    |Alice|   9|  Manager|
    |Alice|null|  Manager|
    |Alice|   9|     null|
    +-----+----+---------+
    
  • 读取parquet数据源
    使用format(“parquet”)读取parquet数据
    parquet: 是Spark中常用的一种列式存储文件格式。parquet对比普通的文本文件的区别:

    • parquet 内置schema (列名\ 列类型\ 是否为空)
    • 存储是以列作为存储格式
    • 存储是序列化存储在文件中的(有压缩属性体积小)
    #!usr/bin/env python
    # -*- coding:utf-8 -*-
    """DataFrame的代码构建 - 读取外部数据通过SparkSQL的统一API进行数据读取构建DataFrame
    """from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 读取parquet文件df = spark.read.format('parquet').\load('../data/input/sql/users.parquet')df.printSchema()df.show()
    
    root|-- name: string (nullable = true)|-- favorite_color: string (nullable = true)|-- favorite_numbers: array (nullable = true)|    |-- element: integer (containsNull = true)+------+--------------+----------------+
    |  name|favorite_color|favorite_numbers|
    +------+--------------+----------------+
    |Alyssa|          null|  [3, 9, 15, 20]|
    |   Ben|           red|              []|
    +------+--------------+----------------+
    

    Parquet文件不能直接打开查看,如果想要查看内容,可以在PyCharm中安装如下插件来查看:

3. DataFrame的入门操作

  DataFrame支持两种风格进行编程,分别是:

  • DSL风格
    DSL语法风格,可称之为:领域特定语言。其实就是指DataFrame的特有API。DSL风格意思就是以调用API的方式来处理Data。比如:df.where().limit()

    #!usr/bin/env python
    # -*- coding:utf-8 -*-
    """DSL入门
    """from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.读取txt数据df = spark.read.format('csv'). \schema('id INT,subject STRING,score INT'). \load('../data/input/sql/stu_score.txt')# TODO:select API演示# 获取column对象id_column = df['id']subject_column = df['subject']df.select(['id', 'subject']).show()df.select('id', 'subject').show()df.select([id_column, subject_column]).show()df.select(id_column, subject_column).show()# TODO:filter API演示df.filter('score < 99').show()df.filter(df['score'] < 99).show()# TODO:groupBy API# groupBy API的返回值是GroupedData对象,是一个有分组关系的数据结构,调用聚合方法后返回值是DataFramedf.groupBy('subject').count().show()df.groupBy(df['subject']).count().show()
    
  • SQL风格
    SQL风格就是使用SQL语句处理DataFrame的数据,比如:spark.sql(“SELECT * FROM xxx)。DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:

    • 全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:global_temp
    • 临时表:只在当前SparkSession中可用
    #!usr/bin/env python
    # -*- coding:utf-8 -*-
    """SQL入门
    """from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    import pandas as pdif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.读取txt数据df = spark.read.format('csv'). \schema('id INT,subject STRING,score INT'). \load('../data/input/sql/stu_score.txt')# 2.注册成临时表df.createTempView('score')  # 注册临时视图(表)df.createOrReplaceTempView('score_2')  # 注册或替换临时视图(表)df.createGlobalTempView('score_3')  # 注册全局临时视图,全局临时视图在使用的时候,需要在浅看带上global——temp前缀# 3.通过SparkSession对象的spark API完成aql语句的执行spark.sql("SELECT subject,COUNT(*) AS cnt FROM score GROUP BY subject").show()spark.sql("SELECT subject,COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()spark.sql("SELECT subject,COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()
    
    +-------+---+
    |subject|cnt|
    +-------+---+
    |   英语| 30|
    |   语文| 30|
    |   数学| 30|
    +-------+---++-------+---+
    |subject|cnt|
    +-------+---+
    |   英语| 30|
    |   语文| 30|
    |   数学| 30|
    +-------+---++-------+---+
    |subject|cnt|
    +-------+---+
    |   英语| 30|
    |   语文| 30|
    |   数学| 30|
    +-------+---+
    

3.1 DSL风格下的API总结

  • show方法:DataFrame的API,展示DataFrame中的数据, 默认展示20条
  • printSchema方法:DataFrame的API,打印输出df的schema信息
  • select方法:DataFrame的API,选择DataFrame中的指定列(通过传入参数进行指定)
  • filter和where方法:DataFrame的API,过滤DataFrame内的数据,返回一个过滤后的DataFrame
  • groupby分组:DataFrame的API,按照指定的列进行数据的分组, 返回值是GroupedData对象,后续可以使用min、max、avg、sum、count方法进行分组聚合
  • agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合
  • alias: 它是Column对象的API, 可以针对一个列 进行改名
  • withColumnRenamed: 它是DataFrame的API,可以对DF中的列进行改名, 一次改一个列,改多个列 可以链式调用
  • orderBy: DataFrame的API,进行排序,参数1是被排序的列,参数2是 升序(True) 或 降序 False
  • first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象
    Row对象 就是一个数组, 你可以通过row[‘列名’] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等)

4. 词频统计案例

  PySpark提供了一个包: pyspark.sql.functions。这个包里面提供了 一系列的计算函数供SparkSQL使用。这些功能函数,返回值多数都是Column对象。

#!usr/bin/env python
# -*- coding:utf-8 -*-
"""WordCount
"""from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# TODO 1:SQL风格进行处理rdd = sc.textFile('../data/input/words.txt'). \flatMap(lambda x: x.split(' ')). \map(lambda x: [x])# 使用RDD的toDF方法,将RDD转换为DataFramedf = rdd.toDF(['word'])# 注册DF为临时视图df.createTempView('words')spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()# TODO 2: DSL风格# 读取text数据源,将一整行只作为一列读取df = spark.read.format('text'). \load('../data/input/words.txt')# withColumn方法# 方法功能:对已存在的列进行操作,返回一个新的列,如果名字与老列相同,那么替换;否则,作为新列存在df2 = df.withColumn('value', F.explode(F.split(df['value'], ' ')))df2.groupBy('value'). \count(). \withColumnRenamed('value', 'word'). \withColumnRenamed('count', 'cnt'). \orderBy('cnt', ascending=False). \show()
+------+---+
|  word|cnt|
+------+---+
| hello|  3|
| spark|  1|
| flink|  1|
|hadoop|  1|
+------+---++------+---+
|  word|cnt|
+------+---+
| hello|  3|
| spark|  1|
|hadoop|  1|
| flink|  1|
+------+---+

5. 电影评分数据分析

  MovieLens数据集包含多个用户对多部电影的评级数据,也包括电影元数据信息和用户属性信息。

  • 下载地址:
    http://files.grouplens.org/datasets/movielens/

  • 介绍
    下面以ml-100k数据集为例进行介绍,下载u.data文件。由943个用户对1682个电影的10000条评分组成。每个用户至少评分20部电影。用户和电影从1号开始连续编号。数据是随机排序的。

  • 需求:

    1. 查询用户平均分
    2. 查询电影平均分
    3. 查询大于平均分的电影的数量
    4. 查询高分电影中(>3打分次数最多的用户,并求出此人打的平均分
    5. 查询每个用户的平均打分,最低打分,最高打分
    6. 查询被评分超过100次的电影的平均分排名TOP10
  • 代码:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-
    """电影评分数据分析1. 查询用户平均分2. 查询电影平均分3. 查询大于平均分的电影的数量4. 查询高分电影中(>3打分次数最多的用户,并求出此人打的平均分5. 查询每个用户的平均打分,最低打分,最高打分6. 查询被评分超过100次的电影的平均分排名TOP10
    """from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    import pandas as pd
    from pyspark.sql import functions as Fif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \getOrCreate()sc = spark.sparkContext# 1.读取数据集schema = StructType().add('user_id', StringType(), nullable=True). \add('movie_id', IntegerType(), nullable=True). \add('rank', IntegerType(), nullable=True). \add('ts', StringType(), nullable=True)df = spark.read.format('csv'). \option('sep', '\t'). \option('header', False). \option('encoding', 'utf-8'). \schema(schema=schema). \load('../data/input/sql/u.data')# TODO 1:用户平均分计算# DSL风格df.groupBy('user_id'). \avg('rank'). \withColumnRenamed('avg(rank)', 'avg_rank'). \withColumn('avg_rank', F.round('avg_rank', 2)). \orderBy('avg_rank', ascending=False). \show()# # SQL风格# df.createTempView('user')# spark.sql("""#         SELECT user_id, ROUND(AVG(rank),2) AS avg_rank FROM user GROUP BY user_id ORDER BY avg_rank DESC#     """).show()# TODO 2:电影平均分计算# DSL风格df.groupBy('movie_id'). \avg('rank'). \withColumnRenamed('avg(rank)', 'avg_rank'). \withColumn('avg_rank', F.round('avg_rank', 2)). \orderBy('avg_rank', ascending=False). \show()# # SQL风格# df.createTempView('movie')# spark.sql("""#     SELECT movie_id, ROUND(AVG(rank),2) AS avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC# """).show()# TODO 3:查询大于平均分的电影的数量# DSL风格print("大于平均分电影的数量:", df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())# TODO 4:查询高分电影中(>3),打分次数最多的用户,并求出此人打的平均分user_id = df.where('rank > 3'). \groupBy('user_id'). \count(). \withColumnRenamed('count', 'cnt'). \orderBy('cnt', ascending=False). \limit(1). \first()['user_id']# 计算这个人的打分平均分df.filter(df['user_id'] == user_id). \select(F.round(F.avg('rank'), 2)).show()# TODO 5:查询每个用户的平均打分,最低打分,最高打分df.groupBy('user_id'). \agg(F.round(F.avg('rank'), 2).alias('avg_rank'),F.min('rank').alias('min_rank'),F.max('rank').alias('max_rank'),).show()# TODO 6: 查询被评分超过100次的电影,同时平均分排名TOP10df.groupBy('movie_id'). \agg(F.count('movie_id').alias('cnt'),F.round(F.avg('rank'), 2).alias('avg_rank')).where('cnt > 100'). \orderBy('avg_rank', ascending=False). \limit(10). \show()
    
    +-------+--------+
    |user_id|avg_rank|
    +-------+--------+
    |    849|    4.87|
    |    688|    4.83|
    |    507|    4.72|
    |    628|     4.7|
    |    928|    4.69|
    |    118|    4.66|
    |    907|    4.57|
    |    686|    4.56|
    |    427|    4.55|
    |    565|    4.54|
    |    469|    4.53|
    |    850|    4.53|
    |    225|    4.52|
    |    330|     4.5|
    |    477|    4.46|
    |    636|    4.45|
    |    242|    4.45|
    |    583|    4.44|
    |    252|    4.43|
    |    767|    4.43|
    +-------+--------+
    only showing top 20 rows+--------+--------+
    |movie_id|avg_rank|
    +--------+--------+
    |    1653|     5.0|
    |    1122|     5.0|
    |    1467|     5.0|
    |    1201|     5.0|
    |    1189|     5.0|
    |    1293|     5.0|
    |    1599|     5.0|
    |    1536|     5.0|
    |     814|     5.0|
    |    1500|     5.0|
    |    1449|    4.63|
    |    1398|     4.5|
    |    1594|     4.5|
    |     119|     4.5|
    |    1642|     4.5|
    |     408|    4.49|
    |     169|    4.47|
    |     318|    4.47|
    |     483|    4.46|
    |      64|    4.45|
    +--------+--------+
    only showing top 20 rows大于平均分电影的数量: 55375
    +-------------------+
    |round(avg(rank), 2)|
    +-------------------+
    |               3.86|
    +-------------------++-------+--------+--------+--------+
    |user_id|avg_rank|min_rank|max_rank|
    +-------+--------+--------+--------+
    |    296|    4.18|       1|       5|
    |    467|    3.68|       2|       5|
    |    691|    4.22|       1|       5|
    |    675|    3.71|       1|       5|
    |    829|    3.55|       1|       5|
    |    125|    3.44|       1|       5|
    |    451|    2.73|       1|       5|
    |    800|    3.75|       2|       5|
    |    853|    2.98|       1|       5|
    |    666|    3.67|       2|       5|
    |    870|    3.45|       1|       5|
    |    919|    3.47|       1|       5|
    |    926|     3.3|       1|       5|
    |      7|    3.97|       1|       5|
    |    124|     3.5|       1|       5|
    |     51|    3.57|       1|       5|
    |    447|     3.6|       1|       5|
    |    591|    3.65|       2|       5|
    |    307|    3.79|       1|       5|
    |    475|     3.6|       1|       5|
    +-------+--------+--------+--------+
    only showing top 20 rows+--------+---+--------+
    |movie_id|cnt|avg_rank|
    +--------+---+--------+
    |     408|112|    4.49|
    |     318|298|    4.47|
    |     169|118|    4.47|
    |     483|243|    4.46|
    |      64|283|    4.45|
    |      12|267|    4.39|
    |     603|209|    4.39|
    |      50|583|    4.36|
    |     178|125|    4.34|
    |     357|264|    4.29|
    +--------+---+--------+
    

6. SparkSQL Shuffle 分区数目

  spark.sql.shuffle.partitions 参数指的是,在sql计算中,shuffle算子阶段默认的分区数是200个。 对于集群模式来说, 200个默认也算比较合适。如果在local下运行,200个很多,在调度上会带来额外的损耗。 所以在local下建议修改比较低,比如2\4\10均可。这个参数和Spark RDD中设置并行度的参数是相互独立的。

7. SparkSQL 数据清洗API

  在大数据整个生产过程中,需要先对数据进行数据清洗,将杂乱无章的数据整理为符合后面处理要求的规整数据。以people.csv数据为例,进行如下操作

7.1 数据去重:dropDuplicates

功能:对DF的数据进行去重,如果重复数据有多条,取第—条

#!usr/bin/env python
# -*- coding:utf-8 -*-from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
import timeif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \config("spark.sql.shuffle.partitions", 2). \getOrCreate()sc = spark.sparkContext# 1.读取数据df = spark.read.format('csv'). \option('sep', ';'). \option('header', True). \load('../data/input/sql/people.csv')# TODO 1:数据清洗——数据去重:drop_duplicatesdf.drop_duplicates().show()# API同样可以针对字段进行去重df.drop_duplicates(['age', 'job']).show()
+-----+----+---------+
| name| age|      job|
+-----+----+---------+
|  Bob|  32|Developer|
| Lily|  11|  Manager|
|Alice|   9|     null|
|Jorge|  30|Developer|
|  Ani|  11|Developer|
|  Put|  11|Developer|
|Alice|   9|  Manager|
|Alice|null|  Manager|
+-----+----+---------++-----+----+---------+
| name| age|      job|
+-----+----+---------+
|Alice|null|  Manager|
|  Ani|  11|Developer|
| Lily|  11|  Manager|
|Jorge|  30|Developer|
|  Bob|  32|Developer|
|Alice|   9|     null|
|Alice|   9|  Manager|
+-----+----+---------+

7.2 缺失值处理——dropna

功能:如果数据中包含null,通过dropna来进行判断,符合条件就删除这一行数据。

#!usr/bin/env python
# -*- coding:utf-8 -*-from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
import timeif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \config("spark.sql.shuffle.partitions", 2). \getOrCreate()sc = spark.sparkContext# 1.读取数据df = spark.read.format('csv'). \option('sep', ';'). \option('header', True). \load('../data/input/sql/people.csv')# TODO 1:数据去重:drop_duplicatesdf.drop_duplicates().show()# API同样可以针对字段进行去重df.drop_duplicates(['age', 'job']).show()# TODO 2:缺失值处理# 无参数使用,只要有null,就删除一整行数据df.dropna().show()# thresh=3表示最少满足3个有效列,不满足就删除当前数据df.dropna(thresh=3).show()# subset用于设置要判断哪些列df.dropna(thresh=2, subset=['name', 'age']).show()
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
|  Ani| 11|Developer|
| Lily| 11|  Manager|
|  Put| 11|Developer|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
+-----+---+---------++-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
|  Ani| 11|Developer|
| Lily| 11|  Manager|
|  Put| 11|Developer|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
+-----+---+---------++-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
|  Ani| 11|Developer|
| Lily| 11|  Manager|
|  Put| 11|Developer|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|     null|
+-----+---+---------+

7.3 缺失值填充——fillna

功能:根据参数的规则,来进行null的替换

# coding:utf8
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext"""读取数据"""df = spark.read.format("csv").\option("sep", ";").\option("header", True).\load("../data/input/sql/people.csv")#  TODO 3:缺失值填充:fillna# 缺失值处理也可以完成对缺失值进行填充# DataFrame的 fillna 对缺失的列进行填充df.fillna("loss").show()# 指定列进行填充df.fillna("N/A", subset=['job']).show()# 设定一个字典, 对所有的列 提供填充规则df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()
+-----+----+---------+
| name| age|      job|
+-----+----+---------+
|Jorge|  30|Developer|
|  Bob|  32|Developer|
|  Ani|  11|Developer|
| Lily|  11|  Manager|
|  Put|  11|Developer|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|loss|  Manager|
|Alice|   9|     loss|
+-----+----+---------++-----+----+---------+
| name| age|      job|
+-----+----+---------+
|Jorge|  30|Developer|
|  Bob|  32|Developer|
|  Ani|  11|Developer|
| Lily|  11|  Manager|
|  Put|  11|Developer|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|   9|  Manager|
|Alice|null|  Manager|
|Alice|   9|      N/A|
+-----+----+---------++-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
|  Ani| 11|Developer|
| Lily| 11|  Manager|
|  Put| 11|Developer|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  1|  Manager|
|Alice|  9|   worker|
+-----+---+---------+

8. DataFrame数据写出

  SparkSQL统一API写出DataFrame数据。
统一API语法

#!usr/bin/env python
# -*- coding:utf-8 -*-from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0.构建SparkSession执行环境入口对象spark = SparkSession.builder. \appName('test'). \master('local[*]'). \config("spark.sql.shuffle.partitions", 2). \getOrCreate()sc = spark.sparkContext# 1.读取电影数据集schema = StructType().add('user_id', StringType(), nullable=True). \add('movie_id', IntegerType(), nullable=True). \add('rank', IntegerType(), nullable=True). \add('ts', StringType(), nullable=True)df = spark.read.format('csv'). \option('sep', '\t'). \option('header', False). \option('encoding', 'utf-8'). \schema(schema=schema). \load('../data/input/sql/u.data')# write text,只能写出一个列的数据,需要将df转换成单列df# concat_ws函数表示将指定列拼接成一列df.select(F.concat_ws('---', 'user_id', 'movie_id', 'rank', 'ts')). \write. \mode('overwrite'). \format('text'). \save('../data/output/sql/text')# write csvdf.write.mode('overwrite'). \format('csv'). \option('sep', ';'). \option('header', True). \save('../data/output/sql/csv')# write jsondf.write.mode('overwrite'). \format('json'). \save('../data/output/sql/json')# write parquetdf.write.mode('overwrite'). \format('parquet'). \save('../data/output/sql/parquet')

将虚拟机的代码同步到本机,就可得到如图:

9. DataFrame 通过JDBC读写数据库


写出
  JDBC写出,会自动创建表的。因为DataFrame中有表结构信息, StructType记录的各个字段的名称类型和是否运行为空。

读取

10. 总结

  1. DataFrame 在结构层面上由StructField组成列描述,由StructType构造表描述。在数据层面上,Column对象记录列数据,Row对象记录行数据
  2. DataFrame可以从RDD转换、Pandas DF转换、读取文件、读取JDBC等方法构建
  3. spark.read.format()df.write.format() 是DataFrame读取和写出的统一化标准API
  4. SparkSQL默认在Shuffle阶段200个分区,可以修改参数获得最好性能
  5. dropDuplicates可以去重、dropna可以删除缺失值、fillna可以填充缺失值
  6. SparkSQL支持JDBC读写,可用标准API对数据库进行读写操作

PySpark | SparkSQL入门 | DataFrame入门相关推荐

  1. Pandas DataFrame入门教程(图解版)

    Pandas DataFrame入门教程(图解版) DataFrame 是 Pandas 的重要数据结构之一,也是在使用 Pandas 进行数据分析过程中最常用的结构之一,可以这么说,掌握了 Data ...

  2. Android自动化测试-从入门到入门(5)AdapterView的测试

    在之前的文章中,我们简单介绍了Espresso的使用.通过onView()方法我们可以快速定位到界面上我们需要测试的目标元素.整体来说,onView()比较适用于UI比较简单的情况,在不需要过于复杂的 ...

  3. Android视频录制从不入门到入门系列教程(一)————简介

    一.WHY Android SDK提供了MediaRecorder帮助开发者进行视频的录制,不过这个类很鸡肋,实际项目中应该很少用到它,最大的原因我觉得莫过于其输出的视频分辨率太有限了,满足不了项目的 ...

  4. 【新手上路】语法入门算法入门题单

    作者:王清楚 链接:[新手上路]语法入门&算法入门题单_ACM竞赛_ACM/CSP/ICPC/CCPC/比赛经验/题解/资讯_牛客竞赛OJ_牛客网 来源:牛客网 介绍:本题单分为语法入门和算法 ...

  5. 数论基础,从入门到入门

    数论基础,从入门到入门 文章目录 数论基础,从入门到入门 一.常用算法 1.素数筛法 2.线性筛 3.快速幂 4.矩阵快速幂 5.辗转相除法/欧几里得算法 6.扩展欧几里得 7.中国剩余定理/CRT ...

  6. ActiveMQ入门-amq入门

    ActiveMQ是什么 ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线. ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS ...

  7. BIOS从快速入门到入门1------浅谈BIOS

    眨眼间,从一个刚毕业的萌新小白,到现在的BIOS工程师,做了快2年半的x86服务器BIOS,勉强可以算是入门了.踩了很多坑,加了N多班,其中的辛酸和快乐,可能只有自己知道. 先不谈BIOS,我就个人经 ...

  8. GLOG从入门到入门

    GLOG从入门到入门 1 C++日志需求 任何可用的工程代码,都无法保证不出bug.因此,日志系统对于工程代码非常重要.对于像耳东小白这样的初级程序员(感谢某高级程序员大佬赐予"初级程序员& ...

  9. pyspark操作 rdd dataframe,pyspark.sql.functions详解 行列变换

    官网文档可以参考:https://spark.apache.org/docs/latest/api/python/index.html dataframe读写 生成以逗号分隔的数据 stringCSV ...

最新文章

  1. 预测 “疯狂三月” 冠军的办法,我只告诉你!
  2. vs自定义安装包的制作
  3. Please install [clang](http://clang.llvm.org/) or check configuration `clang.executable`
  4. 吴恩达机器学习之多变量线性回归实现部分
  5. mysql 参照完整性规则_MySQL存储引擎你们知道多少?
  6. 知识点026-rsync命令的使用
  7. 牛客 小a与星际探索 bfs
  8. DbVisualizer配置神通数据库驱动
  9. python中numpy.minimum函数
  10. html设置请求头host,Http请求头Host字段作用
  11. 计算机辅助设计和辅助制造简称,计算机辅助设计与制造
  12. 序号 html ol自动缩进,ol自定义序号样式的方法
  13. 联想Y40加装固态硬盘
  14. 前后端分离 Spring Boot + Vue 开发网易云、QQ音乐(附源码)!
  15. 通过bitset库实现sha256
  16. 浙江大学计算机学院 金小刚,金小刚(浙江大学CADCG国家重点实验室教授)_百度百科...
  17. Jmeter dubbo插件测试dubbo接口
  18. 宇宙探險.....如鹏不是游戏。
  19. Kubernetes的学习笔记总结之k8s集群安装部署
  20. 跟我一起写 Makefile-陈浩

热门文章

  1. Java版数据结构之单向链表 新增,有序新增的两种方式,修改和删除(CRUD)
  2. 做个程序员到底好不好
  3. 一支口红用了5年_一支口红多久该扔掉?保质期过了还能用吗?
  4. 04 vue3 scss 组件component kepp-alive缓存子组件 nextTick
  5. 各种Hash函数和代码
  6. Linux面板体验与推荐 AMH、AppNode、WDAP、宝塔
  7. 1.2 Unity3D 的注册
  8. 免费领7天腾讯视频VIP/优酷会员!
  9. fanyibishe
  10. “感动中国”2012年度人物颁奖词