Spark相比于Mapreduce的一大优势就是提供了很多的方法,可以直接使用;另一个优势就是执行速度快,这要得益于DAG的调度,想要理解这个调度规则,还要理解函数之间的依赖关系。

本篇就着重描述下Spark提供的Transformations方法.

依赖关系

宽依赖和窄依赖

窄依赖(narrow dependencies)

窄依赖是指父RDD仅仅被一个子RDD所使用,子RDD的每个分区依赖于常数个父分区(O(1),与数据规模无关)。

  • 输入输出一对一的算子,且结果RDD的分区结构不变。主要是map/flatmap
  • 输入输出一对一的算子,但结果RDD的分区结构发生了变化,如union/coalesce
  • 从输入中选择部分元素的算子,如filter、distinct、substract、sample
宽依赖(wide dependencies)

宽依赖是指父RDD被多个子分区使用,子RDD的每个分区依赖于所有的父RDD分区(O(n),与数据规模有关)

  • 对单个RDD基于key进行重组和reduce,如groupByKey,reduceByKey
  • 对两个RDD基于key进行join和重组,如join(父RDD不是hash-partitioned )
  • 需要进行分区,如partitionBy

Transformations转换方法实例

map(func)

map用于遍历rdd中的每个元素,可以针对每个元素做操作处理:

scala> var data = sc.parallelize(1 to 9,3)
//内容为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> data.map(x=>x*2).collect()
//输出内容 Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

filter(func)

filter用于过滤元素信息,仅仅返回满足过滤条件的元素

scala> var data = sc.parallelize(1 to 9,3)
//内容为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> data.filter(x=> x%2==0).collect()
//输出内容 Array[Int] = Array(2, 4, 6, 8)

flatMap(func)

flatMap与map相比,不同的是可以输出多个结果,比如

scala> var data = sc.parallelize(1 to 4,1)
//输出内容为 Array[Int] = Array(1, 2, 3, 4)scala> data.flatMap(x=> 1 to x).collect()
//输出内容为 Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

mapPartitions(func)

mapPartitions与map类似,只不过每个元素都是一个分区的迭代器,因此内部可以针对分区为单位进行处理。

比如,针对每个分区做和

//首先创建三个分区
scala> var data = sc.parallelize(1 to 9,3)
//输出为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)//查看分区的个数
scala> data.partitions.size
//输出为 Int = 3//使用mapPartitions
scala> var result = data.mapPartitions{ x=> {| var res = List[Int]()| var i = 0| while(x.hasNext){| i+=x.next()| }| res.::(i).iterator| }}scala> result.collect
//输出为 Array[Int] = Array(6, 15, 24)

mapPartitionsWithIndex(func)

这个方法与上面的mapPartitions相同,只不过多提供了一个Index参数。

//首先创建三个分区
scala> var data = sc.parallelize(1 to 9,3)
//输出为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)//查看分区的个数
scala> data.partitions.size
//输出为 Int = 3scala> var result = data.mapPartitionsWithIndex{| (x,iter) => {| var result = List[String]()| var i = 0| while(iter.hasNext){| i += iter.next()| }| result.::( x + "|" +i).iterator| }}result.collect
//输出结果为 Array[String] = Array(0|6, 1|15, 2|24)

sample(withReplacement, fraction, seed)

这个方法可以用于对数据进行采样,比如从1000个数据里面随机5个数据。

  • 第一个参数withReplacement代表是否进行替换,如果选true,上面的例子中,会出现重复的数据
  • 第二个参数fraction 表示随机的比例
  • 第三个参数seed 表示随机的种子
//创建数据
var data = sc.parallelize(1 to 1000,1)//采用固定的种子seed随机
data.sample(false,0.005,0).collect
//输出为 Array[Int] = Array(53, 423, 433, 523, 956, 990)//采用随机种子
data.sample(false,0.005,scala.util.Random.nextInt(1000)).collect
//输出为 Array[Int] = Array(136, 158)

union(otherDataset)

union方法可以合并两个数据集,但是不会去重,仅仅合并而已。

//创建第一个数据集
scala> var data1 = sc.parallelize(1 to 5,1)//创建第二个数据集
scala> var data2 = sc.parallelize(3 to 7,1)//取并集
scala> data1.union(data2).collect
//输出为 Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)

