Spark RDD的Transformation操作
https://blog.csdn.net/zzh118/article/details/51998163
1、创建RDD的两种方式:
(1)、从HDFS读入数据产生RDD;
(2)、有其他已存在的RDD转换得到新的RDD;
scala> val textFile = sc.textFile("hdfs://192.169.26.58:9000/home/datamining/zhaozhuohui/workspace/test01.txt")
scala> val tf2 = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
scala> tf2.clollect
2、RDD主要有三种类型的操作:Transformation、Action、Persist。
3、Transformation操作的懒加载机制:避免产生中间结果数据,在Action操作时才进行真正的操作。这样一连串的操作一起执行就有优化的空间。
4、Transformation的map操作:map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。
scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
scala> val rdd2 = rdd1.map(_ * 2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at map at <console>:29
scala> rdd2.collect
res12: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
5、flatMap:与map类似,flatMap会将经过函数处理的元素生成到一个RDD中。
scala> val rdd1 = sc.parallelize(1 to 4, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:27
scala> val rdd2 = rdd1.flatMap(x => 1 to x)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at flatMap at <console>:29
scala> rdd2.collect
res14: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)
6、mapPartitions:map的一个变种。输入函数作用于每个分区,每个分区作为整体来处理。输入函数的参数是迭代器,返回值也是一个迭代器。处理后的合并结果会自动转化成一个新的RDD。
scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:27
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {| var res = List[(T, T)]()| var pre = iter.next| while (iter.hasNext)| {| val cur = iter.next;| res .::= (pre, cur)| pre = cur;| }| res.iterator| }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]scala> rdd1.mapPartitions(myfunc).collect
res17: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
7、glom:将每个分区中的元素转换成Array,这样每个分区就只有一个数组元素,最终返回一个RDD
scala> var rdd1 = sc.makeRDD(1 to 10, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at <console>:27
scala> rdd1.partitions.size
res18: Int = 3
scala> rdd1.glom().collect
res19: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
8、Transformation的filter操作:筛选出输入函数计算结果为true的元素,放入到一个新的RDD中。
def funOps2(): Unit = {val a = sc.parallelize(1 to 10, 3)val b = a.filter(_ % 2 == 0)b.collect
}
9、Transformation的distinct操作
scala> val a = sc.parallelize(List("tom", "jim", "sherry", "dog", "tom"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> a.distinct.collect
res1: Array[String] = Array(jim, tom, dog, sherry)
scala> val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 1, 3, 2))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27
scala> b.distinct(2).partitions.length
res2: Int = 2
10、Transformation的cartesian计算两个RDD的笛卡尔积
scala> val x = sc.parallelize(List(1, 2, 3, 4, 5))
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:27
scala> val y = sc.parallelize(List(6, 7, 8, 9, 10))
y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:27
scala> x.cartesian(y).collect
res3: Array[(Int, Int)] = Array((1,6), (1,7), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,6), (3,7), (4,6), (4,7), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))
11、Transformation的union,++操作,两个RDD取并集
scala> val a = sc.parallelize(3 to 6, 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:27
scala> val b = sc.parallelize(5 to 7, 1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
scala> (a ++ b).collect
res4: Array[Int] = Array(3, 4, 5, 6, 5, 6, 7)
12、Transformation的mapValues操作,处理两个元素的tuple构成的RDD。把mapValue参数传入的输入函数应用到每个value上,生成一个由两个元素的tuple构成的新的RDD。
scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panda", "eagle"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:27
scala> val b = a.map(x => (x.length, x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[15] at map at <console>:31
scala> b.mapValues("x" + _ + "x").collect
res6: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (5,xpandax), (5,xeaglex))
13、Transformation的subtract操作,取两个RDD的差集,返回一个新的RDD。
scala> val a = sc.parallelize(1 to 9, 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27
scala> val b = sc.parallelize(1 to 5, 1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:27
scala> val c = a.subtract(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at subtract at <console>:31
scala> c.collect
res7: Array[Int] = Array(6, 7, 8, 9)
14、Transformation的sample操作,随机从RDD中取出一个片段作为一个新的RDD。
scala> val a = sc.parallelize(1 to 10, 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:27
scala> a.sample(true, 0.5, 0).count
res9: Long = 4
scala> a.sample(true, 0.2, 12).count
res10: Long = 2
15、takeSample随机取指定数目的元素,返回的是数组不是RDD
scala> val x = sc.parallelize(1 to 10, 1)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:27
scala> x.takeSample(true, 5, 1)
res12: Array[Int] = Array(3, 4, 7, 10, 3)
16、Transformation的groupByKey操作,用于将RDD[K,V]中每个K对应的V值,合并到一个集合Iterable[V]中。
scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at makeRDD at <console>:27
scala> rdd1.groupByKey().collect
res13: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1)), (A,CompactBuffer(0, 2)), (c,CompactBuffer(1)))
17、Transformation的partitionBy操作,根据传入的分区器进行分区。
18、Transformation的cogroup操作,相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空。
scala> var rdd1 = sc.makeRDD(Array(("A", "1"), ("B", "2"), ("C", "3")), 2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at <console>:27
scala> var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")), 2)
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at makeRDD at <console>:27
scala> var rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[37] at cogroup at <console>:31
scala> rdd3.collect
res16: Array[(String, (Iterable[String], Iterable[String]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (D,(CompactBuffer(),CompactBuffer(d))), (A,(CompactBuffer(1),CompactBuffer(a))), (C,(CompactBuffer(3),CompactBuffer(c))))
19、Transformation的combineByKey操作,使用用户设置好的聚合函数对每个key中得value进行组合(combine),可以将输入类型为RDD[(k, v)]转成RDD[(k, c)]。
20、Transformation的reduceByKey操作,该函数用于将RDD[K,V]中每个K对应的V值根据映射函数来两两运算。
scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2),("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at makeRDD at <console>:27
scala> rdd1.partitions.length
res14: Int = 2
scala> var rdd2 = rdd1.reduceByKey(_ + _)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at <console>:29
scala> rdd2.collect
res15: Array[(String, Int)] = Array((B,3), (A,2), (c,1))
21、Transformation的join操作,def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))],要进行join操作的两个RDD的每个元素必须是两个子元素的tuple.
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[38] at makeRDD at <console>:27
scala> var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[39] at makeRDD at <console>:27
scala> rdd1.join(rdd2).collect
res17: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))
22、Transformation的leftOuterJoin、rightOuterJoin操作。
Spark RDD的Transformation操作相关推荐
- 详解 Spark RDD 的转换操作与行动操作
前言 本期继续讲解 Spark 核心 RDD 编程部分,内容比较干货也比较长,建议大家先收藏. 学习目标 RDD 的创建 RDD 的转换操作 RDD 的行动操作 惰性求值 1. RDD 的创建 Spa ...
- spark Rdd 操作transformaction和action等
为什么80%的码农都做不了架构师?>>> transformation是惰性的,只有action操作的时候,才会真正执行.spark有很多api,RDD的api只是spark的 ...
- Spark Streaming 实战案例(二) Transformation操作
本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html DStream Tra ...
- Spark RDD使用详解4--Key-Value型Transformation算子
Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一.聚集.连接操作. 输入分区与输出分区一对一 mapValues mapValues:针对(K ...
- Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...
本博文的主要内容是: 1.rdd基本操作实战 2.transformation和action流程图 3.典型的transformation和action RDD有3种操作: 1. Trandform ...
- Spark RDD、DataFrame原理及操作详解
RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...
- Spark RDD算子(transformation + action)
概念 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.RDD具有数据流模 ...
- Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...
1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...
- Spark RDD使用详解3--Value型Transformation算子
处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型: 1)输入分区与输出分区一对一型 2)输入分区与输出分区多对一型 3)输 ...
- Spark——RDD操作详解
转载自:https://blog.csdn.net/zhaojw_420/article/details/53261965 一.基本RDD 1.针对各个元素的转化操作 最常用的转化操作是map()和 ...
最新文章
- 批量创建用户和设置密码
- LeetCode实战:最长回文子串
- NetBeans 7.4的本机Java打包
- 项目案例:qq数据库管理_2小时元项目:项目管理您的数据科学学习
- JVM入门到放弃之基本概念
- 第十篇:Spring Boot整合mybatis+逆向工程(Mysql+Oracle) 入门试炼01
- 系统磁盘空间满的一个问题
- lpc1788的地址空间分配
- 【PCB Layout】PCB布局布线经验总结
- 给华南x99打鸡血BIOS教程
- redis 集群 set key报错CLUSTERDOWN Hash slot not served
- web开发设为首页、添加到收藏夹实现方法
- Asp.Net Core 密码加密方案
- 兔子生崽问题编程_兔子生崽问题
- MBR“主引导记录”的局限性与GPT GUID分区表的优势
- HTML和CSS实现图片翻转效果
- 使用rest_framework的routers模块添加路由
- 天津情侣朋友游玩项目
- 投影机基本故障及解决方法
- 我与梅西粉丝们的世界杯观球日常
热门文章
- 字符串分割和数组组合(spilt,join)
- 自然语言处理 情绪识别
- 如何将硬盘分区合并不丢失数据,合并两个硬盘分区不删除数据
- windows磁盘分区合并(比如合并到C盘/分区扩容)问题/删除恢复分区
- Android 短视频SDK
- python禁用路径长度限制有啥影响吗_为什么Windows中存在260个字符的路径长度限制?...
- 【OCP学习1z0-053记录74】151 DBMS_TDB
- java 发送html格式邮件 样式混乱解决
- sqlplus执行语句报错:unknown command beginning解决方案
- excel表格打印每页都有表头_分享|1分钟学会,让打印的表格每页自带标题行