基本概要

Spark 是一种快速、通用、可扩展的大数据分析引擎,是基于内存计算的大数据并行计算框架。Spark 在 2009 年诞生于加州大学伯克利分校 AMP 实验室,2010 年开源,2014 年 2月成为 Apache 顶级项目。

本文是 Spark 系列教程的第一篇,通过大数据中的 “Hello World” – Word Count 实验带领大家快速上手 Spark。Word Count 顾名思义就是对单词进行计数,我们首先会对文件中的单词做统计计数,然后输出出现次数最多的 3 个单词。

前提条件

本文中会使用 spark-shell 来演示 Word Count 示例的执行过程。spark-shell 是提交 Spark 作业众多方式中的一种,提供了交互式运行环境(REPL,Read-Evaluate-Print-Loop),在 spark-shell 上输入代码后就可以立即得到响应。spark-shell 在运行的时候,依赖于 Java 和 Scala 语言环境。因此,为了保证 spark-shell 的成功启动,需要在本地预装 Java 与 Scala。

本地安装 Spark

下载并解压安装包

从 Spark 官网 下载安装包,选择最新的预编译版本即可,然后将安装包解压到本地电脑的任意目录。

设置环境变量

为了在本地电脑的任意目录下都可以直接运行 Spark 相关的命令,我们需要设置一下环境变量。我本地的 Mac 电脑使用的是 zsh 作为终端 shell,编辑 ~/.zshrc 文件设置环境变量,如果是 bash 可以编辑 /etc/profile 文件。

export SPARK_HOME=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2
export PATH=$PATH:$SPARK_HOME/bin

加载环境变量:

source ~/.zshrc

在终端输入 spark-shelll --version 命令,如果显示以下内容,表示我们已经成功在本地安装好了 Spark。

