【Spark】Spark Quick Start(快速入门翻译)
本文主要是翻译Spark官网Quick Start。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷
目录导航在右上角,感谢两个大佬(孤傲苍狼 JavaScript自动生成博文目录导航 和 juejiang 为博客园添加目录的配置总结)提供的帮助。这篇文章还有个问题 scala/python/java 使用 Spark 的介绍不能像官网那样可以通过点击导航来显示不同的内容,很影响阅读。我在想办法改进
Quick Start
这个指南提供了使用Spark的快速介绍。我们会首先介绍Spark 交互式编程(使用Python或者Scala)的 API, 然后展示如何用Java、Scala 和 Python来编写应用程序。
为了使用这个指南,您需要先从 Spark 网页 下载打包发布的Spark安装包。由于我们将不会(在指南中)使用HDFS, 您可以下载任意版本的Hadoop安装包。
需要注意的是,Spark2.0 之前, Spark的主要编程接口是弹性分布式数据集(Resilient Distributed Dataset (RDD))。Spark2.0 之后, RDD 被 Dataset 取代,Dataset 和 RDD 一样是强类型,但是在底层进行了更多的优化。Spark2.0 之后仍然支持 RDD 接口,并且您可以从RDD编程指南中 获取更详细的参考。当然,我们强烈建议您选择使用Dataset, 因为它的性能比RDD更好。 查看 SQL编程指南 以得到更多关于Dataset的信息。
使用 Spark Shell 交互式编程
基本操作
Spark Shell 提供了一个简单的方式去学习 API,同时也提供了一个强大的交互式数据分析工具。它可以基于 Scala(一种在java 虚拟机上运行并因此可以很好地使用已有的java库的编程语言)或 Python 使用。在 Spark 目录下运行以下内容来开始(Sprk Shell):
Scala 版
./bin/pyspark
Python 版
./bin/pyspark
如果你当前环境使用pip下载了 PySpark,可以使用如下下方式调用
pyspark
Spark 主要的抽象是一个被叫做 Dataset 的分布式集合。 Dataset 可以通过 Hadoop InputFormat(比如HDFS文件)或者 转换其他 Dataset 中创建。让我们通过 Spark 源目录下的 README 文件内容创建一个新的 Dataset:
Scala 版
scala> val textFile = spark.read.textFile("README.md") textFile: org.apache.spark.sql.Dataset[String] = [value: string]
Python 版
>>> textFile = spark.read.text("README.md")
你可以直接从Dataset中, 通过调用一些操作或者转化Dataset以获得一个新的Dataset来获取它的值。请阅读 API 文档(Scala / Python) 以获取更多细节
Scala 版
scala> textFile.count() // 该Dataset中的成员数量 res0: Long = 126 // 由于README.md 会随着时间的推移不断改变,所以结果可能会有所不同, 其他输出也有类似情况 scala> textFile.first() // 该Dataset的第一个成员 res1: String = # Apache Spark
Python 版
>>> textFile.count() # 该DataFrame中的行数 126>>> textFile.first() # 该DataFrame的第一行 Row(value=u'# Apache Spark')
现在让我们使用该Dataset来转换成一个新的Dataset。 我们调用 filter 来返回一个新的Dataset, 其中包含这个文件内容的子集。
Scala 版
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
Python 版
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
我们可以将数据集转换和数据集操作串接在一起
Scala 版
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15
Python 版
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"? 15
更多关于Dataset的操作
Dataset操作和转换可以用来做更复杂的计算。假设我们想要找到单词数量最多的那行:
Scala 版
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15
这首先将文件中的一行映射成一个整数值,并创建一个新的Dataset。调用该 Dataset 的 reduce 方法以找到最大的单词计数。map 和 reduce 的参数是 Scala 的函数字面量(闭包),并且可以使用任何语言的特性或者 Scala/Java 库。 比如, 我们可以很荣誉地调用任何地方声明地函数(方法)。我们将使用 Math.max() 方法以使这段代码易于理解:
scala> import java.lang.Math import java.lang.Mathscala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15
MapReduce是一种常见的数据流格式, 这是由Hadoop推广的。Spark 可以很容易地实现MapReduce流:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
这里,我们调用 flatMap 来将一个行级(以文本中的一行为一个成员(Item))的 Dataset 转换成一个 单词 级 的Dataset,然后串接调用 groupByKey 和 count 方法 来计算文件中的每个单词的数量作为(String, Long)数据对形式 的Dateset。 为了在我们的shell中统计出单词的数量, 我们可以调用 collect 方法:
scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
Python 版
>>> from pyspark.sql.functions import * >>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect() [Row(max(numWords)=15)]
这首先将文件中的一行映射成一个整数值 并取一个为 “numWords” 的别名,同时创建一个新的DataFrame。调用该 Dataset 的 agg 方法以找到最大的单词计数。select 和 agg 的参数都是 Colum,我们可以使用 df.colName 方法来从一个DataFrame中获得一个 colum。我们同样可以导入 pyspark.sql.functions, 它提供了很多简易的方法从一个已有的 Colum 构建一个新的 Colum。
MapReduce是一种常见的数据流格式, 这是由Hadoop推广的。Spark 可以很容易地实现MapReduce流:
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
这里,我们在 select 方法中使用了 explode 方法来将一个行级(以文本中的一行为一个成员(Item))的 Dataset 转换成一个 单词 级 的Dataset。然后串接调用 groupByKey 和 count 方法 来计算文件中的每个单词的数量作为一个拥有两个Colum:“word” 和 “count” 的DataFrame。 为了在我们的shell中统计出单词的数量, 我们可以调用 collect 方法:
>>> wordCounts.collect() [Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
缓存(Caching)
Spark同样支持将数据集加入到一个集群中的内存缓存中。当数据被重复访问时,这是非常有用的。比如查询一个小的热点数据集 或者 运行像PageRank 这样的迭代算法。让我们标记我们的 linesWithSpark
作为缓存数据 来作为一个例子:
Scala 版
scala> linesWithSpark.cache() res7: linesWithSpark.type = [value: string]scala> linesWithSpark.count() res8: Long = 15scala> linesWithSpark.count() res9: Long = 15
Python 版
>>> linesWithSpark.cache()>>> linesWithSpark.count() 15>>> linesWithSpark.count() 15
使用Spark来探索和缓存一个100行的文本文件看起来很蠢。有趣的是,这些方法同样可以作用在非常大的数据集中,哪怕它们被分布在数十个或上百个节点中。正如 RDD编程指南 中描述的那样, 您可以通过连接 bin/spark-shell 到一个集群中来进行以上交互式操作。
独立的应用程序
假设我们希望使用 Spark API 编写一个独立的 应用程序。 我们将分别使用Scala(带sbt),Java(带Maven) 和 Python(pip) 编写一个简单的应用程序。
Scala
我们将在 Scala 中创建一个Spark 应用程序——非常简单。 实际上,它被命名为 SimleApp.scala
/* SimpleApp.scala */ import org.apache.spark.sql.SparkSessionobject SimpleApp {def main(args: Array[String]) {val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your systemval spark = SparkSession.builder.appName("Simple Application").getOrCreate()val logData = spark.read.textFile(logFile).cache()val numAs = logData.filter(line => line.contains("a")).count()val numBs = logData.filter(line => line.contains("b")).count()println(s"Lines with a: $numAs, Lines with b: $numBs")spark.stop()} }
注意,这个应用程序需要定义一个 main() 方法 而不是 继承 scala.App. scala.App 的子类可能无法正常地工作。
这个程序只是统计 Spark README 文件中包含 “a” 的行数和 包含"b" 的行数。 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME。与之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我们初始化一个SparkSeesion作为程序的一部分。
我们调用 SparkSession.builder 来构造一个 【SparkSession】,然后设置应用的名字, 最后调用 getOrCreate 方法获取一个 【SparkSession】实例。
我们的应用程序取决于Spark API, 所以我们同样需要一个 sbt 配置文件, build.sbt, 这表示 Spark 是一个依赖组件。
name := "Simple Project"version := "1.0"scalaVersion := "2.11.8"libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
为了使 sbt 能够正常工作, 我们需要根据经典的目录结构布局 SimpleApp.scala 和 build.sbt。一旦完成这些,我们就可以创建一个包含这个应用程序源代码的JAR包, 然后使用 spark-submit 脚本运行我们的程序。
# Your directory layout should look like this $ find . . ./build.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala# Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar# Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \--class "SimpleApp" \--master local[4] \target/scala-2.11/simple-project_2.11-1.0.jar ... Lines with a: 46, Lines with b: 23
Java
这个例子将会使用 Maven 编译一个JAR 应用程序,但是很多类似的构建系统都可以完成这些工作。
我们将创建一个简单的Spark应用程序, SimpleApp.java
/* SimpleApp.java */ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset;public class SimpleApp {public static void main(String[] args) {String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your systemSparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();Dataset<String> logData = spark.read().textFile(logFile).cache();long numAs = logData.filter(s -> s.contains("a")).count();long numBs = logData.filter(s -> s.contains("b")).count();System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);spark.stop();} }
这个程序只是统计 Spark README 文件中包含 “a” 的行数和 包含"b" 的行数。 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME。与之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我们初始化一个SparkSeesion作为程序的一部分。
为了构建这个程序, 我们同样要编写一个 Maven pom.xml 文件,这个文件将 Spark 列为一个依赖组件。请注意,Spark 构件 被标记为Scala版本
<project><groupId>edu.berkeley</groupId><artifactId>simple-project</artifactId><modelVersion>4.0.0</modelVersion><name>Simple Project</name><packaging>jar</packaging><version>1.0</version><dependencies><dependency> <!-- Spark dependency --><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.3.1</version></dependency></dependencies> </project>
我们根据规范的Maven目录结构列出这些文件
$ find . ./pom.xml ./src ./src/main ./src/main/java ./src/main/java/SimpleApp.java
现在,我们可以使用 Maven 打包这个应用程序并且 通过 ./bin/spark-submit
. 执行
# Package a JAR containing your application $ mvn package ... [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar# Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \--class "SimpleApp" \--master local[4] \target/simple-project-1.0.jar ... Lines with a: 46, Lines with b: 23
Python
这里我们将展示如何使用Python API(PySpark)来编写一个应用程序
如果你正构建一个打包的 PySpark应用程序或库,你可以将它添加到你的 setup.py 文件中, 如下:
install_requires=['pyspark=={site.SPARK_VERSION}']
作为示例,我们将创建一个简单的 Spark 应用程序, SimpleApp.py:
"""SimpleApp.py""" from pyspark.sql import SparkSessionlogFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder.appName("SimpleApp").getOrCreate() logData = spark.read.text(logFile).cache()numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count()print("Lines with a: %i, lines with b: %i" % (numAs, numBs))spark.stop()
这个程序只是统计 Spark README 文件中包含 “a” 的行数和 包含"b" 的行数。 注意, 您需要使用 Spark 的安装位置 来代替 YOUR_SPARK_HOME。与之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我们初始化一个SparkSeesion作为程序的一部分。和 Scala 和 Java 例子一样, 我们使用 SparkSession 来创建 Dataset 。 对于使用自定义类或者第三方库的应用程序, 我们同样可以通过它的 --py-- files 参数将代码和依赖打包成zip文件(使用 spark-submit --help 查看细节)的形式 添加到 spark-submit。 SimpleApp 足够简单, 所以我们不用指定任何代码依赖组件。
我们使用 bin/spark-submit 脚本运行这个程序
# Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \--master local[4] \SimpleApp.py ... Lines with a: 46, Lines with b: 23
如果您将PySpark通过 pip 安装到了您的环境中(eg. pip install pyspark),根据您的喜好,可以使用常规的Python解释器 或者 使用 spark-submit 来运行您的程序
# Use the Python interpreter to run your application $ python SimpleApp.py ... Lines with a: 46, Lines with b: 23
下一步
祝贺您运行了您的第一个 Spark 应用程序
关于API的深入概述,请从 RDD 编程指南 和 SQL 编程指南 开始, 或者 查看编程指南菜单 以了解其他组件
关于使用集群运行应用程序,请移步 部署概述
最后, Spark 包含了几个简单的例子, 它们被保存在 example 目录下(Scala, Java, Python, R),你可以按照以下方式运行它们:
# For Scala and Java, use run-example: ./bin/run-example SparkPi# For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py# For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R
【Spark】Spark Quick Start(快速入门翻译)相关推荐
- spark之1:快速入门
spark之1:快速入门 @(SPARK)[spark, 大数据] spark可以通过交互式命令行及编程两种方式来进行调用: 前者支持scala与python 后者支持scala.python与jav ...
- 【Qt5开发】Qt Quick/QML快速入门视频教程
learnqml 免费试看地址:https://jiaoyu.taobao.com/course/QKT_2081022 完整视频购买地址:https://item.taobao.com/item.h ...
- Spark快速入门指南 – Spark安装与基础使用
本文转载自Spark快速入门指南 – Spark安装与基础使用 Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象.Spark 正如其名,最大的特点就是快(Lightni ...
- Apache Spark 2.2.0 中文文档 - 快速入门 | ApacheCN
快速入门 使用 Spark Shell 进行交互式分析 基础 Dataset 上的更多操作 缓存 独立的应用 快速跳转 本教程提供了如何使用 Spark 的快速入门介绍.首先通过运行 Spark 交互 ...
- spark SQL快速入门 1-9 慕课网
1.hadoop安装 1.修改hadoop配置文件hadoop-env.shexport JAVA_HOME=/home/hadoop/app/jdk1.8.0_91core-site.xml< ...
- [学习笔记]黑马程序员Spark全套视频教程,4天spark3.2快速入门到精通,基于Python语言的spark教程
文章目录 视频资料: 思维导图 一.Spark基础入门(环境搭建.入门概念) 第二章:Spark环境搭建-Local 2.1 课程服务器环境 2.2 Local模式基本原理 2.3 安装包下载 2.4 ...
- 【Spark】介绍 快速入门
目录 介绍 Spark and Hadoop Spark or Hadoop 核心模块 Spark Core Spark SQL Spark Streaming Spark MLlib Spark G ...
- 一二, Spark概述和快速入门
一, Spark概述 1.1 什么是Spark Spark是一种基于内存的快速,通用,可扩展的大数据分析计算引擎; "Apache Spark" is a unified anal ...
- Spark 安装部署与快速上手
核心概念 Spark 是 UC Berkeley AMP lab 开发的一个集群计算的框架,类似于 Hadoop,但有很多的区别. 最大的优化是让计算任务的中间结果可以存储在内存中,不需要每次都写入 ...
最新文章
- u-charts 曲线图中间有部分没数据,导致点和点无法连成线的问题解决
- 《jQuery Mobile入门经典》—— 第 1 章 了解jQuery Mobile
- 【直播回顾】云栖社区特邀专家徐雷Java Spring Boot开发实战系列课程(第19讲):Java Spring Cloud微服务架构模式与开发实战...
- python shell怎么调字体_Python3设置在shell脚本中自动补全功能的方法
- Web开发-Django模型层
- 教你玩转CSS Float(浮动)
- Linux学习笔记14
- ubuntu10.04以及10.10安装配置tftp服务
- 图片、图标等网址推荐
- 优启通制作系统u盘_如何用优启通制作U盘启动盘
- mysql中查看密码有效期_Mysql5.7.9密码已过有效期的处理过程
- 微软 游戏服务器,微软正式公布游戏串流服务「Project xCloud」
- 建议收藏!最全自然语言处理时事简报
- Sedawk笔记之awk篇
- QtScrcpy手机投屏电脑利器连接Android设备
- FCPX:镜头故障抖动效果TOBK TWITCH for Mac
- JS获取IP地址,登录地点的方法
- 混合云爆发,F5席卷“代码到应用”全程的“野心”
- mongodb集群 java_Mongodb集群操作的JAVA代码详解
- Linux服务器 | 01.服务器购买与基本配置