文章目录

  • 零、本讲学习目标
  • 一、RDD算子
  • 二、准备工作
    • (一)准备文件
      • 1、准备本地系统文件
      • 2、准备HDFS系统文件
    • (二)启动Spark Shell
      • 1、启动HDFS服务
      • 2、启动Spark服务
      • 3、启动Spark Shell
  • 三、转化算子
    • (一)映射算子 - map()
      • 1、映射算子功能
      • 2、映射算子案例
        • 任务1、将rdd1每个元素翻倍得到rdd2
        • 任务2、将rdd1每个元素平方得到rdd2
    • (二)过滤算子 - filter()
      • 1、过滤算子功能
      • 2、过滤算子案例
        • 任务1、过滤出列表中的偶数
        • 任务2、过滤出文件中包含`spark`的行
    • (三)扁平映射算子 - flatMap()
      • 1、扁平映射算子功能
      • 2、扁平映射算子案例
        • 任务1、统计文件中单词个数
        • 任务2、统计不规则二维列表元素个数
    • (四)按键归约算子 - reduceByKey()
      • 1、按键归约算子功能
      • 2、按键归约算子案例
        • 任务:计算学生总分
    • (五)合并算子 - union()
      • 1、合并算子功能
      • 2、合并算子案例
    • (六)排序算子 - sortBy()
      • 1、排序算子功能
      • 2、排序算子案例
    • (七)按键排序算子 - sortByKey()
      • 1、按键排序算子功能
      • 2、按键排序算子案例
    • (八)连接算子
      • 1、内连接算子 - join()
        • (1)内连接算子功能
        • (2)内连接算子案例
      • 2、左外连接算子 - leftOuterJoin()
        • (1)左外连接算子功能
        • (2)左外连接算子案例
      • 3、右外连接算子 - rightOuterJoin()
        • (1)右外连接算子功能
        • (2)右外连接算子案例
      • 4、全外连接算子 - fullOuterJoin()
        • (1)全外连接算子功能
        • (2)全外连接算子案例
    • (九)交集算子 - intersection()
      • 1、交集算子功能
      • 2、交集算子案例
    • (十)去重算子 - distinct()
      • 1、去重算子功能
      • 2、去重算子案例
    • (十一)组合分组算子 - cogroup()
      • 1、组合分组算子功能
      • 2、组合分组算子案例
  • 四、行动算子
    • (一)归约算子 - reduce()
      • 1、归约算子功能
      • 2、归约算子案例
    • (三)按键计数算子 - countByKey()
      • 1、按键计数算子功能
      • 2、按键计数算子案例
    • (四)前截取算子 - take(n)
      • 1、前截取算子功能
      • 2、前截取算子案例
    • (五)遍历算子 - foreach()
      • 1、遍历算子功能
      • 2、遍历算子案例
    • (六)存文件算子 - saveAsFile()
      • 1、存文件算子功能
      • 2、存文件算子案例

零、本讲学习目标

  1. 掌握转化算子的使用
  2. 掌握行动算子的使用

一、RDD算子

  • RDD被创建后是只读的,不允许修改。Spark提供了丰富的用于操作RDD的方法,这些方法被称为算子。一个创建完成的RDD只支持两种算子:转化(Transformation)算子和行动(Action)算子。

二、准备工作

(一)准备文件

1、准备本地系统文件

  • /home目录里创建words.txt

2、准备HDFS系统文件

  • words.txt上传到HDFS系统的/park目录里
  • 说明:/park是在上一讲我们创建的目录

(二)启动Spark Shell

1、启动HDFS服务

  • 执行命令:start-dfs.sh

2、启动Spark服务

  • 进入Spark的sbin目录执行命令:./start-all.sh

3、启动Spark Shell

  • 执行名命令: spark-shell --master spark://master:7077

三、转化算子

  • 转化算子负责对RDD中的数据进行计算并转化为新的RDD。Spark中的所有转化算子都是惰性的,因为它们不会立即计算结果,而只是记住对某个RDD的具体操作过程,直到遇到行动算子才会与行动算子一起执行。

(一)映射算子 - map()

