文章目录

  • 前言
  • 一、map
  • 二、mapPartition
  • 三、mapPartitionsWithIndex
  • 四、flatMap
  • 五、glom
  • 六、mapValues
  • 七、filter
  • 八、keyBy
  • 九、groupBy
  • 十、reduceByKey
  • 十一、foldByKey
  • 提示

前言

以下将会介绍常用RDD算子的使用介绍


一、map

    /*** 转换算子: map* 逻辑: 对RDD中的每一个元素进行映射,映射为指定的值* 对每一个分区中的每一个数据进行映射*/@Test def mapTest(): Unit = {// 1. 实例化一个集合val array: Array[String] = Array("dog", "cat", "elephent", "lion", "tiger", "monkey", "panda")// 2. 通过集合,创建RDDval rdd1: RDD[String] = sc.parallelize(array)// 3. 映射元素(需求:元素都替换成自己的长度)val rdd2: RDD[Int] = rdd1.map(_.length)// 4. 输出rdd2中描述的数据rdd2.foreach(println)// 5. 映射元素(需求:元素都替换成(元素, 长度))val rdd3: RDD[(String, Int)] = rdd1.map(word => (word, word.length))rdd3.foreach(println)}

二、mapPartition

/*** 转换算子: mapPartitions* 逻辑: 也是一个映射,类似于map,但是与map是不一样* 和map的区别:*     - map: 作用在每一个分区的每一个元素上的*     - mapPartitions: 作用在一个分区上的** mapPartitions: 会将一个分区的数据,作为一个整体,来进行整体的映射*/@Test def mapPartitionsTest(): Unit = {// 1. 实例化一个集合val array: Array[String] = Array("宋江", "卢俊义", "吴用", "公孙胜", "关胜")// 2. 创建一个RDDval rdd: RDD[String] = sc.parallelize(array, 2)//2->分区,将这个集合切分成两个分区// 3. 元素映射(参数是一个迭代器 iterator,返回值要求也是一个迭代器 iterator)val rdd1: RDD[Int] = rdd.mapPartitions(iterator => iterator.map(_.length))rdd1.foreach(println)}

三、mapPartitionsWithIndex

/*** 转换算子: mapPartitionsWithIndex* 逻辑: 等同于mapPartitions,也是对一个分区的映射*      对比mapPartitions,多出一个分区的下标*/@Test def mapPartitionsWithIndexTest(): Unit = {// 1. 通过集合,创建RDDval rdd: RDD[String] = sc.parallelize(Array("金莲小姐姐", "婆惜小姐姐", "三娘小姐姐", "师师小姐姐"), 3)// 2. 需求: 将原来的元素映射成 分区号+名字+名字的长度val rdd1: RDD[(Int, String, Int)] = rdd.mapPartitionsWithIndex((index, iterator) => iterator.map(str => (index, str, str.length)))rdd1.foreach(println)}

四、flatMap

/*** 转换算子: flatMap* 逻辑: 扁平化映射,针对RDD中存储的元素是集合的情况,将所有的集合中的元素提取到一个RDD中*/@Test def flatMapTest(): Unit = {// 1. 通过集合,创建RDDval rdd1: RDD[Array[String]] = sc.parallelize(Array(Array("赵云", "关羽", "张飞", "黄忠", "马超"), Array("张辽", "许褚", "典韦"), Array("陆逊", "张昭", "周瑜")))val rdd2: RDD[String] = rdd1.flatMap(_.iterator)  // _.toListrdd2.foreach(println)// 2. 通过集合,创建RDDval rdd3: RDD[String] = sc.parallelize(Array("赵云  关羽  张飞", "诸葛亮  司马懿  周瑜", "黄盖  陆逊"))val rdd4: RDD[String] = rdd3.flatMap(_.split(" +"))rdd4.foreach(println)// 3.val rdd5: RDD[Array[String]] = sc.parallelize(Array(Array("赵云  关羽   张飞", "诸葛亮 司马懿 周瑜"), Array("孙策  大乔", "周瑜  小乔")))// val rdd6: RDD[String] = rdd5.flatMap(array => array.flatMap(_.split(" +")))val rdd6: RDD[String] = rdd5.flatMap(_.flatMap(_.split(" +")))rdd6.foreach(println)// Array("a b c ", "d e f")  =>  Array ("a", "b", "c", "d", "e", "f")}

五、glom

/*** 转换算子: glom* 逻辑: 将一个分区中的所有的元素,聚合成一个数组,放回到之前的分区中*      不会修改分区的数量*/@Test def glomTest(): Unit = {// 1. 通过集合,创建RDD(将1到20的数字,分到了4个分区中)val rdd1: RDD[Int] = sc.parallelize(1 to 20, 4)// 2. 将每一个分区的元素做成一个数组val rdd2: RDD[Array[Int]] = rdd1.glom()rdd2.foreach(array => println(array.mkString(", ")))// 输出RDD中的分区数量println(rdd2.getNumPartitions)}

六、mapValues

/*** 转换算子: mapValues* 注意: 只针对PariedRDD,也就是说RDD描述的数据是若干个键值对*      (其实,这里可以操作的数据,可以可以是RDD(Tuple2))* 逻辑: 对键值对的值做映射,不对键做任何处理*/@Test def mapValues(): Unit = {// 1. 通过集合创建RDDval rdd1: RDD[String] = sc.parallelize(Array("贾宝玉", "林黛玉", "薛宝钗", "探春", "迎春", "惜春"))// 2. 对元素做映射,以名字的长度作为键,以名字作为值val rdd2: RDD[(Int, String)] = rdd1.map(n => (n.length, n))// 3. 让每一个人的名字后面添加一个叹号val rdd3: RDD[(Int, String)] = rdd2.mapValues(_ + "!")rdd3.foreach(println)//val array1: Array[String] = Array("贾宝玉", "林黛玉", "薛宝钗", "探春")val array2: Array[Int] = Array(18, 19, 17, 16)val pairs: Array[(String, Int)] = array1.zip(array2)val rdd4: RDD[(String, Int)] = sc.parallelize(pairs)// 需求: 让每一个人的年龄增20岁!val rdd5: RDD[(String, Int)] = rdd4.mapValues(_ + 20)rdd5.foreach(println)}

七、filter

/*** 转换算子: filter* 逻辑: 保留RDD中满足条件的数据*/@Test def filterTest(): Unit = {// 1. 通过集合,创建RDDval rdd1: RDD[(String, Int)] = sc.parallelize(List(("贾宝玉", 18), ("林黛玉", 17), ("薛宝钗", 18), ("探春", 16)))// 2. 保留所有的成年的数据val rdd2: RDD[(String, Int)] = rdd1.filter(_._2 >= 18)rdd2.foreach(println)}

八、keyBy

/*** 转换算子: keyBy* 逻辑: 将RDD中的数据,以指定的逻辑得到的结果作为键,数据本身作为值*/@Test def keyByTest(): Unit = {// 1. 通过集合,创建RDDval rdd1: RDD[String] = sc.parallelize(Array("张角", "张宝", "董卓", "貂蝉", "诸葛亮", "司马懿", "关云长", "张翼德", "诸葛武侯"))// 2. 给这些元素找键val rdd2: RDD[(Int, String)] = rdd1.keyBy(_.length)rdd2.foreach(println)}

九、groupBy

/*** 转换算子: groupBy* 逻辑: 对RDD中的元素,按照指定的逻辑得到的结果,进行分组*/@Test def groupByTest(): Unit = {// 1. 通过集合,创建RDDval rdd1: RDD[String] = sc.parallelize(Array("张角", "张宝", "董卓", "貂蝉", "诸葛亮", "司马懿", "关云长", "张翼德", "诸葛武侯"))// 2. 将所有的相同的长度的名字视为一个分组val rdd2: RDD[(Int, Iterable[String])] = rdd1.groupBy(_.length)// val rdd3: RDD[(Int, List[String])] = rdd2.mapValues(_.toList)//rdd2.foreach(x=>println(x))// 3. 遍历输出结果rdd2.foreach(tuple => println(s"${tuple._1} => ${tuple._2}"))}

十、reduceByKey

/*** 转换算子: reduceByKey* 逻辑: 针对PariedRDD,按照键进行分组,将所有的值进行运算*/@Test def reduceByKeyTest(): Unit = {// 1. 通过集合,创建一个RDDval rdd1: RDD[String] = sc.parallelize(Array("Tom  Jerry  Tom  Jerry", "Tom  Jerry  Tom  Jerry", "Hank  Hank  Tom  Jerry"))// 2. 计算wordcountval rdd2: RDD[String] = rdd1.flatMap(_.split(" +"))// 3. 以单词为键,1作为值,构成一个PairedRDDval rdd3: RDD[(String, Int)] = rdd2.map(n => (n, 1))// 4. 将相同的键视为一个分组,将值进行累加val rdd4: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)rdd4.foreach(println)}

十一、foldByKey

    /*** 转换算子: foldByKey* 逻辑: 和reduceByKey差不多,在每次进行聚合运算的时候,都会添加上一个默认的值*/@Test def foldByKeyTest(): Unit = {// 1. 通过集合,准备RDDval rdd1: RDD[String] = sc.parallelize(Array("三国演义", "水浒传", "红楼梦", "西游记", "诛仙", "神墓", "斗罗大陆", "斗破苍穹", "武动乾坤", "大主宰", "遮天"))// 2. 添加键// rdd1.map(n => (n.length, n))val rdd2: RDD[(Int, String)] = rdd1.keyBy(_.length)rdd2.foreach(println)val rdd3: RDD[(Int, String)] = rdd2.reduceByKey(_ + "," + _)rdd3.foreach(println)// 3.val rdd4: RDD[(Int, String)] = rdd2.foldByKey("书名: ")(_ + ", " + _)rdd4.foreach(println)}

提示

建议自己跑一下程序,感受一下细节之处。

RDD编程-RDD算子的使用相关推荐

  1. spark应用程序转换_Spark—RDD编程常用转换算子代码实例

    Spark-RDD编程常用转换算子代码实例 Spark rdd 常用 Transformation 实例: 1.def map[U: ClassTag](f: T => U): RDD[U]  ...

  2. 【大数据开发】SparkCore——Spark作业执行流程、RDD编程的两种方式、简单算子

    文章目录 一.Spark作业执行流程(重点) 二.RDD编程 2.1创建RDD的⼆种⽅式: 2.2Transformation算⼦ 2.3Action算子 三.简单算子(必须掌握) 3.1 map.m ...

  3. Spark:RDD编程总结(概述、算子、分区、共享变量)

    目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...

  4. spark算子大全glom_2小时入门Spark之RDD编程

    公众号后台回复关键字:pyspark,获取本项目github地址. 本节将介绍RDD数据结构的常用函数.包括如下内容: 创建RDD 常用Action操作 常用Transformation操作 常用Pa ...

  5. Spark综合大作业:RDD编程初级实践

    Spark综合大作业:RDD编程初级实践 实验配置:操作系统:Ubuntu16.04 | 环境:Spark版本:2.4.0 | 软件:Python版本:3.4.3. 文章目录 一.实验目的 二.实验平 ...

  6. spark之RDD的转换算子与行为算子的具体使用

    文章目录 1.Transform算子 1.1 map 1.2 flatmap 1.3 groupBy和groupBykey 1.4 filter 1.5 Mappartitions 1.6 mapVa ...

  7. RDD编程模型笔记(一)

    1.RDD编程模型 在 Spark 中,RDD 被表示为对象,通过对象上的方法调用来对 RDD 进行转换. 在Spark中,只有遇到action,才会执行 RDD 的计算(即延迟计算),这样在运行时可 ...

  8. Spark入门系列(二)| 1小时学会RDD编程

    作者 | 梁云1991 转载自Python与算法之美(ID:Python_Ai_Road) 导读:本文为 Spark入门系列的第二篇文章,主要介绍 RDD 编程,实操性较强,感兴趣的同学可以动手实现一 ...

  9. 《Spark快速大数据分析》—— 第三章 RDD编程

    本文转自博客园xingoo的博客,原文链接:<Spark快速大数据分析>-- 第三章 RDD编程,如需转载请自行联系原博主.

最新文章

  1. 人大附中高中生学Python获数据挖掘竞赛一等奖,将去旷视科技实习
  2. 精选NLP、CV领域论文TOP10(附链接)
  3. finished with exit code -1073740791 (0xC0000409)
  4. ai作文批改_英语写作怎么提升?讯飞智能学习机AI作文批改带你实战练习
  5. google protobuf安装与使用
  6. 【渝粤题库】陕西师范大学209011商业银行信贷管理Ⅱ 作业(专升本)
  7. linux java uml_简单实用UML关系图解
  8. linux qt手册,明远智睿I.MX6 Linux-4.1.15 QT5 程序编译手册
  9. 爬虫3 requests基础之 乱码编码问题
  10. java从入门到精通_Java---开发从入门到精通,分享视频学习教程
  11. Pandas 文本数据方法 wrap( )
  12. 用循环语句编程打印如下图案
  13. Nginx 限制并发连接数。
  14. linux如何配置自定义命令,[shell脚本]Linux自定义命令并启用应用
  15. html 文字 向上滚动代码,文字向上滚动代码
  16. 如何使用mp3转换器将wav转换成mp3格式
  17. 【自然语言处理】词性标注
  18. 云计算概念及发展历程
  19. 【Unite Tokyo 2018】虚拟YouTuber电脑少女Siro「2018年资源推荐
  20. device-mapper: multipath: Failing path recovery

热门文章

  1. Spring Cloud H (五)初战服务降级和熔断Hystrix(豪猪哥)
  2. 手机之家证书申请_资质证书是能力的证明,EPC工程总承包的设计监理需要资质吗?...
  3. UWB 技术原理与应用详解
  4. 母婴产品如何做做品牌营销?母婴品牌如何在知乎上做营销?
  5. Postman发post请求读取不到文件的情况
  6. PowerMill宏命令的二次开发利用,优化加工流程,提高生产效率。
  7. ZZULIOJ-1015,计算时间间隔(Java)
  8. JAVA基础 多线程技术学习笔记(V1.0)
  9. 使用whistle进行手机抓包并调试
  10. SAP-修改系统表数据的方法-该表实现回退物料账期