1 算子简介

算子是一个函数空间到函数空间上的[映射]O:X→X。广义上的算子可以推广到任何空间,如[内积空间]等。

RDD上的方法称为算子

在 RDD 上支持 2 种操作:

  • transformation转换

从一个已知的 RDD 中创建出来一个新的 RDD 例如: map就是一个transformation.

  • *action *行动

在数据集上计算结束之后, 给驱动程序返回一个值. 例如: reduce就是一个action.

在 Spark 中几乎所有的transformation操作都是懒执行的(lazy), 也就是说transformation操作并不会立即计算他们的结果, 而是记住了这个操作.

只有当通过一个action来获取结果返回给驱动程序的时候这些转换操作才开始计算.这种设计可以使 Spark 运行起来更加的高效.默认情况下, 你每次在一个 RDD 上运行一个action的时候, 前面的每个transformed RDD 都会被重新计算.

但是我们可以通过persist (or cache)方法来持久化一个 RDD 在内存中, 也可以持久化到磁盘上, 来加快访问速度. 后面有专门的章节学习这种持久化技术.根据 RDD 中数据类型的不同,

整体分为 2 种 RDD:

  1. Value类型

  2. Key-Value类型(其实就是存一个二维的元组)

1.1 map算子

以分区为单位 ,内部处理每个元素

 def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))}
def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getScval rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 2)val f = (e: Int) => {e * 2}//处理rdd负责的每个元素 返回一个新的RDDrdd.map(f)  //传入一个函数rdd.map((e: Int) => {e * 2})rdd.map(e => e * 2)//调用map算子 返回一个新的RDDval rdd2: RDD[Int] = rdd.map(_ * 2)rdd2.collect().foreach(println)sc.stop()}

演示处理并行处理和数据处理细节

 def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getSc //设置数据只有一个分区 , 处理的所有的数据在同一个分区中 // 如果是多个分区 ,资源充足的情况下分区额数据就是处理数据的并行度val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 1)//我们这里多次处理 , 同一个分区中的数据是顺序执行的val rdd2 = rdd.map(e=>{println("处理元素----1:"+e)e}).map(e =>{println("处理元素----2:"+e)e})/*处理元素----1:1处理元素----2:1处理元素----1:2处理元素----2:2处理元素----1:3处理元素----2:3处理元素----1:4处理元素----2:4处理元素----1:5处理元素----2:5*/// 触发转换算子执行rdd2.collect()sc.stop()}

案例需求 : 获取mysql中的数据

  <!--添加MYSQL的驱动--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.48</version></dependency>
package com._51doit.spark.day02.transimport java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}import com._51doit.spark.util.SparkUtils
import com.mysql.jdbc.Driver
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD/*** Author:   Hang.Z* Date:     21/06/02 * Description:* 1 注册驱动* 2 获取连接* 3 获取执行sql的statement对象* 4 编写SQL  执行* 5 返回结果* 6 释放资源*/
object _03Map {def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtils.getSparkContext()val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)// 注册驱动classOf[Driver]// 根据RDD处理的数据  1 2 3 4   熊mysql中获取用户名/*** 每个元素在每个分区中都会执行一遍  获取连接  获取 ps  执行sql 释放资源* 效率低* 增加mysql的请求压力* 出现并发*/val rdd2 =  rdd1.map(uid=>{// 获取连接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/doit20" , "root","root")// 获取执行SQL的对象val ps: PreparedStatement = conn.prepareStatement("select * from tb_user where uid = ?")// 预编译sqlps.setInt(1,uid)// 执行SQL查询val set: ResultSet = ps.executeQuery()var name:String = ""// 如果有数据if(set.next()){// 获取数据val username: String = set.getString("username")name = username}set.close()ps.close()conn.close()name})rdd2.foreach(println)sc.stop()}
}

1.2  mapPartition

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