1、映射算子功能

  • map()是一种转化算子,它接收一个函数作为参数,并把这个函数应用于RDD的每个元素,最后将函数的返回结果作为结果RDD中对应元素的值。

2、映射算子案例

  • 预备工作:创建一个RDD - rdd1

任务1、将rdd1每个元素翻倍得到rdd2

  • 对rdd1应用map()算子,将rdd1中的每个元素平方并返回一个名为rdd2的新RDD
  • 上述代码中,向算子map()传入了一个函数x = > x * 2。其中,x为函数的参数名称,也可以使用其他字符,例如a => a * 2。Spark会将RDD中的每个元素传入该函数的参数中。
  • rdd1和rdd2中实际上没有任何数据,因为parallelize()map()都为转化算子,调用转化算子不会立即计算结果。若需要查看计算结果,则可使用行动算子collect()
  • 执行rdd2.collect()进行计算,并将结果以数组的形式收集到当前Driver。因为RDD的元素为分布式的,数据可能分布在不同的节点上。
  • 其实,向map()算子传入函数的参数可以用下划线_来代替,并且可以简化成_ * 2

把 x => func(x) 简化为 func _ 或 func 的过程称为eta-conversion
把 func 或 func _ 展开为 x => func(x) 的过程为eta-expansion

  • 上述使用map()算子的运行过程如下图所示

任务2、将rdd1每个元素平方得到rdd2

  • 方法一、采用普通函数作为参数传给map()算子
  • 方法二、采用下划线表达式作为参数传给map()算子
  • 刚才翻倍用的是map(_ * 2),很自然地想到平方应该是map(_ * _)
  • 报错,(_ * _)经过eta-expansion变成普通函数,不是我们预期的x => x * x,而是(x$1, x$2) => (x$1 * x$2),不是一元函数,而是二元函数,系统立马就蒙逼了,不晓得该怎么取两个参数来进行乘法运算。
  • 难道就不能用下划线参数了吗?当然可以,但是必须保证下划线表达式里下划线只出现1次。引入幂函数scala.math.pow就可以搞定。
  • 但是有点瑕疵,rdd2的元素变成了双精度实数,得转化成整数

(二)过滤算子 - filter()

1、过滤算子功能

  • 通过函数func对源RDD的每个元素进行过滤,并返回一个新的RDD。

2、过滤算子案例

任务1、过滤出列表中的偶数

  • 基于列表创建RDD,然后利用过滤算子得到偶数构成的新RDD

任务2、过滤出文件中包含spark的行

  • 执行命令: val lines= sc.textFile("hdfs://master:9000/park/words.txt"),读取文件/park/words.txt生成RDD - lines
  • 执行命令:val sparkLines = lines.filter(_.contains("spark")),过滤包含spark的行生成RDD - sparkLines
  • 执行命令:sparkLines.collect(),查看sparkLines内容

(三)扁平映射算子 - flatMap()

1、扁平映射算子功能

  • flatMap()算子与map()算子类似,但是每个传入给函数func的RDD元素会返回0到多个元素,最终会将返回的所有元素合并到一个RDD。

2、扁平映射算子案例

任务1、统计文件中单词个数

  • 读取文件,生成RDD - rdd1
  • 按空格拆分,做扁平映射,生成新RDD - rdd2
  • 大家可以看到,经过扁平映射,生成的RDD是一个单词构成一个元素,原先的RDD是五行单词构成五个元素
  • 执行命令:rdd2.count(),即可知单词个数
  • 扁平映射算子的运行过程,rdd1的5个元素经过扁平映射变成了rdd2的20个元素

任务2、统计不规则二维列表元素个数

[78151049728142147−4]\left[ \begin{matrix} 7 & 8 & 1 & 5 &\\ 10 & 4 & 9 \\ 7 & 2 & 8 & 1 & 4 \\ 21 & 4 & 7 & -4 \end{matrix} \right]⎣⎢⎢⎡​710721​8424​1987​51−4​4​⎦⎥⎥⎤​

  • 方法一、利用Scala来实现
