collect

 val listRDD = sc.parallelize[Int](Seq(1, 2, 4, 3, 3, 6), 2)listRDD.collect() // 收集rdd的所有数据

take

 listRDD.take(2)  // 取前两

top

listRDD.top(2)  // 取最大的两个

first

listRDD.first()  // 取第一个

min

listRDD.min()   // 取最小的

max

class MyOrderingNew extends Ordering[Int] {override def compare(x: Int, y: Int): Int = {y - x}
}
listRDD.max()(new MyOrderingNew)  // 逆序取最大其实就是取最小

takeOrdered

listRDD.takeOrdered(2) //  按照自然顺序找两个
listRDD.takeOrdered(2)(new MyOrderingNew)  // 按照逆序去取两个,其实就是最大的

foreach

listRDD.foreach(x => {  // 应用到每个变量上val initNumber = getInitNumber("foreach")println(x + initNumber + "==================")})

foreachPartition

 // foreach和foreachPartition 这两个和map那两个差不多listRDD.foreachPartition(iterator => {// 和foreach api的功能是一样,只不过一个是将函数应用到每一条记录,这个是将函数应用到每一个partition//如果有一个比较耗时的操作,只需要每一分区执行一次这个操作就行,则用这个函数//这个耗时的操作可以是连接数据库等操作,不需要计算每一条时候去连接数据库,一个分区只需连接一次就行val initNumber = getInitNumber("foreachPartition")iterator.foreach(x => println(x + initNumber + "================="))})

reduce

listRDD.reduce((x, y) => x + y)   //各元素相加

treeReduce

// 当分区数据量太多时候,有可能造成最后的计算时,会有各种问题// treeReduce 先对部分分区进行累加,再汇总listRDD.treeReduce((x, y) => x + y)

fold

//和reduce的功能类似,只不过是在计算每一个分区的时候需要加上初始值1,最后再将每一个分区计算出来的值相加再加上这个初始值listRDD.fold(0)((x, y) => x + y)

aggregate

//先初始化一个我们想要的返回的数据类型的初始值//然后在每一个分区对每一个元素应用函数一(acc, value) => (acc._1 + value, acc._2 + 1)进行聚合//最后将每一个分区生成的数据应用函数(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)进行聚合listRDD.aggregate((0, 0))((acc, value) => (acc._1 + value, acc._2 + 1),(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

treeAggregate

listRDD.treeAggregate((0, 0))((acc, value) => (acc._1 + value, acc._2 + 1),(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

broadcast

 val lookupTable = Map("plane" -> "sky", "fish" -> "sea", "people" -> "earth")
// 广播出去val lookupTableB = sc.broadcast(lookupTable)
// 获取
val replaceStrOpt = lookupTableB.value.get("plane")

map

// 作用在每一个元素上
val keyValueWordsRDD = wordsRDD.map(word => (word, 1))

reduceByKey

// 对相同的key进行聚合计算
val wordCountRDD = keyValueWordsRDD.reduceByKey((x, y) => x + y)

flatMap

// 将值打平val flatMapRDD = listRDD.flatMap(x => x.to(3))flatMapRDD.collect()

filter

// 过滤
val filterRDD = listRDD.filter(x => x != 1)
filterRDD.collect()

glom

 //将rdd的每一个分区的数据转成一个数组,进而将所有的分区数据转成一个二维数组val glomRDD = listRDD.glom()glomRDD.collect() //Array(Array(1, 2), Array(3, 3))

mapPartitions

// 类似于foreachPartition,作用于每一个分区上
val mapPartitionRDD = listRDD.mapPartitions (iterator => {//和map api的功能是一样,只不过map是将函数应用到每一条记录,而这个是将函数应用到每一个partition//如果有一个比较耗时的操作,只需要每一分区执行一次这个操作就行,则用这个函数//这个耗时的操作可以是连接数据库等操作,不需要计算每一条时候去连接数据库,一个分区只需连接一次就行val initNumber = getInitNumber("mapPartitions")iterator.map(x => x + initNumber)})mapPartitionRDD.collect()

mapPartitionsWithIndex

val mapPartitionWithIndexRDD = listRDD.mapPartitionsWithIndex((index, iterator) => {iterator.map(x => x + index)  // index是第几个分区})mapPartitionWithIndexRDD.collect()

pipe

//表示执行一个本地脚本(可以是shell,python,java等各种能通过java的Process启动起来的脚本进程)//dataRDD的数据就是脚本的输入数据,脚本的输出数据会生成一个RDD即pipeRDDval pipeRDD = dataRDD.pipe(Seq("python", "/home/hadoop-jrq/spark-course/echo.py"),env, printPipeContext, printRDDElement, false)pipeRDD.glom().collect()

textFile

//1: 从一个稳定的存储系统中,比如hdfs文件,或者本地文件系统val hdfsFileRDD = sc.textFile("hdfs://master:8020/users/hadoop-jrq/word.txt")hdfsFileRDD.count()

range

 val rangeRDD = sc.range(0, 10, 2, 4)rangeRDD.collect()

makeRDD

 val makeRDD = sc.makeRDD(Seq(1, 2, 3, 3))makeRDD.collect()

parallelize

 val defaultPartitionRDD = sc.parallelize[Int](Seq(1, 2, 3, 3, 4))defaultPartitionRDD.partitions

persist

//存储级别://MEMORY_ONLY: 只存在内存中//DISK_ONLY: 只存在磁盘中//MEMORY_AND_DISK: 先存在内存中,内存不够的话则存在磁盘中//OFF_HEAP: 存在堆外内存中hdfsFileRDD.persist(StorageLevel.MEMORY_ONLY)hdfsFileRDD.getStorageLevel

cache

mapRDD.cache() //表示只存在内存中

unpersist

// 清除存在内存中的数据
mapRDD.unpersist()

sample

 //第一个参数为withReplacement//如果withReplacement=true的话表示有放回的抽样,采用泊松抽样算法实现//如果withReplacement=false的话表示无放回的抽样,采用伯努利抽样算法实现//第二个参数为:fraction,表示每一个元素被抽取为样本的概率,并不是表示需要抽取的数据量的因子//比如从100个数据中抽样,fraction=0.2,并不是表示需要抽取100 * 0.2 = 20个数据,//而是表示100个元素的被抽取为样本概率为0.2;样本的大小并不是固定的,而是服从二项分布//当withReplacement=true的时候fraction>=0//当withReplacement=false的时候 0 < fraction < 1//第三个参数为:reed表示生成随机数的种子,即根据这个reed为rdd的每一个分区生成一个随机种子val sampleRDD = listRDD.sample(false, 0.5, 100)sampleRDD.glom().collect()

randomSplit

/按照权重对RDD进行随机抽样切分,有几个权重就切分成几个RDD//随机抽样采用伯努利抽样算法实现val splitRDD = listRDD.randomSplit(Array(0.2, 0.8))splitRDD.sizesplitRDD(0).glom().collect()splitRDD(1).glom().collect()

takeSample

//随机抽样指定数量的样本数据listRDD.takeSample(false, 1, 100)

sampleByKey

//分层采样val fractions = Map(1 -> 0.3, 3 -> 0.6, 5 -> 0.3)val sampleByKeyRDD = pairRDD.sampleByKey(true, fractions)sampleByKeyRDD.glom().collect()

union

// 连接两个RDD
val thirdRDD = sc.parallelize(Seq(5, 5, 5), 3)val unionAllRDD = sc.union(Seq(oneRDD, otherRDD, thirdRDD))oneRDD.union(otherRDD).union(thirdRDD).collect()

subtract

// oneRDD 减掉 otherRDD 减掉相同的val subtractRDD = oneRDD.subtract(otherRDD)

cartesian

 val cartesianRDD = oneRDD.cartesian(otherRDD)  // 笛卡尔积cartesianRDD.collect()

zip

 //要求两个RDD有相同的元素个数, 分区也得是一样的val zipRDD = oneRDD.zip(otherRDD)zipRDD.collect() // Array[(Int, Int)] = Array((1,3), (2,4), (3,5))

zipPartitions

//要求两个rdd需要有相同的分区数,但是每一个分区可以不需要有相同的元素个数val zipPartitionRDD =oneRDD.zipPartitions(otherRDD)((iterator1, iterator2)=> Iterator(iterator1.sum + iterator2.sum))  // 对每个分区进行sum操作zipPartitionRDD.collect() // Array[Int] = Array(0, 4, 6, 8)

toDebugString

 mapRDD.toDebugString  // 依赖链

localCheckpoint

mapRDD.localCheckpoint()  // 切断依赖 写入的是内存  速度快,数据的可靠性差

checkpoint

sc.setCheckpointDir("hdfs://master:8020/users/hadoop-jrq/checkpoint")  // 设置路径otherMapRDD.checkpoint()  // 此步只是创建了一个目录  使用的是磁盘,数据的可靠性强,速度慢otherMapRDD.toDebugString  // 查看依赖

cogroup

//res0: Array[(Int, (Iterable[Int], Iterable[Int]))]// = Array((4,(CompactBuffer(),CompactBuffer(5))), (1,(CompactBuffer(2),CompactBuffer())),// (5,(CompactBuffer(6),CompactBuffer())), (3,(CompactBuffer(6, 4),CompactBuffer(9))))pairRDD.cogroup(otherRDD).collect()

groupWith

//groupWith是cogroup的别名,效果和cogroup一摸一样pairRDD.groupWith(otherRDD).collect()

join

// Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))pairRDD.join(otherRDD).collect()

leftOuterJoin

// Array[(Int, (Int, Option[Int]))]// = Array((1,(2,None)), (5,(6,None)), (3,(4,Some(9))), (3,(6,Some(9))))pairRDD.leftOuterJoin(otherRDD).collect()

rightOuterJoin

// Array[(Int, (Option[Int], Int))] = Array((4,(None,5)), (3,(Some(4),9)), (3,(Some(6),9)))pairRDD.rightOuterJoin(otherRDD).collect()

fullOuterJoin

// Array[(Int, (Option[Int], Option[Int]))]// = Array((4,(None,Some(5))), (1,(Some(2),None)), (5,(Some(6),None)),// (3,(Some(4),Some(9))), (3,(Some(6),Some(9))))pairRDD.fullOuterJoin(otherRDD).collect()

subtractByKey

// 减掉相同的key, 这个示例减掉了为3的key// Array[(Int, Int)] = Array((1,2), (5,6))pairRDD.subtractByKey(otherRDD).collect()

combineByKey

 val pairStrRDD = sc.parallelize[(String, Int)](Seq(("coffee", 1),("coffee", 2), ("panda", 3), ("coffee", 9)), 2)// 碰到第一个key的时候def createCombiner = (value: Int) => (value, 1)// 碰到第二个key的时候怎么做def mergeValue = {// 这里的acc其实就是("coffee", 2) => (2,1) 上面的value是下面的value(acc: (Int, Int), value: Int) => (acc._1 + value, acc._2 + 1)}// 在不同分区里面进行mergedef mergeCombiners = {// 对两个分区里的数据进行合并,acc1代表一个分区// 第一个int是它的累加值,后一个是出现的次数(acc1: (Int, Int), acc2: (Int, Int)) =>  // (1,1)  (1,2)  => (2,3)(acc1._1 + acc2._1, acc1._2 + acc2._2)}//功能:对pairStrRDD这个RDD统计每一个相同key对应的所有value值的累加值以及这个key出现的次数//需要的三个参数://createCombiner: V => C,  ==> Int -> (Int, Int)//mergeValue: (C, V) => C,    ==> ((Int, Int), Int) -> (Int, Int)//mergeCombiners: (C, C) => C    ==> ((Int, Int), (Int, Int)) -> (Int, Int)val testCombineByKeyRDD =pairStrRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)testCombineByKeyRDD.collect()

count

val numberRDD = sc.parallelize(1 to 10000 , 200)//RDD a内容 union 5次,其中有50000个元素val numbersRDD = numberRDD ++ numberRDD ++ numberRDD ++ numberRDD ++ numberRDDnumbersRDD.count()

countApprox countByValueApprox countApproxDistinct countByKeyApprox countByKey countByKeyApprox

 val numberRDD = sc.parallelize(1 to 10000 , 200)//RDD a内容 union 5次,其中有50000个元素val numbersRDD = numberRDD ++ numberRDD ++ numberRDD ++ numberRDD ++ numberRDDnumbersRDD.count()//第一个参数是超时时间//第二个参数是期望达到近似估计的准确度// 如果你不断用0.9来调用countApprox,则我们期望90%的结果数据是正确的count值//如果count统计在超时时间内执行完,则不会近视估值,而是取正确的值//如果count统计在超时时间内没有执行完,则根据执行完的task的返回值和准确度进行近似估值val resultCount = numbersRDD.countApprox(200, 0.9)resultCount.initialValue.meanresultCount.initialValue.lowresultCount.initialValue.highresultCount.initialValue.confidenceresultCount.getFinalValue().meannumbersRDD.countByValue()val resultCountValue = numbersRDD.countByValueApprox(200, 0.9)resultCountValue.initialValue(1).mean//结果是9760,不传参数,默认是0.05numbersRDD.countApproxDistinct()//结果是9760numbersRDD.countApproxDistinct(0.05)//8224numbersRDD.countApproxDistinct(0.1)//10000  参数越小值越精确numbersRDD.countApproxDistinct(0.006)val pair = sc.parallelize((1 to 10000).zipWithIndex)pair.collect()val pairFive = pair ++ pair ++ pair ++ pair ++ pairpairFive.countByKey()pairFive.countByKeyApprox(10, 0.95)//用HyperLogLogPlus来实现的//也是调用combineByKey来实现的//val createCombiner = (v: V) => {//  val hll = new HyperLogLogPlus(p, sp)//  hll.offer(v)//  hll//}//val mergeValue = (hll: HyperLogLogPlus, v: V) => {//  hll.offer(v)//  hll//}//val mergeCombiner = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {//  h1.addAll(h2)//  h1//}pairFive.countApproxDistinctByKey(0.1).collect().sizepairFive.collectAsMap()pairFive.lookup(5)

keyBy

val personSeqRDD =sc.parallelize(Seq(User("jeffy", 30), User("kkk", 20), User("jeffy", 30), User("kkk", 30)))//将RDD变成二元组类型的RDDval keyByRDD = personSeqRDD.keyBy(x => x.userId)keyByRDD.collect()

groupBy

// 按照user.userId进行分组,也可以多个分组条件val groupByRDD = personSeqRDD.groupBy(user => user.userId)groupByRDD.glom().collect()

partitioner

mapValuesRDD.partitioner //会记住父亲RDD的分区器

sortByKey

// false 升降序
pairRDD.sortByKey(false).collect()

coalesce

val hdfsFileRDD = sc.textFile("hdfs://master:8020/users/hadoop-jrq/word.txt", 1000)hdfsFileRDD.partitions.size // 1000//我们通过coalesce来降低分区数量的目的是://分区太多,每个分区的数据量太少,导致太多的task,我们想减少task的数量,所以需要降低分区数// 如果分区数太多了,就会造成很多task空跑,浪费资源及效率低,因此需要降低其分区数//第一个参数表示我们期望的分区数//第二个参数表示是否需要经过shuffle来达到我们的分区数val coalesceRDD = hdfsFileRDD.coalesce(100, false)coalesceRDD.partitions.size //100//从1000个分区一下子降到2个分区//这里会导致1000个map计算只在2个分区上执行,会导致性能问题// 后面的true是map计算后,再重新分区,进行计算hdfsFileRDD.map(_ + "test").coalesce(2, true)

wholeTextFiles

// wholeTextFiles 读整个文件夹下的文件,键是文件名,值是内容val wholeTextFiles = sc.wholeTextFiles("hdfs://master:8020/users/hadoop-jrq/text/")

binaryFiles

// 类似上面的 key是文件名  值是文件流val binaryFilesRDD = sc.binaryFiles("hdfs://master:8020/users/hadoop-jrq/text/")

spark RDD汇总(一)相关推荐

  1. Spark算子汇总和理解(详细)

    Spark之所以比Hadoop灵活和强大,其中一个原因是Spark内置了许多有用的算子,也就是方法.通过对这些方法的组合,编程人员就可以写出自己想要的功能.说白了spark编程就是对spark算子的使 ...

  2. Spark RDD、DataFrame原理及操作详解

    RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...

  3. spark RDD 打印元素

    最近使用Scala操作Spark,想要输出RDD中相关内容去查找错误. 这样就遇到了一个问题:在单机模式下输出了相关的内容,而在集群模式中的操作却没有输出. 试过了一些方法,包含idea中调试,Pri ...

  4. Spark面试,Spark面试题,Spark面试汇总

    Table of Contents 1.你觉得spark 可以完全替代hadoop 么? 2.Spark消费 Kafka,分布式的情况下,如何保证消息的顺序? 3.对于 Spark 中的数据倾斜问题你 ...

  5. Spark RDD 论文详解(三)Spark 编程接口

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  6. Spark RDD 论文详解(二)RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  7. Spark RDD案例(五)经纬度转换为地理位置

    Spark RDD案例(五)经纬度转换为地理位置 1. 背景 Spark作为大数据分析引擎,本身可以做离线和准实时数据处理 Spark抽象出的操作对象如RDD.dataSet.dataFrame.DS ...

  8. Spark RDD常用算子使用总结

    文章目录 概述 Transformation(转换算子) 1. map 2. flatMap 3. filter 4. mapPartitions 5. mapPartitionsWithIndex ...

  9. Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...

    1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...

最新文章

  1. Linux 操作系统原理 — 零拷贝技术
  2. 【Java面试题】13 Anonymous Inner Class (匿名内部类) 是否可以extends(继承)其它类,是否可以implements(实现)interface(接口)?...
  3. SBT搭建Spark
  4. ImportError: No module named sklearn.metrics
  5. [09]CSS 边框与背景 (上)
  6. UVa 1588 - Kickdown(BUG)
  7. 原创:DELPHI7下动态生成DBCHART,可结合上篇博文,动态生成整个CHART图
  8. 微信人格专业测试软件,如何在微信公众号中关联《九型人格测试专业版》小程序?...
  9. 读书笔记之吴伯凡·认知方法论
  10. 涂鸦小程序——为自己的人生画上一笔
  11. python+django-mezzanine安装
  12. 几何不变矩 Hu 矩
  13. 第八篇《颅骨穿孔——前篇》
  14. 淘宝、天猫API大全,SKU信息,商品详情调用展示
  15. CS6的css的类别,Photoshop CS6使用“样式”面板
  16. 基于LSSVM和PSO进行信号预测(Matlab代码实现)
  17. 浏览器自动旋转图片的问题(Exif的oritetion原因)
  18. 堕落之后重新开始学习
  19. 远程桌面登录,锁定与解锁
  20. 安装使用日语分词工具-----kuromoji

热门文章

  1. dumps-loads dump-load的区别
  2. 数据结构查找-7-4 集合相似度 (25 分)
  3. 通过echarts 使用china.js时出现南沙诸岛乱码问题
  4. leecode-试水
  5. springboot+阿里云OSS分片上传、断点续传、秒传
  6. java创建mysql sche_爱可生详解MySQL|入门必看DBLE中间件使用指南第一章:初识DBLE...
  7. python中常用于输出信息的语句函数是print括号_Python语句 print(type(1/2)的输出结果是...
  8. RTP 上封装H264 数据包
  9. 绝对不亏 | VMvare保姆级安装教程
  10. 计算机软件漏洞防御方法,溢出(漏洞)***的实现及防御