spark RDD汇总(一)
collect
val listRDD = sc.parallelize[Int](Seq(1, 2, 4, 3, 3, 6), 2)listRDD.collect() // 收集rdd的所有数据
take
listRDD.take(2) // 取前两
top
listRDD.top(2) // 取最大的两个
first
listRDD.first() // 取第一个
min
listRDD.min() // 取最小的
max
class MyOrderingNew extends Ordering[Int] {override def compare(x: Int, y: Int): Int = {y - x}
}
listRDD.max()(new MyOrderingNew) // 逆序取最大其实就是取最小
takeOrdered
listRDD.takeOrdered(2) // 按照自然顺序找两个
listRDD.takeOrdered(2)(new MyOrderingNew) // 按照逆序去取两个,其实就是最大的
foreach
listRDD.foreach(x => { // 应用到每个变量上val initNumber = getInitNumber("foreach")println(x + initNumber + "==================")})
foreachPartition
// foreach和foreachPartition 这两个和map那两个差不多listRDD.foreachPartition(iterator => {// 和foreach api的功能是一样,只不过一个是将函数应用到每一条记录,这个是将函数应用到每一个partition//如果有一个比较耗时的操作,只需要每一分区执行一次这个操作就行,则用这个函数//这个耗时的操作可以是连接数据库等操作,不需要计算每一条时候去连接数据库,一个分区只需连接一次就行val initNumber = getInitNumber("foreachPartition")iterator.foreach(x => println(x + initNumber + "================="))})
reduce
listRDD.reduce((x, y) => x + y) //各元素相加
treeReduce
// 当分区数据量太多时候,有可能造成最后的计算时,会有各种问题// treeReduce 先对部分分区进行累加,再汇总listRDD.treeReduce((x, y) => x + y)
fold
//和reduce的功能类似,只不过是在计算每一个分区的时候需要加上初始值1,最后再将每一个分区计算出来的值相加再加上这个初始值listRDD.fold(0)((x, y) => x + y)
aggregate
//先初始化一个我们想要的返回的数据类型的初始值//然后在每一个分区对每一个元素应用函数一(acc, value) => (acc._1 + value, acc._2 + 1)进行聚合//最后将每一个分区生成的数据应用函数(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)进行聚合listRDD.aggregate((0, 0))((acc, value) => (acc._1 + value, acc._2 + 1),(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
treeAggregate
listRDD.treeAggregate((0, 0))((acc, value) => (acc._1 + value, acc._2 + 1),(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
broadcast
val lookupTable = Map("plane" -> "sky", "fish" -> "sea", "people" -> "earth")
// 广播出去val lookupTableB = sc.broadcast(lookupTable)
// 获取
val replaceStrOpt = lookupTableB.value.get("plane")
map
// 作用在每一个元素上
val keyValueWordsRDD = wordsRDD.map(word => (word, 1))
reduceByKey
// 对相同的key进行聚合计算
val wordCountRDD = keyValueWordsRDD.reduceByKey((x, y) => x + y)
flatMap
// 将值打平val flatMapRDD = listRDD.flatMap(x => x.to(3))flatMapRDD.collect()
filter
// 过滤
val filterRDD = listRDD.filter(x => x != 1)
filterRDD.collect()
glom
//将rdd的每一个分区的数据转成一个数组,进而将所有的分区数据转成一个二维数组val glomRDD = listRDD.glom()glomRDD.collect() //Array(Array(1, 2), Array(3, 3))
mapPartitions
// 类似于foreachPartition,作用于每一个分区上
val mapPartitionRDD = listRDD.mapPartitions (iterator => {//和map api的功能是一样,只不过map是将函数应用到每一条记录,而这个是将函数应用到每一个partition//如果有一个比较耗时的操作,只需要每一分区执行一次这个操作就行,则用这个函数//这个耗时的操作可以是连接数据库等操作,不需要计算每一条时候去连接数据库,一个分区只需连接一次就行val initNumber = getInitNumber("mapPartitions")iterator.map(x => x + initNumber)})mapPartitionRDD.collect()
mapPartitionsWithIndex
val mapPartitionWithIndexRDD = listRDD.mapPartitionsWithIndex((index, iterator) => {iterator.map(x => x + index) // index是第几个分区})mapPartitionWithIndexRDD.collect()
pipe
//表示执行一个本地脚本(可以是shell,python,java等各种能通过java的Process启动起来的脚本进程)//dataRDD的数据就是脚本的输入数据,脚本的输出数据会生成一个RDD即pipeRDDval pipeRDD = dataRDD.pipe(Seq("python", "/home/hadoop-jrq/spark-course/echo.py"),env, printPipeContext, printRDDElement, false)pipeRDD.glom().collect()
textFile
//1: 从一个稳定的存储系统中,比如hdfs文件,或者本地文件系统val hdfsFileRDD = sc.textFile("hdfs://master:8020/users/hadoop-jrq/word.txt")hdfsFileRDD.count()
range
val rangeRDD = sc.range(0, 10, 2, 4)rangeRDD.collect()
makeRDD
val makeRDD = sc.makeRDD(Seq(1, 2, 3, 3))makeRDD.collect()
parallelize
val defaultPartitionRDD = sc.parallelize[Int](Seq(1, 2, 3, 3, 4))defaultPartitionRDD.partitions
persist
//存储级别://MEMORY_ONLY: 只存在内存中//DISK_ONLY: 只存在磁盘中//MEMORY_AND_DISK: 先存在内存中,内存不够的话则存在磁盘中//OFF_HEAP: 存在堆外内存中hdfsFileRDD.persist(StorageLevel.MEMORY_ONLY)hdfsFileRDD.getStorageLevel
cache
mapRDD.cache() //表示只存在内存中
unpersist
// 清除存在内存中的数据
mapRDD.unpersist()
sample
//第一个参数为withReplacement//如果withReplacement=true的话表示有放回的抽样,采用泊松抽样算法实现//如果withReplacement=false的话表示无放回的抽样,采用伯努利抽样算法实现//第二个参数为:fraction,表示每一个元素被抽取为样本的概率,并不是表示需要抽取的数据量的因子//比如从100个数据中抽样,fraction=0.2,并不是表示需要抽取100 * 0.2 = 20个数据,//而是表示100个元素的被抽取为样本概率为0.2;样本的大小并不是固定的,而是服从二项分布//当withReplacement=true的时候fraction>=0//当withReplacement=false的时候 0 < fraction < 1//第三个参数为:reed表示生成随机数的种子,即根据这个reed为rdd的每一个分区生成一个随机种子val sampleRDD = listRDD.sample(false, 0.5, 100)sampleRDD.glom().collect()
randomSplit
/按照权重对RDD进行随机抽样切分,有几个权重就切分成几个RDD//随机抽样采用伯努利抽样算法实现val splitRDD = listRDD.randomSplit(Array(0.2, 0.8))splitRDD.sizesplitRDD(0).glom().collect()splitRDD(1).glom().collect()
takeSample
//随机抽样指定数量的样本数据listRDD.takeSample(false, 1, 100)
sampleByKey
//分层采样val fractions = Map(1 -> 0.3, 3 -> 0.6, 5 -> 0.3)val sampleByKeyRDD = pairRDD.sampleByKey(true, fractions)sampleByKeyRDD.glom().collect()
union
// 连接两个RDD
val thirdRDD = sc.parallelize(Seq(5, 5, 5), 3)val unionAllRDD = sc.union(Seq(oneRDD, otherRDD, thirdRDD))oneRDD.union(otherRDD).union(thirdRDD).collect()
subtract
// oneRDD 减掉 otherRDD 减掉相同的val subtractRDD = oneRDD.subtract(otherRDD)
cartesian
val cartesianRDD = oneRDD.cartesian(otherRDD) // 笛卡尔积cartesianRDD.collect()
zip
//要求两个RDD有相同的元素个数, 分区也得是一样的val zipRDD = oneRDD.zip(otherRDD)zipRDD.collect() // Array[(Int, Int)] = Array((1,3), (2,4), (3,5))
zipPartitions
//要求两个rdd需要有相同的分区数,但是每一个分区可以不需要有相同的元素个数val zipPartitionRDD =oneRDD.zipPartitions(otherRDD)((iterator1, iterator2)=> Iterator(iterator1.sum + iterator2.sum)) // 对每个分区进行sum操作zipPartitionRDD.collect() // Array[Int] = Array(0, 4, 6, 8)
toDebugString
mapRDD.toDebugString // 依赖链
localCheckpoint
mapRDD.localCheckpoint() // 切断依赖 写入的是内存 速度快,数据的可靠性差
checkpoint
sc.setCheckpointDir("hdfs://master:8020/users/hadoop-jrq/checkpoint") // 设置路径otherMapRDD.checkpoint() // 此步只是创建了一个目录 使用的是磁盘,数据的可靠性强,速度慢otherMapRDD.toDebugString // 查看依赖
cogroup
//res0: Array[(Int, (Iterable[Int], Iterable[Int]))]// = Array((4,(CompactBuffer(),CompactBuffer(5))), (1,(CompactBuffer(2),CompactBuffer())),// (5,(CompactBuffer(6),CompactBuffer())), (3,(CompactBuffer(6, 4),CompactBuffer(9))))pairRDD.cogroup(otherRDD).collect()
groupWith
//groupWith是cogroup的别名,效果和cogroup一摸一样pairRDD.groupWith(otherRDD).collect()
join
// Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))pairRDD.join(otherRDD).collect()
leftOuterJoin
// Array[(Int, (Int, Option[Int]))]// = Array((1,(2,None)), (5,(6,None)), (3,(4,Some(9))), (3,(6,Some(9))))pairRDD.leftOuterJoin(otherRDD).collect()
rightOuterJoin
// Array[(Int, (Option[Int], Int))] = Array((4,(None,5)), (3,(Some(4),9)), (3,(Some(6),9)))pairRDD.rightOuterJoin(otherRDD).collect()
fullOuterJoin
// Array[(Int, (Option[Int], Option[Int]))]// = Array((4,(None,Some(5))), (1,(Some(2),None)), (5,(Some(6),None)),// (3,(Some(4),Some(9))), (3,(Some(6),Some(9))))pairRDD.fullOuterJoin(otherRDD).collect()
subtractByKey
// 减掉相同的key, 这个示例减掉了为3的key// Array[(Int, Int)] = Array((1,2), (5,6))pairRDD.subtractByKey(otherRDD).collect()
combineByKey
val pairStrRDD = sc.parallelize[(String, Int)](Seq(("coffee", 1),("coffee", 2), ("panda", 3), ("coffee", 9)), 2)// 碰到第一个key的时候def createCombiner = (value: Int) => (value, 1)// 碰到第二个key的时候怎么做def mergeValue = {// 这里的acc其实就是("coffee", 2) => (2,1) 上面的value是下面的value(acc: (Int, Int), value: Int) => (acc._1 + value, acc._2 + 1)}// 在不同分区里面进行mergedef mergeCombiners = {// 对两个分区里的数据进行合并,acc1代表一个分区// 第一个int是它的累加值,后一个是出现的次数(acc1: (Int, Int), acc2: (Int, Int)) => // (1,1) (1,2) => (2,3)(acc1._1 + acc2._1, acc1._2 + acc2._2)}//功能:对pairStrRDD这个RDD统计每一个相同key对应的所有value值的累加值以及这个key出现的次数//需要的三个参数://createCombiner: V => C, ==> Int -> (Int, Int)//mergeValue: (C, V) => C, ==> ((Int, Int), Int) -> (Int, Int)//mergeCombiners: (C, C) => C ==> ((Int, Int), (Int, Int)) -> (Int, Int)val testCombineByKeyRDD =pairStrRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)testCombineByKeyRDD.collect()
count
val numberRDD = sc.parallelize(1 to 10000 , 200)//RDD a内容 union 5次,其中有50000个元素val numbersRDD = numberRDD ++ numberRDD ++ numberRDD ++ numberRDD ++ numberRDDnumbersRDD.count()
countApprox countByValueApprox countApproxDistinct countByKeyApprox countByKey countByKeyApprox
val numberRDD = sc.parallelize(1 to 10000 , 200)//RDD a内容 union 5次,其中有50000个元素val numbersRDD = numberRDD ++ numberRDD ++ numberRDD ++ numberRDD ++ numberRDDnumbersRDD.count()//第一个参数是超时时间//第二个参数是期望达到近似估计的准确度// 如果你不断用0.9来调用countApprox,则我们期望90%的结果数据是正确的count值//如果count统计在超时时间内执行完,则不会近视估值,而是取正确的值//如果count统计在超时时间内没有执行完,则根据执行完的task的返回值和准确度进行近似估值val resultCount = numbersRDD.countApprox(200, 0.9)resultCount.initialValue.meanresultCount.initialValue.lowresultCount.initialValue.highresultCount.initialValue.confidenceresultCount.getFinalValue().meannumbersRDD.countByValue()val resultCountValue = numbersRDD.countByValueApprox(200, 0.9)resultCountValue.initialValue(1).mean//结果是9760,不传参数,默认是0.05numbersRDD.countApproxDistinct()//结果是9760numbersRDD.countApproxDistinct(0.05)//8224numbersRDD.countApproxDistinct(0.1)//10000 参数越小值越精确numbersRDD.countApproxDistinct(0.006)val pair = sc.parallelize((1 to 10000).zipWithIndex)pair.collect()val pairFive = pair ++ pair ++ pair ++ pair ++ pairpairFive.countByKey()pairFive.countByKeyApprox(10, 0.95)//用HyperLogLogPlus来实现的//也是调用combineByKey来实现的//val createCombiner = (v: V) => {// val hll = new HyperLogLogPlus(p, sp)// hll.offer(v)// hll//}//val mergeValue = (hll: HyperLogLogPlus, v: V) => {// hll.offer(v)// hll//}//val mergeCombiner = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {// h1.addAll(h2)// h1//}pairFive.countApproxDistinctByKey(0.1).collect().sizepairFive.collectAsMap()pairFive.lookup(5)
keyBy
val personSeqRDD =sc.parallelize(Seq(User("jeffy", 30), User("kkk", 20), User("jeffy", 30), User("kkk", 30)))//将RDD变成二元组类型的RDDval keyByRDD = personSeqRDD.keyBy(x => x.userId)keyByRDD.collect()
groupBy
// 按照user.userId进行分组,也可以多个分组条件val groupByRDD = personSeqRDD.groupBy(user => user.userId)groupByRDD.glom().collect()
partitioner
mapValuesRDD.partitioner //会记住父亲RDD的分区器
sortByKey
// false 升降序
pairRDD.sortByKey(false).collect()
coalesce
val hdfsFileRDD = sc.textFile("hdfs://master:8020/users/hadoop-jrq/word.txt", 1000)hdfsFileRDD.partitions.size // 1000//我们通过coalesce来降低分区数量的目的是://分区太多,每个分区的数据量太少,导致太多的task,我们想减少task的数量,所以需要降低分区数// 如果分区数太多了,就会造成很多task空跑,浪费资源及效率低,因此需要降低其分区数//第一个参数表示我们期望的分区数//第二个参数表示是否需要经过shuffle来达到我们的分区数val coalesceRDD = hdfsFileRDD.coalesce(100, false)coalesceRDD.partitions.size //100//从1000个分区一下子降到2个分区//这里会导致1000个map计算只在2个分区上执行,会导致性能问题// 后面的true是map计算后,再重新分区,进行计算hdfsFileRDD.map(_ + "test").coalesce(2, true)
wholeTextFiles
// wholeTextFiles 读整个文件夹下的文件,键是文件名,值是内容val wholeTextFiles = sc.wholeTextFiles("hdfs://master:8020/users/hadoop-jrq/text/")
binaryFiles
// 类似上面的 key是文件名 值是文件流val binaryFilesRDD = sc.binaryFiles("hdfs://master:8020/users/hadoop-jrq/text/")
spark RDD汇总(一)相关推荐
- Spark算子汇总和理解(详细)
Spark之所以比Hadoop灵活和强大,其中一个原因是Spark内置了许多有用的算子,也就是方法.通过对这些方法的组合,编程人员就可以写出自己想要的功能.说白了spark编程就是对spark算子的使 ...
- Spark RDD、DataFrame原理及操作详解
RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...
- spark RDD 打印元素
最近使用Scala操作Spark,想要输出RDD中相关内容去查找错误. 这样就遇到了一个问题:在单机模式下输出了相关的内容,而在集群模式中的操作却没有输出. 试过了一些方法,包含idea中调试,Pri ...
- Spark面试,Spark面试题,Spark面试汇总
Table of Contents 1.你觉得spark 可以完全替代hadoop 么? 2.Spark消费 Kafka,分布式的情况下,如何保证消息的顺序? 3.对于 Spark 中的数据倾斜问题你 ...
- Spark RDD 论文详解(三)Spark 编程接口
前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...
- Spark RDD 论文详解(二)RDDs
前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...
- Spark RDD案例(五)经纬度转换为地理位置
Spark RDD案例(五)经纬度转换为地理位置 1. 背景 Spark作为大数据分析引擎,本身可以做离线和准实时数据处理 Spark抽象出的操作对象如RDD.dataSet.dataFrame.DS ...
- Spark RDD常用算子使用总结
文章目录 概述 Transformation(转换算子) 1. map 2. flatMap 3. filter 4. mapPartitions 5. mapPartitionsWithIndex ...
- Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...
1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...
最新文章
- Linux 操作系统原理 — 零拷贝技术
- 【Java面试题】13 Anonymous Inner Class (匿名内部类) 是否可以extends(继承)其它类,是否可以implements(实现)interface(接口)?...
- SBT搭建Spark
- ImportError: No module named sklearn.metrics
- [09]CSS 边框与背景 (上)
- UVa 1588 - Kickdown(BUG)
- 原创:DELPHI7下动态生成DBCHART,可结合上篇博文,动态生成整个CHART图
- 微信人格专业测试软件,如何在微信公众号中关联《九型人格测试专业版》小程序?...
- 读书笔记之吴伯凡·认知方法论
- 涂鸦小程序——为自己的人生画上一笔
- python+django-mezzanine安装
- 几何不变矩 Hu 矩
- 第八篇《颅骨穿孔——前篇》
- 淘宝、天猫API大全,SKU信息,商品详情调用展示
- CS6的css的类别,Photoshop CS6使用“样式”面板
- 基于LSSVM和PSO进行信号预测(Matlab代码实现)
- 浏览器自动旋转图片的问题(Exif的oritetion原因)
- 堕落之后重新开始学习
- 远程桌面登录,锁定与解锁
- 安装使用日语分词工具-----kuromoji
热门文章
- dumps-loads dump-load的区别
- 数据结构查找-7-4 集合相似度 (25 分)
- 通过echarts 使用china.js时出现南沙诸岛乱码问题
- leecode-试水
- springboot+阿里云OSS分片上传、断点续传、秒传
- java创建mysql sche_爱可生详解MySQL|入门必看DBLE中间件使用指南第一章:初识DBLE...
- python中常用于输出信息的语句函数是print括号_Python语句 print(type(1/2)的输出结果是...
- RTP 上封装H264 数据包
- 绝对不亏 | VMvare保姆级安装教程
- 计算机软件漏洞防御方法,溢出(漏洞)***的实现及防御