intersection(otherDataset)

这个方法用于取两个数据集的交集

//创建第一个数据集
scala> var data1 = sc.parallelize(1 to 5,1)//创建第二个数据集
scala> var data2 = sc.parallelize(3 to 7,1)//取交集
scala> data1.intersection(data2).collect
//输出为 Array[Int] = Array(4, 3, 5)

distinct([numTasks]))

这个方法用于对本身的数据集进行去重处理。

//创建数据集
scala> var data = sc.parallelize(List(1,1,1,2,2,3,4),1)//执行去重
scala> data.distinct.collect
//输出为 Array[Int] = Array(4, 1, 3, 2)//如果是键值对的数据,kv都相同,才算是相同的元素
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))//执行去重
scala> data.distinct.collect
//输出为 Array[(String, Int)] = Array((A,1), (B,1), (A,2))

groupByKey([numTasks])

这个方法属于宽依赖的方法,针对所有的kv进行分组,可以把相同的k的聚合起来。如果要想计算sum等操作,最好使用reduceByKey或者combineByKey

//创建数据集
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))//分组输出
scala> data.groupByKey.collect
//输出为 Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1)), (A,CompactBuffer(1, 1, 2)))

reduceByKey(func, [numTasks])

这个方法用于根据key作分组计算,但是它跟reduce不同,它还是属于transfomation的方法。

//创建数据集
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))scala> data.reduceByKey((x,y) => x+y).collect
//输出为 Array[(String, Int)] = Array((B,1), (A,4))

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

aggregateByKey比较复杂,我也不是很熟练,不过试验了下,大概的意思是针对分区内部使用seqOp方法,针对最后的结果使用combOp方法。

比如,想要统计分区内的最大值,然后再全部统计加和:

scala> var data = sc.parallelize(List((1,1),(1,2),(1,3),(2,4)),2)
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[54] at parallelize at <console>:27scala> def sum(a:Int,b:Int):Int = { a+b }
sum: (a: Int, b: Int)Intscala> data.aggregateByKey(0)(sum,sum).collect
res42: Array[(Int, Int)] = Array((2,4), (1,6))scala> def max(a:Int,b:Int):Int = { math.max(a,b) }
max: (a: Int, b: Int)Intscala> data.aggregateByKey(0)(max,sum).collect
res44: Array[(Int, Int)] = Array((2,4), (1,5))

sortByKey([ascending], [numTasks])

sortByKey用于针对Key做排序,默认是按照升序排序。

