https://blog.csdn.net/zzh118/article/details/51998163

1、创建RDD的两种方式: 
(1)、从HDFS读入数据产生RDD; 
(2)、有其他已存在的RDD转换得到新的RDD;

scala> val textFile = sc.textFile("hdfs://192.169.26.58:9000/home/datamining/zhaozhuohui/workspace/test01.txt")
scala> val tf2 = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
scala> tf2.clollect

2、RDD主要有三种类型的操作:Transformation、Action、Persist。

3、Transformation操作的懒加载机制:避免产生中间结果数据,在Action操作时才进行真正的操作。这样一连串的操作一起执行就有优化的空间。

4、Transformation的map操作:map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 

scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
scala> val rdd2 = rdd1.map(_ * 2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at map at <console>:29
scala> rdd2.collect
res12: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18) 

5、flatMap:与map类似,flatMap会将经过函数处理的元素生成到一个RDD中。 

scala> val rdd1 = sc.parallelize(1 to 4, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:27
scala> val rdd2 = rdd1.flatMap(x => 1 to x)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at flatMap at <console>:29
scala> rdd2.collect
res14: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)   

6、mapPartitions:map的一个变种。输入函数作用于每个分区,每个分区作为整体来处理。输入函数的参数是迭代器,返回值也是一个迭代器。处理后的合并结果会自动转化成一个新的RDD。

scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:27
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {|   var res = List[(T, T)]()|   var pre = iter.next|   while (iter.hasNext)|   {|     val cur = iter.next;|     res .::= (pre, cur)|     pre = cur;|   }|   res.iterator| }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]scala> rdd1.mapPartitions(myfunc).collect
res17: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

7、glom:将每个分区中的元素转换成Array,这样每个分区就只有一个数组元素,最终返回一个RDD 

scala> var rdd1 = sc.makeRDD(1 to 10, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at <console>:27
scala> rdd1.partitions.size
res18: Int = 3
scala> rdd1.glom().collect
res19: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

8、Transformation的filter操作:筛选出输入函数计算结果为true的元素,放入到一个新的RDD中。

def funOps2(): Unit = {val a = sc.parallelize(1 to 10, 3)val b = a.filter(_ % 2 == 0)b.collect
}

9、Transformation的distinct操作

scala> val a = sc.parallelize(List("tom", "jim", "sherry", "dog", "tom"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> a.distinct.collect
res1: Array[String] = Array(jim, tom, dog, sherry)
scala> val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 1, 3, 2))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27
scala> b.distinct(2).partitions.length
res2: Int = 2

10、Transformation的cartesian计算两个RDD的笛卡尔积

scala> val x = sc.parallelize(List(1, 2, 3, 4, 5))
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:27
scala> val y = sc.parallelize(List(6, 7, 8, 9, 10))
y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:27
scala> x.cartesian(y).collect
res3: Array[(Int, Int)] = Array((1,6), (1,7), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,6), (3,7), (4,6), (4,7), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))

11、Transformation的union,++操作,两个RDD取并集

