spark封神之路(7)-RDD算子详解第一部分
1 算子简介
算子是一个函数空间到函数空间上的[映射]O:X→X。广义上的算子可以推广到任何空间,如[内积空间]等。
RDD上的方法称为算子
在 RDD 上支持 2 种操作:
transformation转换
从一个已知的 RDD 中创建出来一个新的 RDD 例如: map就是一个transformation.
*action *行动
在数据集上计算结束之后, 给驱动程序返回一个值. 例如: reduce就是一个action.
在 Spark 中几乎所有的transformation操作都是懒执行的(lazy), 也就是说transformation操作并不会立即计算他们的结果, 而是记住了这个操作.
只有当通过一个action来获取结果返回给驱动程序的时候这些转换操作才开始计算.这种设计可以使 Spark 运行起来更加的高效.默认情况下, 你每次在一个 RDD 上运行一个action的时候, 前面的每个transformed RDD 都会被重新计算.
但是我们可以通过persist (or cache)方法来持久化一个 RDD 在内存中, 也可以持久化到磁盘上, 来加快访问速度. 后面有专门的章节学习这种持久化技术.根据 RDD 中数据类型的不同,
整体分为 2 种 RDD:
Value类型
Key-Value类型(其实就是存一个二维的元组)
1.1 map算子
以分区为单位 ,内部处理每个元素
def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))}
def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getScval rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 2)val f = (e: Int) => {e * 2}//处理rdd负责的每个元素 返回一个新的RDDrdd.map(f) //传入一个函数rdd.map((e: Int) => {e * 2})rdd.map(e => e * 2)//调用map算子 返回一个新的RDDval rdd2: RDD[Int] = rdd.map(_ * 2)rdd2.collect().foreach(println)sc.stop()}
演示处理并行处理和数据处理细节
def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getSc //设置数据只有一个分区 , 处理的所有的数据在同一个分区中 // 如果是多个分区 ,资源充足的情况下分区额数据就是处理数据的并行度val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 1)//我们这里多次处理 , 同一个分区中的数据是顺序执行的val rdd2 = rdd.map(e=>{println("处理元素----1:"+e)e}).map(e =>{println("处理元素----2:"+e)e})/*处理元素----1:1处理元素----2:1处理元素----1:2处理元素----2:2处理元素----1:3处理元素----2:3处理元素----1:4处理元素----2:4处理元素----1:5处理元素----2:5*/// 触发转换算子执行rdd2.collect()sc.stop()}
案例需求 : 获取mysql中的数据
<!--添加MYSQL的驱动--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.48</version></dependency>
package com._51doit.spark.day02.transimport java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}import com._51doit.spark.util.SparkUtils
import com.mysql.jdbc.Driver
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD/*** Author: Hang.Z* Date: 21/06/02 * Description:* 1 注册驱动* 2 获取连接* 3 获取执行sql的statement对象* 4 编写SQL 执行* 5 返回结果* 6 释放资源*/
object _03Map {def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtils.getSparkContext()val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)// 注册驱动classOf[Driver]// 根据RDD处理的数据 1 2 3 4 熊mysql中获取用户名/*** 每个元素在每个分区中都会执行一遍 获取连接 获取 ps 执行sql 释放资源* 效率低* 增加mysql的请求压力* 出现并发*/val rdd2 = rdd1.map(uid=>{// 获取连接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/doit20" , "root","root")// 获取执行SQL的对象val ps: PreparedStatement = conn.prepareStatement("select * from tb_user where uid = ?")// 预编译sqlps.setInt(1,uid)// 执行SQL查询val set: ResultSet = ps.executeQuery()var name:String = ""// 如果有数据if(set.next()){// 获取数据val username: String = set.getString("username")name = username}set.close()ps.close()conn.close()name})rdd2.foreach(println)sc.stop()}
}
1.2 mapPartition
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
val sc: SparkContext = SparkUtil.getSparkContext()
//创建RDD
val rdd: RDD[Int] = sc.parallelize(1 to 10, 3)
// 传递的是一个迭代器 , 三个分区只执行三次 效率优于map
val res: RDD[Int] = rdd.mapPartitions(iters=>{for(e <- iters) yield e*10
})
val arr: Array[Int] = res.collect()
arr.foreach(println)
sc.stop()
map():每次处理一条数据。 e
mapPartitions():每次处理一个分区的数据 iters
,这个分区的数据处理完后,原 RDD 中该分区的数据才能释放,可能导致 OOM。 Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions.算子是以分区为单位进行批处理操作。 功能的角度 Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用map操作。
mapPartitions():每次处理一个分区的数据如果每条数据需要去请求外部的资源 , 可以使用此函数减少外部资源的加载次数
开发指导:当内存空间较大的时候建议使用mapPartitions(),以提高处理效率。
获取每个分区的最大值/或者是最小值
val rdd2: RDD[Int] = rdd.mapPartitions(iters => {println("hello.....")List(iters.max).toIterator})
写入数据库的正确姿势
/*** 以分区为单位处理数据 (iters迭代器)* 是有返回值*/rdd1.mapPartitions(iter=>{// 本地集合的mapiter.map(_*10)})/*** 以分区为单位处理数据 (iters迭代器)* 但是没有返回值* 一般用于数据的输出* 将数据输出到mysql*/rdd1.foreachPartition(iter=>{val ints: Iterator[Int] = iter.map(_ * 10)//输出数据})
---将数据输出到MYSQL 中rdd4.foreachPartition(iters=>{// 获取连接classOf[Driver]val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/doit20", "root", "root")val ps: PreparedStatement = conn.prepareStatement("insert into tb_user values(?,?,?,?)")for (user <- iters) yield {var b = false// 预编译try {ps.setString(1, user.id)ps.setString(2, user.name)ps.setInt(3,user.age)ps.setString(4, user.gender)// 执行插入b = ps.execute()} catch {case e:Exception => b = true}}})
1.3 mapPartitionsWithIndex
与mapPartitions类似,但需要提供一个表示分区索引值的整型值作为参数,因此function必须是(int, Iterator<T>)=>Iterator<U>类型的。
def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getSparkContext()val rdd: RDD[Int] = sc.parallelize(1 to 10, 3)// (p,iters) 参数一 分区号 参数二分区中的数据val res = rdd.mapPartitionsWithIndex((p, iters) => {for (elem <- iters) yield (p, elem * 10)})res.collect().foreach(println)sc.stop()
}
1.4 flatMap
map是对RDD中元素逐一进行函数操作映射为另外一个RDD,而flatMap操作是将函数应用于RDD之中的每一个元素,将返回的迭代器的所有内容构成新的RDD。
flatMap与map区别在于map为“映射”,而flatMap“先映射,后扁平化”,map对每一次(func)都产生一个元素,返回一个对象,而flatMap多一步就是将所有对象合并为一个对象。
作用: 类似于map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以func应该返回一个序列,而不是单一元素 T => TraversableOnce[U])
private def testFlatMap = {val sc: SparkContext = SparkUtil.getSparkContext()val rdd: RDD[String] = sc.textFile("d://word.txt")// 将每行数据切割后返回一个个数组 并没有将数据扁平化// rdd.map(e=>e.split("\\s+")).collect().foreach(println)// 将数据扁平化成一个个单词rdd.flatMap(e => e.split("\\s+")).collect().foreach(println)sc.stop()
}
1.5 glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getScval rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)//将每个分区的数据收集到一个数组集合中 , 多个分区就会有多个数组集合val rdd2: RDD[Array[Int]] = rdd.glom()// 打印数据rdd2.collect().map(arr=>arr.toList).foreach(println)// 求每个分区的最大值的和println(rdd2.map(_.max).collect().toList.sum)sc.stop()}
1.6 filter
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getSparkContext()val rdd: RDD[Int] = sc.parallelize(1 to 10, 3)val rdd2: RDD[Int] = rdd.filter(e=>e>2 && e%2==0)rdd2.collect().foreach(println) // 4 6 8 10sc.stop()
}
1.7 groupBy
将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
func返回值作为 key, 对应的值放入一个迭代器中. 返回的 RDD: RDD[(K, Iterable[T])
每组内元素的顺序不能保证, 并且甚至每次调用得到的顺序也有可能不同.
[K,V]的RDD之间的操作 ,可以根据K或者V进行分组
def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getSparkContext()val arr1 = Array(("a", 11), ("a", 11), ("c", 22), ("d", 33))val rdd1: RDD[(String, Int)] = sc.parallelize(arr1)val rdd11: RDD[(String, Iterable[(String, Int)])] = rdd1.groupBy(_._1)// val rdd22: RDD[(Int, Iterable[(String, Int)])] = rdd1.groupBy(_._2)rdd11.map(e => {val k: String = e._1val arr: Iterable[Int] = for (x <- e._2) yield x._2(k, arr)}).collect().foreach(println)sc.stop()
}[V]类型的RDD只能根据某种条件表达式来分组 两组(一组满足条件, 一组不满足条件)
def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getSparkContext()val arr1 = Array(1,2,3,4,5,5,5)val rdd: RDD[Int] = sc.parallelize(arr1)// 条件表达式分两组val res1: RDD[(Boolean, Iterable[Int])] = rdd.groupBy(_>2)val res3: RDD[(Boolean, Iterable[Int])] = rdd.groupBy(_%2==0)sc.stop()
}
课堂代码
def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtils.getSparkContext()val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)val rdd2: RDD[(Int, Iterable[Int])] = rdd1.groupBy(_ % 2)// 1 分区个数没有变化println(rdd2.partitions.size)/*** (1,CompactBuffer(1, 3))_ 0* (0,CompactBuffer(2, 4))_ 1*/val rdd3: RDD[List[Int]] = rdd2.map(_._2.toList)// 查看每个分区中的数据val rdd4: RDD[(Int, List[Int])] = rdd3.mapPartitionsWithIndex((p, ls) => {for (elem <- ls) yield {(p, elem)}})rdd4.foreach(println)sc.stop()
案例 统计每个年龄阶段的平均薪资
数据
数据:
1,zss,23,10000
2,lss,24,30000
3,wbb,34,10000
4,DL,39,8000
5,XG,35,8000
代码
/*** Author: Hang.Z* Date: 21/06/05 * Description:* 加载数据* 统计每个年龄阶段的平均薪资*/
object _01Demo {def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtils.getSparkContext()// 读取数据 val rdd1: RDD[String] = sc.textFile("data/user/")// 处理每行val rdd2: RDD[(String, String, Int, Int)] = rdd1.map(line => {val arr: Array[String] = line.split(",")// 将每行的用户信息分租在元组中(arr(0), arr(1), arr(2).toInt, arr(3).toInt)})// 添加 年龄阶段 信息val rdd3: RDD[(String, String, Int, Int, String)] = rdd2.map(tp=>{// 获取年龄val age: Int = tp._3// 根据判断添加阶段信息if(age>=20 && age<30){(tp._1,tp._2,tp._3,tp._4,"20~30")}else if (age>=30 && age<40){(tp._1,tp._2,tp._3,tp._4,"30~40")}else{(tp._1,tp._2,tp._3,tp._4,"other")}})// 分租val groupedRDD : RDD[(String, Iterable[(String, String, Int, Int, String)])] = rdd3.groupBy(_._5)// 计算val res: RDD[(String, Double)] = groupedRDD.map(tp=>{// 年龄阶段val stage: String = tp._1// 总工资var totalSal:Double = 0// 计数var cnt = 0// 尽量不要本地集合操作//tp._2.toList.sizefor (elem <- tp._2) {totalSal+= elem._4cnt+=1}// 求平均工资val avg = totalSal/cnt(stage , avg)})// 检查结果res.foreach(println)sc.stop()}}
spark封神之路(7)-RDD算子详解第一部分相关推荐
- spark封神之路(1)-spark简介
1 Spark简介 Spark是一种快速.通用.可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apa ...
- spark封神之路(2)-spark运行模式
本专栏系列视频教程 2 spark运行模式入门 1.官网地址 http://spark.apache.org/ 2.文档查看地址 https://spark.apache.org/docs/2.1.1 ...
- “自由主义教皇” 、Linux 之父的封神之路
作者 | 年素清 责编 | 王晓曼 出品 | 程序人生(ID:coder_life) "有些人生来就具有统率百万人的领袖风范:另一些人则是为写出颠覆世界的软件而生.唯一一个能同时做到这两者的 ...
- GitHub 的“封神”之路!
2008年,在无数人为代码协作方式而感到异常头疼的时候,GitHub 横空出世,完美契合了市场的迫切需求.自此,它以惊人的速度成长起来并蔓延到全世界,彻底融入开发编程的日常之中.仅仅十年,现在的 Gi ...
- 微服务架构师封神之路09-Springboot多数据源,Hikari连接池和事务配置
微服务架构师封神之路09-Springboot多数据源,Hikari连接池,和事务的配置 application.yml 初始化DataSource DataSourceConfig的两种写法 写法一 ...
- python猜数字1001untitled_pytest封神之路第零步 快速入门
背景:本文是在系列第五篇发表后的补充篇章,第一篇介绍了tep,可能对不熟悉pytest的朋友不够友好,特意补充入门篇,帮大家快速了解如何动手写pytest.如果你是从这篇文章第一次阅读,那么请忽略以上 ...
- 微服务架构师封神之路02-为你的微服务应用添加日志
微服务架构师封神之路02-为你的微服务应用添加日志 关于Kubernetes日志架构 我们的目标 helloworld project 项目结构 pom.xml Dockerfile AppMain. ...
- spark 算子 详解
参考文档:Spark算子详解及案例分析(分类助记) - 云+社区 - 腾讯云 1.combineByKey .作为spark 的核心算子之一,有必要详细了解.reduceByKey 和groupByK ...
- Spark RDD 论文详解(五)实现
前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...
最新文章
- 如何提高UI自动化测试的质量
- 请别再拿“String s = new String(xyz);创建了多少个String实例”来面试了吧---转
- 报告解读丨企服必看!如何高效搭建规模化营销获客体系?
- ASP.NET基础教程-以查询字符串的方式在两个页面之间传递信息
- 使用字符串切割,使手机号中间四位隐藏
- ping服务器请求超时_高防服务器能防哪些恶意网络攻击?
- ListView若干点
- Notes配置初始化和重新设置(不卸载)
- DeepFace人脸检测(python实现)
- 配对t检验的应用条件是什么_配对t检验在实际工作中的应用
- 模拟题【枚举计数】咒语
- Node.js项目总结及常用技巧
- 跳棋最少移动次数 java,跳棋
- [转载] 百科全说——陈焕然:揭秘高科技美容(10-02-22)
- 关于ubuntu系统不显示wifi图标或WiFi无法连接问题解答
- 使用HTML5和Less框架3的自适应Web设计
- 二十.组织级项目管理与大项目管理
- 论《赢在中国》的五大收获与五大遗憾
- 华强北发挥作用的时候到了!iPhone12更换零件需要官方授权
- CMDB——概念详解