val sc: SparkContext = SparkUtil.getSparkContext()
//创建RDD
val rdd: RDD[Int] = sc.parallelize(1 to 10, 3)
// 传递的是一个迭代器 , 三个分区只执行三次 效率优于map
val res: RDD[Int] = rdd.mapPartitions(iters=>{for(e <- iters) yield e*10
})
val arr: Array[Int] = res.collect()
arr.foreach(println)
sc.stop()

map():每次处理一条数据。 e

mapPartitions():每次处理一个分区的数据 iters

,这个分区的数据处理完后,原 RDD 中该分区的数据才能释放,可能导致 OOM。 Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions.算子是以分区为单位进行批处理操作。 功能的角度 Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用map操作。

mapPartitions():每次处理一个分区的数据如果每条数据需要去请求外部的资源 , 可以使用此函数减少外部资源的加载次数

开发指导:当内存空间较大的时候建议使用mapPartitions(),以提高处理效率。

获取每个分区的最大值/或者是最小值

  val rdd2: RDD[Int] = rdd.mapPartitions(iters => {println("hello.....")List(iters.max).toIterator})

写入数据库的正确姿势

 /*** 以分区为单位处理数据  (iters迭代器)* 是有返回值*/rdd1.mapPartitions(iter=>{// 本地集合的mapiter.map(_*10)})/*** 以分区为单位处理数据  (iters迭代器)* 但是没有返回值*    一般用于数据的输出*       将数据输出到mysql*/rdd1.foreachPartition(iter=>{val ints: Iterator[Int] = iter.map(_ * 10)//输出数据})
---将数据输出到MYSQL 中rdd4.foreachPartition(iters=>{// 获取连接classOf[Driver]val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/doit20", "root", "root")val ps: PreparedStatement = conn.prepareStatement("insert into tb_user  values(?,?,?,?)")for (user <- iters) yield {var b = false// 预编译try {ps.setString(1, user.id)ps.setString(2, user.name)ps.setInt(3,user.age)ps.setString(4, user.gender)// 执行插入b =  ps.execute()} catch {case e:Exception => b =  true}}})

1.3 mapPartitionsWithIndex

与mapPartitions类似,但需要提供一个表示分区索引值的整型值作为参数,因此function必须是(int, Iterator<T>)=>Iterator<U>类型的。

def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getSparkContext()val rdd: RDD[Int] = sc.parallelize(1 to 10, 3)// (p,iters)  参数一 分区号  参数二分区中的数据val res = rdd.mapPartitionsWithIndex((p, iters) => {for (elem <- iters) yield (p, elem * 10)})res.collect().foreach(println)sc.stop()
}

1.4 flatMap

map是对RDD中元素逐一进行函数操作映射为另外一个RDD,而flatMap操作是将函数应用于RDD之中的每一个元素,将返回的迭代器的所有内容构成新的RDD。

flatMap与map区别在于map为“映射”,而flatMap“先映射,后扁平化”,map对每一次(func)都产生一个元素,返回一个对象,而flatMap多一步就是将所有对象合并为一个对象。

作用: 类似于map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以func应该返回一个序列,而不是单一元素 T => TraversableOnce[U]

private def testFlatMap = {val sc: SparkContext = SparkUtil.getSparkContext()val rdd: RDD[String] = sc.textFile("d://word.txt")// 将每行数据切割后返回一个个数组 并没有将数据扁平化//  rdd.map(e=>e.split("\\s+")).collect().foreach(println)// 将数据扁平化成一个个单词rdd.flatMap(e => e.split("\\s+")).collect().foreach(println)sc.stop()
}

1.5 glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getScval rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)//将每个分区的数据收集到一个数组集合中  , 多个分区就会有多个数组集合val rdd2: RDD[Array[Int]] = rdd.glom()// 打印数据rdd2.collect().map(arr=>arr.toList).foreach(println)// 求每个分区的最大值的和println(rdd2.map(_.max).collect().toList.sum)sc.stop()}

1.6 filter

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getSparkContext()val rdd: RDD[Int] = sc.parallelize(1 to 10, 3)val rdd2: RDD[Int] = rdd.filter(e=>e>2 && e%2==0)rdd2.collect().foreach(println)  // 4  6  8 10sc.stop()
}

1.7 groupBy

将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中

一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

func返回值作为 key, 对应的值放入一个迭代器中. 返回的 RDD: RDD[(K, Iterable[T])

每组内元素的顺序不能保证, 并且甚至每次调用得到的顺序也有可能不同.

[K,V]的RDD之间的操作 ,可以根据K或者V进行分组
def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getSparkContext()val arr1 = Array(("a", 11), ("a", 11), ("c", 22), ("d", 33))val rdd1: RDD[(String, Int)] = sc.parallelize(arr1)val rdd11: RDD[(String, Iterable[(String, Int)])] = rdd1.groupBy(_._1)// val rdd22: RDD[(Int, Iterable[(String, Int)])] = rdd1.groupBy(_._2)rdd11.map(e => {val k: String = e._1val arr: Iterable[Int] = for (x <- e._2) yield x._2(k, arr)}).collect().foreach(println)sc.stop()
}[V]类型的RDD只能根据某种条件表达式来分组  两组(一组满足条件, 一组不满足条件)
def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getSparkContext()val arr1 = Array(1,2,3,4,5,5,5)val rdd: RDD[Int] = sc.parallelize(arr1)// 条件表达式分两组val res1: RDD[(Boolean, Iterable[Int])] = rdd.groupBy(_>2)val res3: RDD[(Boolean, Iterable[Int])] = rdd.groupBy(_%2==0)sc.stop()
}