❯ spark-shell --version
Welcome to____              __/ __/__  ___ _____/ /___\ \/ _ \/ _ `/ __/  '_//___/ .__/\_,_/_/ /_/\_\   version 3.1.2/_/Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_302
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
Revision de351e30a90dd988b133b3d00fa6218bfcaba8b8
Url https://github.com/apache/spark
Type --help for more information.

Spark 基本概念

在开始实验之前,先介绍 3 个 Spark 中的概念,分别是 spark、sparkContext 和 RDD。

  • spark 和 sparkContext 分别是两种不同的开发入口实例:

    • spark 是开发入口 SparkSession 实例(Instance),SparkSession 在 spark-shell 中会由系统自动创建;
    • sparkContext 是开发入口 SparkContext 实例。在 Spark 版本演进的过程中,从 2.0 版本开始,SparkSession 取代了 SparkContext,成为统一的开发入口。本文中使用 sparkContext 进行开发。
  • RDD 的全称是 Resilient Distributed Dataset,意思是“弹性分布式数据集”。RDD 是 Spark 对于分布式数据的统一抽象,它定义了一系列分布式数据的基本属性与处理方法。

实现 Word Count

Word Count 的整体执行过程示意图如下,接下来按照读取内容、分词、分组计数、排序、取 Top3 出现次数的单词这 5 个步骤对文件中的单词进行处理。

准备文件

/Users/chengzhiwei/tmp/wordcount.txt 文件中写入以下内容:

Spark Hive Hadoop
Kubernetes Elasticsearch Spark
Doris Zookeeper Hadoop
Spark Hive Hudi Iceberg
Kafka Pulsar RocketMQ Hadoop Spark

第 1 步:读取文件

首先,我们调用 SparkContext 的 textFile 方法,读取源文件,生成 RDD[String] 类型的 RDD,文件中的每一行是数组中的一个元素。

//导包
import org.apache.spark.rdd.RDD// 文件路径
val file: String = "/Users/chengzhiwei/tmp/wordcount.txt"// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)

第 2 步:分词

“分词”就是把“数组”的行元素打散为单词。要实现这一点,我们可以调用 RDD 的 flatMap 方法来完成。flatMap 操作在逻辑上可以分成两个步骤:映射展平

// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))

首先使用空格作为分隔符,将 lineRDD 中的行元素转换为单词,分割之后,每个行元素就都变成了单词数组,元素类型也从 String 变成了 Array[String],像这样以元素为单位进行转换的操作,统一称作“映射”。

映射过后,RDD 类型由原来的 RDD[String]变为 RDD[Array[String]]。如果把 RDD[String]看成是“数组”的话,那么 RDD[Array[String]]就是一个“二维数组”,它的每一个元素都是单词。接下来我们需要对这个“二维数组”做展平,也就是去掉内层的嵌套结构,把“二维数组”还原成“一维数组”。

第 3 步:分组计数

在 RDD 的开发框架下,聚合类操作,如计数、求和、求均值,需要依赖键值对(key value pair)类型的数据元素。因此,在调用聚合算子做分组计数之前,我们要先把 RDD 元素转换为(key,value)的形式,也就是把 RDD[String] 映射成 RDD[(String, Int)]。

使用 map 方法将 word 映射成 (word,1) 的形式,所有的 value 的值都设置为 1,对于同一个的单词,在后续的计数运算中,我们只要对 value 做累加即可。

// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = wordRDD.map(word => (word, 1))

完成了形式的转换之后,我们就该正式做分组计数了。分组计数其实是两个步骤,也就是先“分组”,再“计数”。我们使用聚合算子 reduceByKey 来同时完成分组和计数这两个操作。对于 kvRDD 这个键值对“数组”,reduceByKey 先是按照 Key(也就是单词)来做分组,分组之后,每个单词都有一个与之对应的 value 列表。然后根据用户提供的聚合函数,对同一个 key 的所有 value 做 reduce 运算,这里就是对 value 进行累加。

// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

第 4 步:排序

现在得到的 wordCounts RDD 中 key 是单词,value 是这个单词出现的次数,我们最终要取 Top3 出现次数的单词,首先要根据单词出现的次数进行逆序排序。

先交换 wordCounts RDD 中的 key 和 value 中的位置,方便下一步排序。

// 交换 key 和 value 的位置
val exchangeRDD: RDD[(Int, String)] = wordCounts.map{case (k,v)=>(v,k)}

根据单词出现的次数逆序排序,false 表示逆序排序。

// 根据单词出现的次数逆序排序
val sortRDD: RDD[(Int, String)] = exchangeRDD.sortByKey(false)

第 5 步:取 Top3 出现次数的单词

使用 take 方法获取排序后数组中前 3 个元素。

// 取 Top3 出现次数的单词
sortRDD.take(3)

完整代码

将以下代码在 spark-shell 中执行:

//导包
import org.apache.spark.rdd.RDD//第 1 步:读取文件
// 文件路径
val file: String = "/Users/chengzhiwei/tmp/wordcount.txt"// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file) //第 2 步:分词
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))// 第 3 步:分组计数
// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = wordRDD.map(word => (word, 1)) // 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y) //第 4 步:排序
// 交换 key 和 value 的位置
val exchangeRDD: RDD[(Int, String)] = wordCounts.map{case (k,v)=>(v,k)}// 根据单词出现的次数逆序排序
val sortRDD: RDD[(Int, String)] = exchangeRDD.sortByKey(false)// 第 5 步:取 Top3 出现次数的单词
sortRDD.take(3)

输出结果如下,可以看到 Top3 出现次数的单词分别是 Spark,Hadoop,Hive。到此为止,我们成功实现了 Word Count 的功能。

Array[(Int, String)] = Array((4,Spark), (3,Hadoop), (2,Hive))

简化写法

上面实现 Word Count 的代码看起来稍稍有些复杂,我们可以使用链式调用的写法将上面的代码简化成一行代码,通过 . 的方式调用 RDD 中的方法,返回结果是新的 RDD,可以继续用 . 调用新 RDD 中的方法。

//读取文件
//sc 表示 sparkContext 实例
sc.textFile("/Users/chengzhiwei/tmp/wordcount.txt").
//根据空格分词
flatMap(line => line.split(" ")).
//分组,统一把 value 设置为 1
map(word => (word,1)).
//对相同 key 的 value 进行累加
reduceByKey((k,v) => (k+v)).
//把(key,value)对调,目的是按照计数来排序,(Spark,4) => (4,Spark)
map{case (k,v)=>(v,k)}.
//降序排序
sortByKey(false).
//取前 3
take(3)

Scala 语言为了让函数字面量更加精简,还可以使用下划线 _ 作为占位符,用来表示一个或多个参数。我们用来表示的参数必须满足只在函数字面量中出现一次。因此上面的写法可以进一步简化为以下代码:

//读取文件
sc.textFile("/Users/chengzhiwei/tmp/wordcount.txt").
//根据空格分词
flatMap(_.split(" ")).
//分组,统一把 value 设置为 1
map((_,1)).
//对相同 key 的 value 进行累加
reduceByKey(_+_).
//把(key,value)对调,目的是按照计数来排序,(Spark,4) => (4,Spark)
map{case (k,v)=>(v,k)}.
//降序排序
sortByKey(false).
//取前 3
take(3)

欢迎关注

Spark 系列教程(1)Word Count相关推荐

  1. 史上最简单的spark系列教程 | 完结

    <史上最简单的spark系列教程>系列: 与其说是教程不如说是改良后的个人学习笔记|| 教程参考自:<spark官方文档>,<spark快速分析>,<图解Sp ...

  2. spark python教程_spark2.x由浅入深深到底系列七之python开发spark环境配置

    课程咨询以及领取大额优惠请加微信:bigdatatang01 以下是在mac操作系统上配置用python开发spark的环境 一.安装python spark2.2.0需要python的版本是Pyth ...

  3. WORD系列教程-多级编号制作合同

    我们先看一下要制作的合同效果,在这个合同里面有大的条款如第一条和细节(1.2.).所以就得通过WORD的多级编号来实现.而其里面还有段落没有项目编号.大飞老师今天主要讲一下这个知识点,大家跟着大飞老师 ...

  4. Spark Streaming的Word Count

    Spark Streaming的Word Count 需求&准备 图解 首先在linux服务器上安装nc工具 nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据 ...

  5. 【大数据Spark系列】Spark教程:详细全部

    Spark作为Apache顶级的开源项目,是一个快速.通用的大规模数据处理引擎,和Hadoop的MapReduce计算框架类似,但是相对于MapReduce,Spark凭借其可伸缩.基于内存计算等特点 ...

  6. python数据挖掘案例系列教程——python实现搜索引擎

    全栈工程师开发手册 (作者:栾鹏) python数据挖掘系列教程 今天我们使用python实现一个网站搜索引擎.主要包含两个部分.网站数据库的生成.搜索引擎.其中搜索引擎部分我们使用单词频度算法.单词 ...

  7. python机器学习案例系列教程——文档分类器,朴素贝叶斯分类器,费舍尔分类器

    全栈工程师开发手册 (作者:栾鹏) python数据挖掘系列教程 github地址:https://github.com/626626cdllp/data-mining/tree/master/Bay ...

  8. 2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(一)(JianYi收藏)

    文章目录 引言 相关教程直通车: day01_Flink概述.安装部署和入门案例 今日目标 Flink概述 什么是批处理和流处理 为什么需要流计算 Flink的发展史 Flink的官方介绍 Flink ...

  9. spark sql教程_如何使用Spark SQL:动手教程

    spark sql教程 在本系列的第一部分中,我们研究了使用Apache Spark SQL和DataFrames "大规模"利用关系数据库的功能方面的进展. 现在,我们将基于现实 ...

最新文章

  1. P2737 [USACO4.1]麦香牛块Beef McNuggets 数学题 + 放缩思想
  2. python编程基础与应用-Python编程基础与应用
  3. python的输出方式_Python--输出方式
  4. [YTU]_2641 9 填空题:静态成员---计算学生个数)
  5. D1net阅闻:IBM宣布推出全新存储技术 存储速度快70倍
  6. ActiveMQ_3Java实现
  7. SpringBatch tasklet实现和用法(十)
  8. 11.IDA-this指针
  9. linux mmap系统调用
  10. React 第五章 state 组件生命周期
  11. 如何利用jQuery post传递含特殊字符的数据
  12. Thymeleaf无法显示css样式
  13. C语言嵌入式系统编程修炼之软件架构篇
  14. echarts使用之坑 隐藏显示echarts变形 echarts官网访问不了
  15. C# TCP通讯大族激光打标机
  16. win2012 安装华为U2000 U2000V200R018C60SPC200 解决”使用isql命令连接SQL Server服务失败“+
  17. 《超越自卑的活法Live》内容整理
  18. python 缩放图片_Python实现图片尺寸缩放脚本
  19. HASH 、MD、SHA、MAC、HMAC、SM3
  20. 宝塔linux 加载zend,Linux CentOS 安装 Zend Guard Loader 组件

热门文章

  1. 网络新宠儿EyeOS和YouOS:Web版操作系统
  2. 清北学堂noip2019集训D2——数据结构
  3. python爬虫--【百度知道】自动答题
  4. linux找不到telnet命令,Linux安装telnet
  5. 2023美赛F题全部代码+数据+结果 数学建模
  6. 推荐系统-召回-概述(一):内容为王
  7. 燃气管道泄漏监测监控报警管理系统软件解决方案
  8. 小程序使用three.js开发VR场景漫游
  9. 临界时间节点外卖业务订单如何处理订单
  10. 递推算法题:王小二刀工之王小二切大饼