【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)
本文主要是翻译Spark官网Spark SQL programming guide 。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷。目录导航在右上角
Spark SQL、DataFrames 和 Datasets 指南
概述
Spark SQL 是一个结构化数据处理的 Spark 模块 。 与基础的 Spark RDD API 不同的是, Spark SQL 所提供的接口为 Spark 提供了 更多关于数据结构和正在执行的计算结构的信息。 Spark 在其内部利用这些额外的信息去做更多的优化。有几种用于和 Sparrk SQL交互的方法,包括 SQL 和 Dataset API。 当你计算一个结果, 会使用同一个执行引擎, 这独立于你所用来描述这个算法的API和语言。这种一致性意味着开发者可以轻易地在不同的 API 中来回切换, 因为它为表达给定的转换提供了最自然的方式。
本页所有示例使用了 Spark 提供的样例数据并且可以在 spark-shell 、pyspark shell 或者 sparkR shell 中运行。
SQL
Spark SQL 的一个用处是执行 SQL 查询。 Spark SQL 同样可以用来从 现有的 HIVE 中读取数据。 更多有关配置这个特性的信息,请查阅 HIVE Tables 部分。当你使用其他语言执行SQL时,将会返回一个 Dataset 或者 DataFrame 作为结果。你同样可以使用命令行或者 JDBC/ODBC 与 SQL 接口进行交互。
Dataset 和 Dataframe
Dataset 是一种分布式数据集,是 Spark1.6 新增的接口。它提供了RDD(强类型,可以使用强大的 lambda 表达式)的优点,并受益于Spark SQL 的优化执行引擎。Dataset 可以通过 JVM 构建,然后使用转换方法(map, flatMap, filter等等)进行操作。 Dataset API 在 Java 和 Scala 中可用。 Python 并不支持Dataset API。但是由于Python的动态特性, Dataset API 的很多优势都是可用的(比如你可以自然地使用名称 row.columnName 来访问 row 的域 )。 R 语言的情况类似。
DataFrame 是一种按列命名组织的 Dataset, 它在概念上等价于关系型数据库的一个表或者 R/Python 的一个数据帧, 但是它(DataFrame)的底层做了更多的优化。DataFrame 可以通过大量的数据源构建,例如:结构化的数据文件, HIVE 的表, 数据库,或现有的RDD。Java、Python、Scala、R语言都支持 DataFrame API。 在 Scala 和 Java, DataFrame 由Dataset的 rowS 表示。 在 Scala API 中,DataFrame 可以简单地认为是 Dataset[Row] 的别名。 然而,在 Java API 中, 用户需要使用 Dataset<Row> 来表示 DataFrame。
在整个文档中, 我们通常把 Scala/Java Dataset 的 RowS 称为 DataFrames。
准备开始
起点: SparkSession
Spark 所有功能的入口是 SparkSession 类。创建最基本的 SaprkSession, 只需要调用 SparkSession.builder():
scala版
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()// For implicit conversions like converting RDDs to DataFrames import spark.implicits.
在 Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。
java版
import org.apache.spark.sql.SparkSession;SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate();
在 Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。
Python版
from pyspark.sql import SparkSessionspark = SparkSession \.builder \.appName("Python Spark SQL basic example") \.config("spark.some.config.option", "some-value") \.getOrCreate()
在 Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。
R语言
sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))
在 Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码
请注意,sparkR.session() 第一次被调用时,它会初始化一个全局的 SparkSession 单例对象,并且之后继续调用这个方法都将返回这个实例。 通过这种方式,用户只需要对 SparkSession 做一次初始化,然后 SparkR 的其他方法比如 read.df 将会隐式地访问这个全局地单例对象, 并且用户不需要传递 SparkSession 的实例。
Spark2.0 的 SparkSession 提供了对 HIVE 特性的内置支持, 包括使用 HiveQL 编写查询语句的能力,访问 Hive UDFs 和 从 Hive Table 中读取数据的能力。为了使用这些特性,您需要安装一个 HIVE。
创建 DataFrame
有了SparkSession, 应用程序可以通过本地的 R data.frame、Hive Table、 或者 Spark 数据源 来创建DataFrame。
作为示例,以下代码使用一个 JSON 文件的内容 创建一个 DataFrame
Scala版
val df = spark.read.json("examples/src/main/resources/people.json")// Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
在 Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。
Java版
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");// Displays the content of the DataFrame to stdout df.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
在 Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。
Python版
# spark is an existing SparkSession df = spark.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+
在 Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。
R语言
df <- read.json("examples/src/main/resources/people.json")# Displays the content of the DataFrame head(df) ## age name ## 1 NA Michael ## 2 30 Andy ## 3 19 Justin# Another method to print the first few rows and optionally truncate the printing of long values showDF(df) ## +----+-------+ ## | age| name| ## +----+-------+ ## |null|Michael| ## | 30| Andy| ## | 19| Justin| ## +----+-------+
在 Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码
弱类型的 Dataset 操作(aka DataFrame 操作)
DataFrame 为 Scala、Java、Python、R语言提供了一种特定的结构化数据操作。
上面提到过,在 Spark2.0 中,DataFrame 对于 Scala 和 Java API 仅仅是 Dataset 的 RowS。这些操作也被称为 “弱类型转换”,这与 强类型的Scala/Java 中的 “强类型转换” 形成了鲜明的对比。
这里我们囊括了使用 Datasets 做结构化数据处理的基本示例:
Scala版
// This import is needed to use the $-notation import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true)// Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+// Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+// Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+// Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
在 Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。
java版
// col("...") is preferable to df.col("...") import static org.apache.spark.sql.functions.col;// Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true)// Select only the "name" column df.select("name").show(); // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+// Select everybody, but increment the age by 1 df.select(col("name"), col("age").plus(1)).show(); // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+// Select people older than 21 df.filter(col("age").gt(21)).show(); // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+// Count people by age df.groupBy("age").count().show(); // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
在 Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。
Python版
对于Python来说,我们可以通过属性(df.age)或者通过索引(df['age']) 来访问 DataFrame 的列。 虽然前者用于交互式数据探索非常方便, 但使用者强烈建议使用后者,因为它具有前瞻性,并且不会因为 DataFrame 的列命和属性名重复产生冲突。
# spark, df are from the previous example # Print the schema in a tree format df.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true)# Select only the "name" column df.select("name").show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+# Select everybody, but increment the age by 1 df.select(df['name'], df['age'] + 1).show() # +-------+---------+ # | name|(age + 1)| # +-------+---------+ # |Michael| null| # | Andy| 31| # | Justin| 20| # +-------+---------+# Select people older than 21 df.filter(df['age'] > 21).show() # +---+----+ # |age|name| # +---+----+ # | 30|Andy| # +---+----+# Count people by age df.groupBy("age").count().show() # +----+-----+ # | age|count| # +----+-----+ # | 19| 1| # |null| 1| # | 30| 1| # +----+-----+
在 Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。
R语言
# Create the DataFrame df <- read.json("examples/src/main/resources/people.json")# Show the content of the DataFrame head(df) ## age name ## 1 NA Michael ## 2 30 Andy ## 3 19 Justin# Print the schema in a tree format printSchema(df) ## root ## |-- age: long (nullable = true) ## |-- name: string (nullable = true)# Select only the "name" column head(select(df, "name")) ## name ## 1 Michael ## 2 Andy ## 3 Justin# Select everybody, but increment the age by 1 head(select(df, df$name, df$age + 1)) ## name (age + 1.0) ## 1 Michael NA ## 2 Andy 31 ## 3 Justin 20# Select people older than 21 head(where(df, df$age > 21)) ## age name ## 1 30 Andy# Count people by age head(count(groupBy(df, "age"))) ## age count ## 1 19 1 ## 2 NA 1 ## 3 30 1
在 Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码
关于 DataFrame 可执行的操作的完整列表,请移步 API Documentation。
除了简单的列引用和表示之外,DataFrame 同样有一个丰富的函数库,包括字符串操作、日期算法、常用数学操作 等等。 完整的列表可以在 DataFrame Function Reference.中找到。
以编程方式执行 SQL 查询
Scala版
SparkSession 的 sql 功能使应用可以以编程的方式执行 SQL 查询并且返回一个 DataFrame
// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people")val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
在 Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。
java版
SparkSession 的 sql 功能使应用可以以编程的方式执行 SQL 查询并且返回一个 DataSet<Row>
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people");Dataset<Row> sqlDF = spark.sql("SELECT * FROM people"); sqlDF.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
在 Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。
Python版
SparkSession 的 sql 功能使应用可以以编程的方式执行 SQL 查询并且返回一个 DataFrame
# Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people")sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+
在 Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。
R语言
SparkSession 的 sql 功能使应用可以以编程的方式执行 SQL 查询并且返回一个 SparkDataFrame
df <- sql("SELECT * FROM table")
在 Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码
全局临时视图
Spark SQL 临时视图的作用域是 session 并且如果创建它(临时视图)的 session 终止,视图就会消失。如果你想使一个临时视图在 Spark 应用终止之前可以在所有 session 中共享, 那么你可以创建一个全局临时视图。全局临时视图是和系统保存的数据库 global_temp 联系在一起的, 我们必须使用限定的名称来指代它,比如: SELECT * FROM global_temp.view1
Scala版
// Register the DataFrame as a global temporary view df.createGlobalTempView("people")// Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+// Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
在 Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。
java版
// Register the DataFrame as a global temporary view df.createGlobalTempView("people");// Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+// Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
在 Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。
Python版
# Register the DataFrame as a global temporary view df.createGlobalTempView("people")# Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+# Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+
在 Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。
SQL
CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tblSELECT * FROM global_temp.temp_view
创建 Dataset
Dataset 与 RDD 很像, 当然,它们使用专门的编码器, 而不是 java 序列化或 Kryo, 来序列化一个对象以便在网络上进行处理或者传输。
Scala版
case class Person(name: String, age: Long)// Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+// Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
在 Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。
java版
import java.util.Arrays; import java.util.Collections; import java.io.Serializable;import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders;public static class Person implements Serializable {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;} }// Create an instance of a Bean class Person person = new Person(); person.setName("Andy"); person.setAge(32);// Encoders are created for Java beans Encoder<Person> personEncoder = Encoders.bean(Person.class); Dataset<Person> javaBeanDS = spark.createDataset(Collections.singletonList(person),personEncoder ); javaBeanDS.show(); // +---+----+ // |age|name| // +---+----+ // | 32|Andy| // +---+----+// Encoders for most common types are provided in class Encoders Encoder<Integer> integerEncoder = Encoders.INT(); Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder); Dataset<Integer> transformedDS = primitiveDS.map((MapFunction<Integer, Integer>) value -> value + 1,integerEncoder); transformedDS.collect(); // Returns [2, 3, 4]// DataFrames can be converted to a Dataset by providing a class. Mapping based on name String path = "examples/src/main/resources/people.json"; Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder); peopleDS.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
在 Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。
(2018-10-08)
转载于:https://www.cnblogs.com/yeyeck/p/9665090.html
【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)相关推荐
- 【WAF剖析】——sql注入之奇淫巧技bypass(持续更新中)
作者名:Demo不是emo 主页面链接:主页传送门 创作初心:舞台再大,你不上台,永远是观众,没人会关心你努不努力,摔的痛不痛,他们只会看你最后站在什么位置,然后羡慕或鄙夷 座右铭:不要让时代的悲哀 ...
- Apache Spark 2.2.0 中文文档 - Spark RDD(Resilient Distributed Datasets)
Spark RDD(Resilient Distributed Datasets)论文 概要 1: 介绍 2: Resilient Distributed Datasets(RDDs) 2.1 RDD ...
- Spark之SQL解析(源码阅读十)
如何能更好的运用与监控sparkSQL?或许我们改更深层次的了解它深层次的原理是什么.之前总结的已经写了传统数据库与Spark的sql解析之间的差别.那么我们下来直切主题~ 如今的Spark已经支持多 ...
- spark sql uv_使用Spark Streaming SQL进行PV/UV统计
作者:关文选,花名云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor 1.背景介绍 PV/UV统计是流式分析一个常见的场景.通过PV可以对访问的网站 ...
- 使用Spark Streaming SQL基于时间窗口进行数据统计
1.背景介绍 流式计算一个很常见的场景是基于事件时间进行处理,常用于检测.监控.根据时间进行统计等系统中.比如埋点日志中每条日志记录了埋点处操作的时间,或者业务系统中记录了用户操作时间,用于统计各种操 ...
- Spark Structured SQL : JDBC写入Oracle
1. 背景 想做spark structured sql 写入oracle ,但是报错,代码如下 @Testdef testSinkToOracle1(): Unit = {val host = &q ...
- Spark Structured SQL报错:Stream stream joins without equality predicate is not supported
1.背景 写一个Spark Structured SQL 任务,任务的功能是对kafka的两个topic进行join处理. select q.sysdt, q.systm, q.event_time ...
- spark sql uv_使用Spark Streaming SQL进行PV/UV统计-阿里云开发者社区
作者:关文选,花名云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor 1.背景介绍 PV/UV统计是流式分析一个常见的场景.通过PV可以对访问的网站 ...
- Spark——Spark概述
一.Spark是什么 二.Spark and Hadoop 在之前的学习中,Hadoop的MapReduce是大家广为熟知的计算框架,那为什么咱们还要学习新的计算框架Spark呢,这里就不得不提到Sp ...
最新文章
- 英雄难过棍子关html游戏开发,《英雄难过棍子关》评测:看我变长再变长!
- 高并发-【抢红包案例】之二:使用悲观锁方式修复红包超发的bug
- php公告滚动源码,10行js代码实现上下滚动公告效果方法
- 肖仰华 | 知识图谱落地的基本原则与最佳实践
- Docker存储空间不够,如何Docker修改存储位置以进行扩容
- MAC下 Intellij IDEA GO语言插件安装及简单案例
- Oracle和Mysql中的字符串的拼接
- c语言循环嵌套说课,C语言FOR循环说课稿.doc
- 小学生计算机学科竞赛类活动,自主招生/综合评价认可哪些科创类赛事?2020届参考...
- 最简单的php导出excel文件方法
- Git — 解决“requested upstream branch ‘origin/master‘ does not exist“
- 书写技术文档的模板技术调研文档书写规范
- 计算机生成目录步骤word,word生成目录步骤,word怎样做目录
- Cesium开发基础获——取鼠标点击的经纬度(lon、lat)、高度(height)、相机的视角(heading、pitch、roll)
- 巴菲特致股东的信pdf_巴菲特历年股东大会股东信问答实录集合(共7份)
- 无线信道仿真 matlab,基于Matlab的无线信道仿真.doc
- Vue el-menu-item路由跳转
- 图片识别不了小程序怎么办_图片转文字【小程序】
- 幼儿园带括号算式口诀_幼儿园括号题教案
- 深度学习,提高分类精度