课堂代码

def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtils.getSparkContext()val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)val rdd2: RDD[(Int, Iterable[Int])] = rdd1.groupBy(_ % 2)// 1 分区个数没有变化println(rdd2.partitions.size)/*** (1,CompactBuffer(1, 3))_ 0* (0,CompactBuffer(2, 4))_ 1*/val rdd3: RDD[List[Int]] = rdd2.map(_._2.toList)// 查看每个分区中的数据val rdd4: RDD[(Int, List[Int])] = rdd3.mapPartitionsWithIndex((p, ls) => {for (elem <- ls) yield {(p, elem)}})rdd4.foreach(println)sc.stop()

案例 统计每个年龄阶段的平均薪资

数据

数据:
1,zss,23,10000
2,lss,24,30000
3,wbb,34,10000
4,DL,39,8000
5,XG,35,8000

代码

/*** Author:   Hang.Z* Date:     21/06/05 * Description:* 加载数据*    统计每个年龄阶段的平均薪资*/
object _01Demo {def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtils.getSparkContext()// 读取数据 val rdd1: RDD[String] = sc.textFile("data/user/")// 处理每行val rdd2: RDD[(String, String, Int, Int)] = rdd1.map(line => {val arr: Array[String] = line.split(",")// 将每行的用户信息分租在元组中(arr(0), arr(1), arr(2).toInt, arr(3).toInt)})// 添加 年龄阶段 信息val rdd3: RDD[(String, String, Int, Int, String)] = rdd2.map(tp=>{// 获取年龄val age: Int = tp._3// 根据判断添加阶段信息if(age>=20 && age<30){(tp._1,tp._2,tp._3,tp._4,"20~30")}else if (age>=30 && age<40){(tp._1,tp._2,tp._3,tp._4,"30~40")}else{(tp._1,tp._2,tp._3,tp._4,"other")}})// 分租val groupedRDD : RDD[(String, Iterable[(String, String, Int, Int, String)])] = rdd3.groupBy(_._5)// 计算val res: RDD[(String, Double)] = groupedRDD.map(tp=>{// 年龄阶段val stage: String = tp._1// 总工资var totalSal:Double = 0// 计数var cnt = 0// 尽量不要本地集合操作//tp._2.toList.sizefor (elem <- tp._2) {totalSal+= elem._4cnt+=1}// 求平均工资val avg = totalSal/cnt(stage , avg)})// 检查结果res.foreach(println)sc.stop()}}

spark封神之路(7)-RDD算子详解第一部分相关推荐

  1. spark封神之路(1)-spark简介