//创建数据集
scala> var data = sc.parallelize(List(("A",2),("B",2),("A",1),("B",1),("C",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27//对数据集按照key进行默认排序
scala> data.sortByKey().collect
res23: Array[(String, Int)] = Array((A,2), (A,1), (B,2), (B,1), (C,1))//升序排序
scala> data.sortByKey(true).collect
res24: Array[(String, Int)] = Array((A,2), (A,1), (B,2), (B,1), (C,1))//降序排序
scala> data.sortByKey(false).collect
res25: Array[(String, Int)] = Array((C,1), (B,2), (B,1), (A,2), (A,1))

join(otherDataset, [numTasks])

join方法为(K,V)和(K,W)的数据集调用,返回相同的K,所组成的数据集。相当于sql中的按照key做连接。

有点类似于 select a.value,b.value from a inner join b on a.key = b.key;

举个例子

//创建第一个数据集
scala> var data1 = sc.parallelize(List(("A",1),("B",2),("C",3)))//创建第二个数据集
scala> var data2 = sc.parallelize(List(("A",4)))//创建第三个数据集
scala> var data3 = sc.parallelize(List(("A",4),("A",5)))data1.join(data2).collect
//输出为 Array[(String, (Int, Int))] = Array((A,(1,4)))data1.join(data3).collect
//输出为 Array[(String, (Int, Int))] = Array((A,(1,4)), (A,(1,5)))

cogroup(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, (Seq[V], Seq[W]))元组的数据集。

//创建第一个数据集
scala> var data1 = sc.parallelize(List(("A",1),("B",2),("C",3)))//创建第二个数据集
scala> var data2 = sc.parallelize(List(("A",4)))//创建第三个数据集
scala> var data3 = sc.parallelize(List(("A",4),("A",5)))scala> data1.cogroup(data2).collect
//输出为 Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(4))), (C,(CompactBuffer(3),CompactBuffer())))scala> data1.cogroup(data3).collect
//输出为 Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(4, 5))), (C,(CompactBuffer(3),CompactBuffer())))

cartesian(otherDataset)

这个方法用于计算两个(K,V)数据集之间的笛卡尔积

//创建第一个数据集
scala> var a = sc.parallelize(List(1,2))//创建第二个数据集
scala> var b = sc.parallelize(List("A","B"))//计算笛卡尔积
scala> a.cartesian(b).collect
//输出结果 res2: Array[(Int, String)] = Array((1,A), (1,B), (2,A), (2,B))

pipe(command, [envVars])

pipe方法用于针对每个分区的RDD执行一个shell脚本命令,可以使perl或者bash。分区的元素将会被当做输入,脚本的输出则被当做返回的RDD值。

//创建数据集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:27//测试脚本
scala> data.pipe("head -n 1").collect
res26: Array[String] = Array(1, 4, 7)scala> data.pipe("tail -n 1").collect
res27: Array[String] = Array(3, 6, 9)scala> data.pipe("tail -n 2").collect
res28: Array[String] = Array(2, 3, 5, 6, 8, 9)

coalesce(numPartitions)

这个方法用于对RDD进行重新分区,第一个参数是分区的数量,第二个参数是是否进行shuffle

//创建数据集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27//查看分区的大小
scala> data.partitions.size
res3: Int = 3//不使用shuffle重新分区
scala> var result = data.coalesce(2,false)
result: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[19] at coalesce at <console>:29scala> result.partitions.length
res12: Int = 2scala> result.toDebugString
res13: String =
(2) CoalescedRDD[19] at coalesce at <console>:29 []|  ParallelCollectionRDD[9] at parallelize at <console>:27 []//使用shuffle重新分区
scala> var result = data.coalesce(2,true)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at coalesce at <console>:29scala> result.partitions.length
res14: Int = 2scala> result.toDebugString
res15: String =
(2) MapPartitionsRDD[23] at coalesce at <console>:29 []|  CoalescedRDD[22] at coalesce at <console>:29 []|  ShuffledRDD[21] at coalesce at <console>:29 []+-(3) MapPartitionsRDD[20] at coalesce at <console>:29 []|  ParallelCollectionRDD[9] at parallelize at <console>:27 []

repartition(numPartitions)

这个方法作用于coalesce一样,重新对RDD进行分区,相当于shuffle版的calesce

//创建数据集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27//查看分区的大小
scala> data.partitions.size
res3: Int = 3scala> var result = data.repartition(2)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at repartition at <console>:29scala> result.partitions.length
res16: Int = 2scala> result.toDebugString
res17: String =
(2) MapPartitionsRDD[27] at repartition at <console>:29 []|  CoalescedRDD[26] at repartition at <console>:29 []|  ShuffledRDD[25] at repartition at <console>:29 []+-(3) MapPartitionsRDD[24] at repartition at <console>:29 []|  ParallelCollectionRDD[9] at parallelize at <console>:27 []scala>

repartitionAndSortWithinPartitions(partitioner)

这个方法是在分区中按照key进行排序,这种方式比先分区再sort更高效,因为相当于在shuffle阶段就进行排序。

下面的例子中,由于看不到分区里面的数据。可以通过设置分区个数为1,看到排序的效果。

scala> var data = sc.parallelize(List((1,2),(1,1),(2,3),(2,1),(1,4),(3,5)),2)
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[60] at parallelize at <console>:27scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(2)).collect
res52: Array[(Int, Int)] = Array((2,3), (2,1), (1,2), (1,1), (1,4), (3,5))scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(1)).collect
res53: Array[(Int, Int)] = Array((1,2), (1,1), (1,4), (2,3), (2,1), (3,5))scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(3)).collect
res54: Array[(Int, Int)] = Array((3,5), (1,2), (1,1), (1,4), (2,3), (2,1))

参考

spark 官方文档

转载于:https://www.cnblogs.com/xing901022/p/5944297.html