scala> val a = sc.parallelize(3 to 6, 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:27
scala> val b = sc.parallelize(5 to 7, 1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
scala> (a ++ b).collect
res4: Array[Int] = Array(3, 4, 5, 6, 5, 6, 7)

12、Transformation的mapValues操作,处理两个元素的tuple构成的RDD。把mapValue参数传入的输入函数应用到每个value上,生成一个由两个元素的tuple构成的新的RDD。

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panda", "eagle"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:27
scala> val b = a.map(x => (x.length, x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[15] at map at <console>:31
scala> b.mapValues("x" + _ + "x").collect
res6: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (5,xpandax), (5,xeaglex))

13、Transformation的subtract操作,取两个RDD的差集,返回一个新的RDD。

scala> val a = sc.parallelize(1 to 9, 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27
scala> val b = sc.parallelize(1 to 5, 1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:27
scala> val c = a.subtract(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at subtract at <console>:31
scala> c.collect
res7: Array[Int] = Array(6, 7, 8, 9)

14、Transformation的sample操作,随机从RDD中取出一个片段作为一个新的RDD。

scala> val a = sc.parallelize(1 to 10, 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:27
scala> a.sample(true, 0.5, 0).count
res9: Long = 4
scala> a.sample(true, 0.2, 12).count
res10: Long = 2

15、takeSample随机取指定数目的元素,返回的是数组不是RDD

scala> val x = sc.parallelize(1 to 10, 1)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:27
scala> x.takeSample(true, 5, 1)
res12: Array[Int] = Array(3, 4, 7, 10, 3)

16、Transformation的groupByKey操作,用于将RDD[K,V]中每个K对应的V值,合并到一个集合Iterable[V]中。

scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at makeRDD at <console>:27
scala> rdd1.groupByKey().collect
res13: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1)), (A,CompactBuffer(0, 2)), (c,CompactBuffer(1)))

17、Transformation的partitionBy操作,根据传入的分区器进行分区。

18、Transformation的cogroup操作,相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空。

scala> var rdd1 = sc.makeRDD(Array(("A", "1"), ("B", "2"), ("C", "3")), 2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at <console>:27
scala> var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")), 2)
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at makeRDD at <console>:27
scala> var rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[37] at cogroup at <console>:31
scala> rdd3.collect
res16: Array[(String, (Iterable[String], Iterable[String]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (D,(CompactBuffer(),CompactBuffer(d))), (A,(CompactBuffer(1),CompactBuffer(a))), (C,(CompactBuffer(3),CompactBuffer(c))))

19、Transformation的combineByKey操作,使用用户设置好的聚合函数对每个key中得value进行组合(combine),可以将输入类型为RDD[(k, v)]转成RDD[(k, c)]。

20、Transformation的reduceByKey操作,该函数用于将RDD[K,V]中每个K对应的V值根据映射函数来两两运算。

scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2),("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at makeRDD at <console>:27
scala> rdd1.partitions.length
res14: Int = 2
scala> var rdd2 = rdd1.reduceByKey(_ + _)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at <console>:29
scala> rdd2.collect
res15: Array[(String, Int)] = Array((B,3), (A,2), (c,1))   

21、Transformation的join操作,def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))],要进行join操作的两个RDD的每个元素必须是两个子元素的tuple.

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[38] at makeRDD at <console>:27
scala> var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[39] at makeRDD at <console>:27
scala> rdd1.join(rdd2).collect
res17: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))

22、Transformation的leftOuterJoin、rightOuterJoin操作。

Spark RDD的Transformation操作相关推荐

  1. 详解 Spark RDD 的转换操作与行动操作

    前言 本期继续讲解 Spark 核心 RDD 编程部分,内容比较干货也比较长,建议大家先收藏. 学习目标 RDD 的创建 RDD 的转换操作 RDD 的行动操作 惰性求值 1. RDD 的创建 Spa ...

  2. spark Rdd 操作transformaction和action等

    为什么80%的码农都做不了架构师?>>>    transformation是惰性的,只有action操作的时候,才会真正执行.spark有很多api,RDD的api只是spark的 ...

  3. Spark Streaming 实战案例(二) Transformation操作

    本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html DStream Tra ...

  4. Spark RDD使用详解4--Key-Value型Transformation算子

    Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一.聚集.连接操作. 输入分区与输出分区一对一 mapValues mapValues:针对(K ...

  5. Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...

    本博文的主要内容是: 1.rdd基本操作实战 2.transformation和action流程图 3.典型的transformation和action RDD有3种操作: 1.  Trandform ...

  6. Spark RDD、DataFrame原理及操作详解

    RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...

  7. Spark RDD算子(transformation + action)

    概念 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.RDD具有数据流模 ...

  8. Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...

    1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...

  9. Spark RDD使用详解3--Value型Transformation算子

    处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型: 1)输入分区与输出分区一对一型  2)输入分区与输出分区多对一型  3)输 ...

  10. Spark——RDD操作详解

    转载自:https://blog.csdn.net/zhaojw_420/article/details/53261965 一.基本RDD 1.针对各个元素的转化操作  最常用的转化操作是map()和 ...

最新文章

  1. 批量创建用户和设置密码
  2. LeetCode实战:最长回文子串
  3. NetBeans 7.4的本机Java打包
  4. 项目案例:qq数据库管理_2小时元项目:项目管理您的数据科学学习
  5. JVM入门到放弃之基本概念
  6. 第十篇:Spring Boot整合mybatis+逆向工程(Mysql+Oracle) 入门试炼01
  7. 系统磁盘空间满的一个问题
  8. lpc1788的地址空间分配
  9. 【PCB Layout】PCB布局布线经验总结
  10. 给华南x99打鸡血BIOS教程
  11. redis 集群 set key报错CLUSTERDOWN Hash slot not served
  12. web开发设为首页、添加到收藏夹实现方法
  13. Asp.Net Core 密码加密方案
  14. 兔子生崽问题编程_兔子生崽问题
  15. MBR“主引导记录”的局限性与GPT GUID分区表的优势
  16. HTML和CSS实现图片翻转效果
  17. 使用rest_framework的routers模块添加路由
  18. 天津情侣朋友游玩项目
  19. 投影机基本故障及解决方法
  20. 我与梅西粉丝们的世界杯观球日常

热门文章

  1. 字符串分割和数组组合(spilt,join)
  2. 自然语言处理 情绪识别
  3. 如何将硬盘分区合并不丢失数据,合并两个硬盘分区不删除数据
  4. windows磁盘分区合并(比如合并到C盘/分区扩容)问题/删除恢复分区
  5. Android 短视频SDK
  6. python禁用路径长度限制有啥影响吗_为什么Windows中存在260个字符的路径长度限制?...
  7. 【OCP学习1z0-053记录74】151 DBMS_TDB
  8. java 发送html格式邮件 样式混乱解决
  9. sqlplus执行语句报错:unknown command beginning解决方案
  10. excel表格打印每页都有表头_分享|1分钟学会,让打印的表格每页自带标题行