val m = List(List(7, 8, 1, 5),List(10, 4, 9), List(7, 2, 8, 1, 4),List(21, 4, 7, -4))val arr = m.flattenarr.size
  • 方法二、利用Spark来实现
val rdd1 = sc.makeRDD(List(List(7, 8, 1, 5),List(10, 4, 9), List(7, 2, 8, 1, 4),List(21, 4, 7, -4)))val rdd2 = rdd1.flatMap(x => x.toString.substring(5, x.toString.length - 1).split(", "))rdd2.collect()scala> rdd2.count()

(四)按键归约算子 - reduceByKey()

1、按键归约算子功能

  • reduceByKey()算子的作用对像是元素为(key,value)形式(Scala元组)的RDD,使用该算子可以将相同key的元素聚集到一起,最终把所有相同key的元素合并成一个元素。该元素的key不变,value可以聚合成一个列表或者进行求和等操作。最终返回的RDD的元素类型和原有类型保持一致。

2、按键归约算子案例

任务:计算学生总分

  • 成绩表
姓名 语文 数学 英语
张钦林 78 90 76
陈燕文 95 88 98
卢志刚 78 80 60
  • 创建成绩列表scores,基于成绩列表创建rdd1,对rdd1按键归约得到rdd2,然后查看rdd2内容
val scores = List(("张钦林", 78), ("张钦林", 90), ("张钦林", 76),("陈燕文", 95), ("陈燕文", 88), ("陈燕文", 98),("卢志刚", 78),     ("卢志刚", 80), ("卢志刚", 60))val rdd1 = sc.parallelize(scores)val rdd2 = rdd1.reduceByKey((x, y) => x + y)rdd2.collect()
  • 不仅可以在Spark Shell里完成任务,也可以编写Scala程序生成jar提交到Spark服务器运行。有兴趣的同学不妨参看《Spark案例:两种方式计算学生总分》

(五)合并算子 - union()

1、合并算子功能

  • union()算子将两个RDD合并为一个新的RDD,主要用于对不同的数据来源进行合并,两个RDD中的数据类型要保持一致。

2、合并算子案例

  • 创建两个RDD,合并成一个新RDD

(六)排序算子 - sortBy()

1、排序算子功能

  • sortBy()算子将RDD中的元素按照某个规则进行排序。该算子的第一个参数为排序函数,第二个参数是一个布尔值,指定升序(默认)或降序。若需要降序排列,则需将第二个参数置为false。

2、排序算子案例

  • 一个数组中存放了三个元组,将该数组转为RDD集合,然后对该RDD按照每个元素中的第二个值进行降序排列。
  • sortBy(x=>x._2,false)中的x代表rdd1中的每个元素。由于rdd1的每个元素是一个元组,因此使用x._2取得每个元素的第二个值。当然,sortBy(x=>x._2,false)也可以直接简化为sortBy(_._2,false)

(七)按键排序算子 - sortByKey()

1、按键排序算子功能

  • sortByKey()算子将(key,value)形式的RDD按照key进行排序。默认升序,若需降序排列,则可以传入参数false。

2、按键排序算子案例

  • 将三个二元组构成的RDD按键先降序排列,然后升序排列

(八)连接算子

1、内连接算子 - join()

(1)内连接算子功能

  • join()算子将两个(key, value)形式的RDD根据key进行连接操作,相当于数据库的内连接(Inner Join),只返回两个RDD都匹配的内容。

(2)内连接算子案例

  • 将rdd1与rdd2进行内连接

2、左外连接算子 - leftOuterJoin()