    1 Spark简介 Spark是一种快速.通用.可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apa ...

  2. spark封神之路(2)-spark运行模式

    本专栏系列视频教程 2 spark运行模式入门 1.官网地址 http://spark.apache.org/ 2.文档查看地址 https://spark.apache.org/docs/2.1.1 ...

  3. “自由主义教皇” 、​Linux 之父的封神之路

    作者 | 年素清 责编 | 王晓曼 出品 | 程序人生(ID:coder_life) "有些人生来就具有统率百万人的领袖风范:另一些人则是为写出颠覆世界的软件而生.唯一一个能同时做到这两者的 ...

  4. GitHub 的“封神”之路!

    2008年,在无数人为代码协作方式而感到异常头疼的时候,GitHub 横空出世,完美契合了市场的迫切需求.自此,它以惊人的速度成长起来并蔓延到全世界,彻底融入开发编程的日常之中.仅仅十年,现在的 Gi ...

  5. 微服务架构师封神之路09-Springboot多数据源,Hikari连接池和事务配置

    微服务架构师封神之路09-Springboot多数据源,Hikari连接池,和事务的配置 application.yml 初始化DataSource DataSourceConfig的两种写法 写法一 ...

  6. python猜数字1001untitled_pytest封神之路第零步 快速入门

    背景:本文是在系列第五篇发表后的补充篇章,第一篇介绍了tep,可能对不熟悉pytest的朋友不够友好,特意补充入门篇,帮大家快速了解如何动手写pytest.如果你是从这篇文章第一次阅读,那么请忽略以上 ...

  7. 微服务架构师封神之路02-为你的微服务应用添加日志

    微服务架构师封神之路02-为你的微服务应用添加日志 关于Kubernetes日志架构 我们的目标 helloworld project 项目结构 pom.xml Dockerfile AppMain. ...

  8. spark 算子 详解

    参考文档:Spark算子详解及案例分析(分类助记) - 云+社区 - 腾讯云 1.combineByKey .作为spark 的核心算子之一,有必要详细了解.reduceByKey 和groupByK ...

  9. Spark RDD 论文详解(五)实现

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

最新文章

  1. 如何提高UI自动化测试的质量
  2. 请别再拿“String s = new String(xyz);创建了多少个String实例”来面试了吧---转
  3. 报告解读丨企服必看!如何高效搭建规模化营销获客体系?
  4. ASP.NET基础教程-以查询字符串的方式在两个页面之间传递信息
  5. 使用字符串切割,使手机号中间四位隐藏
  6. ping服务器请求超时_高防服务器能防哪些恶意网络攻击?
  7. ListView若干点
  8. Notes配置初始化和重新设置(不卸载)
  9. DeepFace人脸检测(python实现)
  10. 配对t检验的应用条件是什么_配对t检验在实际工作中的应用
  11. 模拟题【枚举计数】咒语
  12. Node.js项目总结及常用技巧
  13. 跳棋最少移动次数 java,跳棋
  14. [转载] 百科全说——陈焕然:揭秘高科技美容(10-02-22)
  15. 关于ubuntu系统不显示wifi图标或WiFi无法连接问题解答
  16. 使用HTML5和Less框架3的自适应Web设计
  17. 二十.组织级项目管理与大项目管理
  18. 论《赢在中国》的五大收获与五大遗憾
  19. 华强北发挥作用的时候到了!iPhone12更换零件需要官方授权
  20. CMDB——概念详解

热门文章

  1. HTTP、FTP状态码 (share)
  2. StringUtils字符串工具类
  3. java程序员自己的图片转文字OCR识图工具
  4. 热敏电阻VS模拟温度传感器
  5. 算法语言Scheme修订6报告 R6RS简体中文翻译
  6. 【转摘】著名画家毕建勋 学画笔记
  7. 关于kernels启动报错的问题
  8. Dynamo 如何生成管道
  9. 北大培训课动态规划----神奇的口袋(百练2755)
  10. CASIA -HWDB2.0-2.2和OLHWDB2.0-2.2数据集解析