[大数据之Spark]——Transformations转换入门经典实例相关推荐

  1. 离线轻量级大数据平台Spark之读取CSV文件实例

    Spark的RDD数据集很适合处理轻量文件,一般场景下是excel文件,可以将excel文件另存为CSV(逗号分隔),Spark读取CSV文件形成RDD. 1.序列化类Record,用于保存字段 pa ...

  2. 尚硅谷大数据技术Spark教程-笔记09【SparkStreaming(概念、入门、DStream入门、案例实操、总结)】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表[课程资料下载] 视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[SparkCore ...

  3. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  4. 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  5. 王家林亲授《DT大数据梦工厂》大数据实战视频“Scala深入浅出实战经典” 第46讲视频、音频和PPT下载!

    王家林亲授<DT大数据梦工厂>大数据实战视频"Scala深入浅出实战经典"视频.音频和PPT下载!第46讲: ClassTag .Manifest.ClassManif ...

  6. 大数据之Spark案例实操完整使用(第六章)

    大数据之Spark案例实操完整使用 一.案例一 1.准备数据 2.需求 1:Top10 热门品类 3.需求说明 方案一. 实现方案二 实现方案三 二 .需求实现 1.需求 2:Top10 热门品类中每 ...

  7. 3000门徒内部训练绝密视频(泄密版)第1课:大数据最火爆语言Scala光速入门

    大数据最火爆语言Scala光速入门 scala 可以使用java的库 scala 的工厂方法:apply 条件表达式有返回值 数组可以用to ,箭头 <- 最后一行内容的值是整个代码块的返回值 ...

  8. 视频教程-跟风舞烟学大数据可视化-Echarts从入门到上手实战-JavaScript

    跟风舞烟学大数据可视化-Echarts从入门到上手实战 网名风舞烟,中国科技大学计算机专业.微软认证讲师(MCE).微软数据分析讲师.10多年软件行业从业经验,参与过数百万的企业级ERP系统,在大数据 ...

  9. 大数据篇--Spark常见面试题总结一

    文章目录 一.Spark 概念.模块 1.相关概念: 2.基本模块: 二.Spark作业提交流程是怎么样的 三.Spark on YARN两种方式的区别以及工作流程 1.Yarn组件简介: 2.Spa ...

  10. 车联网大数据框架_大数据基础:ORM框架入门简介

    作为大数据开发技术者,需要掌握扎实的Java基础,这是不争的事实,所以对于Java开发当中需要掌握的重要框架技术,也需要有相应程度的掌握,比如说ORM框架.今天的大数据基础分享,我们就来具体讲一讲OR ...

最新文章

  1. docker学习路程之部署一个nginx
  2. 物料价格分析取未分摊和未分配价格差异
  3. stl vector 函数_vector :: crend()函数以及C ++ STL中的示例
  4. clickhouse安装教程
  5. java.util.vector中的vector的详细用法
  6. 用手机约会为何胜过电脑?
  7. nginx支持php5,配置nginx支持php
  8. 在ubuntu10.04安装java5和java6
  9. Android EventBus现实 听说你out该
  10. 服务器改文件,服务人员:如何更新服务器上的文件更改缓存?
  11. 用java做一个简易的五子棋
  12. Linux安装显卡驱动后闪屏问题
  13. 华为HG8120C光猫换天邑TEWA-600AGM(百兆换千兆)的过程记录
  14. en60204标准_《EN_60204_机械产品电气安全标准介绍》.pdf
  15. SQL server2008SP3补丁安装教程
  16. linux中MIB与MB单位的区别
  17. 2018DeeCamp面试题目
  18. 【细胞分割】基于中值滤波+分水岭法实现细胞计数matlab源码
  19. JavaScript工具函数
  20. Gradle报错:Could not find ××× ,‘dependencies.× .× ‘ for × must specify an absolute path but is ${env.

热门文章

  1. 20届校招-携程笔试题-表达式解析
  2. springboot2.x整合JavaMail以qq邮箱发送邮件
  3. 使用xadmin更新数据时,报错expected string or bytes-like object
  4. php 判断访问是否是手机或者pc
  5. iOS 力学动画生成器UIKit Dynamics 之碰撞效果解说
  6. Object强转为实体类类型失败!!!!!!
  7. 现代软件工程——第一周博客作业
  8. Jquery遍历之获取子级元素、同级元素和父级元素
  9. 循环链表简单操作 C++
  10. HDU 1166 - 敌兵布阵