(1)左外连接算子功能

  • leftOuterJoin()算子与数据库的左外连接类似,以左边的RDD为基准(例如rdd1.leftOuterJoin(rdd2),以rdd1为基准),左边RDD的记录一定会存在。例如,rdd1的元素以(k,v)表示,rdd2的元素以(k, w)表示,进行左外连接时将以rdd1为基准,rdd2中的k与rdd1的k相同的元素将连接到一起,生成的结果形式为(k,(v,Some(w))。rdd1中其余的元素仍然是结果的一部分,元素形式为(k,(v,None)。Some和None都属于Option类型,Option类型用于表示一个值是可选的(有值或无值)。若确定有值,则使用Some(值)表示该值;若确定无值,则使用None表示该值。

(2)左外连接算子案例

  • rdd1与rdd2进行左外连接

3、右外连接算子 - rightOuterJoin()

(1)右外连接算子功能

  • rightOuterJoin()算子的使用方法与leftOuterJoin()算子相反,其与数据库的右外连接类似,以右边的RDD为基准(例如rdd1.rightOuterJoin(rdd2),以rdd2为基准),右边RDD的记录一定会存在。

(2)右外连接算子案例

  • rdd1与rdd2进行右外连接

4、全外连接算子 - fullOuterJoin()

(1)全外连接算子功能

  • fullOuterJoin()算子与数据库的全外连接类似,相当于对两个RDD取并集,两个RDD的记录都会存在。值不存在的取None。

(2)全外连接算子案例

  • rdd1与rdd2进行全外连接

(九)交集算子 - intersection()

1、交集算子功能

  • intersection()算子对两个RDD进行交集操作,返回一个新的RDD。要求两个算子类型要一致。

2、交集算子案例

  • rdd1与rdd2进行交集操作

(十)去重算子 - distinct()

1、去重算子功能

  • distinct()算子对RDD中的数据进行去重操作,返回一个新的RDD。有点类似与集合的不允许重复元素。

2、去重算子案例

  • 去掉rdd中重复的元素

(十一)组合分组算子 - cogroup()

1、组合分组算子功能

  • cogroup()算子对两个(key, value)形式的RDD根据key进行组合,相当于根据key进行并集操作。例如,rdd1的元素以(k, v)表示,rdd2的元素以(k, w)表示,执行rdd1.cogroup(rdd2)生成的结果形式为(k, (Iterable, Iterable))。

2、组合分组算子案例

  • rdd1与rdd2进行组合分组操作

四、行动算子

  • Spark中的转化算子并不会马上进行运算,而是在遇到行动算子时才会执行相应的语句,触发Spark的任务调度。

(一)归约算子 - reduce()

1、归约算子功能

  • reduce()算子按照传入的函数进行归约计算

2、归约算子案例

  • 计算1+2+3+……+1001 + 2 + 3 + …… + 1001+2+3+……+100的值
  • 计算12+22+32+42+521^2 + 2^2 + 3^2 + 4^2 + 5^212+22+32+42+52的值

(三)按键计数算子 - countByKey()

1、按键计数算子功能

  • 按键统计RDD键值出现的次数,返回由键值和次数构成的映射。

2、按键计数算子案例

  • List集合中存储的是键值对形式的元组,使用该List集合创建一个RDD,然后对其进行countByKey()的计算。

(四)前截取算子 - take(n)

1、前截取算子功能

  • 返回RDD的前n个元素(同时尝试访问最少的partitions),返回结果是无序的,测试使用。

2、前截取算子案例

  • 返回集合中前5个元素组成的数组

(五)遍历算子 - foreach()

1、遍历算子功能

  • 计算 RDD中的每一个元素,但不返回本地(只是访问一遍数据),可以配合println()友好打印数据。

2、遍历算子案例

  • 将RDD里的每个元素平方后输出

(六)存文件算子 - saveAsFile()

1、存文件算子功能

  • 将RDD数据保存到本地文件或HDFS文件

2、存文件算子案例

  • 将rdd内容保存到HDFS的/park/out.txt

Spark基础学习笔记17:掌握RDD算子相关推荐

  1. Spark基础学习笔记20:RDD持久化、存储级别与缓存

    文章目录 零.本讲学习目标 一.RDD持久化 (一)引入持久化的必要性 (二)案例演示持久化操作 1.RDD的依赖关系图 2.不采用持久化操作 3.采用持久化操作 二.存储级别 (一)持久化方法的参数 ...

  2. Spark基础学习笔记19:RDD的依赖与Stage划分

    文章目录 零.本讲学习目标 一.RDD的依赖 (一)窄依赖 1.map()与filter()算子 2.union()算子 3.join()算子 (二)宽依赖 1.groupBy()算子 2.join( ...

  3. 2022年Spark基础学习笔记目录

    一.Spark学习笔记 在私有云上创建与配置虚拟机 Spark基础学习笔记01:初步了解Spark Spark基础学习笔记02:Spark运行时架构 Spark基础学习笔记03:搭建Spark单机版环 ...

  4. Spark基础学习笔记16:创建RDD

    文章目录 零.本讲学习目标 一.RDD为何物 (一)RDD概念 (二)RDD示例 (三)RDD主要特征 二.做好准备工作 (一)准备文件 1.准备本地系统文件 2.准备HDFS系统文件 (二)启动Sp ...

  5. Spark基础学习笔记10:Scala集成开发环境

    文章目录 零.本讲学习目标 一.搭建Scala的Eclipse开发环境 (一)安装Scala插件 (二)创建Scala项目 二.搭建Scala的IntelliJ IDEA开发环境 (一)启动IDEA ...

  6. 2022年Spark基础学习笔记

    一.Spark学习笔记 在OpenStack私有云上创建与配置虚拟机 Spark基础学习笔记01:初步了解Spark Spark基础学习笔记02:Spark运行时架构 Spark基础学习笔记03:搭建 ...

  7. Spark基础学习笔记22:Spark RDD案例分析

    文章目录 零.本讲学习目标 一.案例分析:Spark RDD实现单词计数 (一)案例概述 (二)实现步骤 1.新建Maven管理的Spark项目 2.添加Scala和Spark依赖 3.创建WordC ...

  8. Spark基础学习笔记23:DataFrame与Dataset

    文章目录 零.本讲学习目标 一.Spark SQL (一)Spark SQL概述 (二)Spark SQL主要特点 1.将SQL查询与Spark应用程序无缝组合 2.Spark SQL以相同方式连接多 ...

  9. Spark基础学习笔记(1)

    一.案例分析:Spark RDD实现单词计数 (一)案例概述 单词计数是学习分布式计算的入门程序,有很多种实现方式,例如MapReduce:使用Spark提供的RDD算子可以更加轻松地实现单词计数. ...

最新文章

  1. First non repeating word in a file? File size can be 100GB.
  2. T-Sql(七)用户权限操作(grant)
  3. 中国中医药产业未来投资规划与前景风险预测报告2022-2027年版
  4. Android 手机卫士--9patch图
  5. Spring(三)——HelloSpring、IOC创建对象的方式、属性注入、自动装配、使用注解开发
  6. 光流 | 基于Matlab实现Lucas-Kanade方法:方法2(附源代码)
  7. XCTF-高手进阶区:i-got-id-200
  8. 机器学习-有监督-SVM
  9. 给网站文字添加图标-Font Awesome
  10. JS左侧竖向滑动菜单
  11. Chap-3 Section 3.3 ELF文件头
  12. linux创建根目录代码,Linux文件系统之目录的建立
  13. 对于最小割的进一步理解
  14. 2011最新XP系统盘下载大全 都是2011最新的系统
  15. 不管是蓝牙耳机还是有线耳机长时间佩戴都是有危害的,这些问题不容小觑!
  16. Base64编码原理分析
  17. 计算机网络基础(TCP/IP)
  18. 大型企业中复杂数据库存储过程的修改方法:7步法教你高效完成任务
  19. 基于在线学习行为的评价模型的设计与实现
  20. 前端常用js方法工具封装

热门文章

  1. 有图有真相:带你实现当下流行的权限验证
  2. 关于软件定义IT基础设施的未来,深信服是这么思考的
  3. 【带着canvas去流浪(13)】用Three.js制作简易的MARVEL片头动画(下)#华为云·寻找黑马程序员#
  4. lora终端连接云服务器_物联网通讯技术三足鼎立形成:NB-IoT、eMTC、LoRa各有千秋...
  5. memcache入门
  6. 韩顺平php视频笔记70 面向对象编程的三大特征1 抽象 封装
  7. vim插件管理器minpac安装及使用
  8. python3 try except or_Python基础10:try except异常处理详解
  9. android引导用户打开位置权限_想升级 App?先要个权限吧!!!
  10. python运行input不出结果_Python中print和input调用了Python中底层的什么方法