RDD -- An Analysis of Transformation Operators
RDD
RDD (Resilient Distributed Dataset) is an abstraction of distributed memory. An RDD provides a highly restricted shared-memory model: it is a read-only, partitioned collection of records that can only be created through deterministic transformations (such as map, join, and groupBy) applied to other RDDs. These restrictions, however, make fault tolerance cheap to implement. To a developer, an RDD can be seen as a Spark object that lives in memory: reading a file produces an RDD, a computation over that file produces an RDD, and the result set is again an RDD; partitions, the dependencies between datasets, and key-value map data can all be viewed as RDDs. (Source: Baidu Baike)
RDD Operation Categories
RDD operations fall into two classes of operators: Transformations and Actions. The essential distinction between them is whether the operator triggers job submission.
Transformation: merely records the dependency and transformation in the lineage; it does not trigger job submission.
Action: triggers job submission and returns the result.
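To see this laziness concretely, here is a minimal sketch (assuming a live SparkContext named sc and a hypothetical input file data.txt):

// Transformations are lazy: nothing runs on the cluster here.
val lines   = sc.textFile("data.txt")          // hypothetical input file
val lengths = lines.map(line => line.length)   // only recorded in the lineage

// count is an action: it triggers job submission and returns a result.
val total = lengths.count()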
Transformation:
map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numPartitions])
groupByKey([numPartitions])
reduceByKey(func, [numPartitions])
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
sortByKey([ascending], [numPartitions])
join(otherDataset, [numPartitions])
cogroup(otherDataset, [numPartitions])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
RDD Inheritance Hierarchy
map
Official API description
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
Source
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
mapPartitions
Official API description
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
Source
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
Transformation operators all behave alike: they create a new RDD and do not submit a compute job.
Examples
map
map: applies a function to each element of the dataset.
def map[U: ClassTag](f: T => U): RDD[U]
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)val b = a.map(x => (x.length, x))b.collect.foreach(println)// (3,dog)// (5,tiger)// (4,lion)// (3,cat)// (7,panther)// (5,eagle)
filter
filter: keeps only the elements that satisfy the predicate.
def filter(f: T => Boolean): RDD[T]
val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect.foreach(println)
// 2
// 4
// 6
// 8
// 10
flatMap
flatMap is like map, with an additional flattening step applied to the results.
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect.foreach(println)
// each element n expands to the sequence 1..n, printed one value per line:
// 1, 1, 2, 1, 2, 3, 1, 2, 3, 4, ..., 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
mapPartitions
mapPartitions: performs a map-style operation once per partition. Where map works on single elements, mapPartitions works on whole partitions; when the per-element work involves an expensive resource such as a database connection, use mapPartitions to pay that cost once per partition instead of once per element (see the sketch after the example below).
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
val a = sc.parallelize(1 to 9, 3)

// Pair each element with its successor within the same partition.
def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res = (pre, cur) :: res
    pre = cur
  }
  res.iterator
}

a.mapPartitions(myfunc).collect.foreach(println)
// (2,3)
// (1,2)
// (5,6)
// (4,5)
// (8,9)
// (7,8)
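As promised above, a sketch of the per-partition setup pattern. Connection and createConnection are hypothetical stand-ins for any expensive resource such as a real database client:

// Hypothetical stand-in for an expensive resource (e.g. a database client).
class Connection {
  def lookup(id: Int): String = "row-" + id
  def close(): Unit = ()
}
def createConnection(): Connection = new Connection

val ids = sc.parallelize(1 to 9, 3)
val rows = ids.mapPartitions { iter =>
  val conn = createConnection()                      // opened once per partition, not once per element
  val out  = iter.map(id => conn.lookup(id)).toList  // materialize before closing the connection
  conn.close()
  out.iterator
}
rows.collect().foreach(println)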
mapPartitionsWithIndex
mapPartitionsWithIndex: same as mapPartitions, except the supplied function takes two parameters, the first of which is the partition index.
def main(args: Array[String]): Unit = {
  //first()
  //second()
  third()
}

def first(): Unit = {
  val x = sc.parallelize(List(1, 2, 3, 4, 5, 7, 8, 9, 10), 3)
  def myfunc1(index: Int, iter: Iterator[Int]): Iterator[String] = {
    iter.map(x => index + ", " + x)
  }
  x.mapPartitionsWithIndex(myfunc1).collect().foreach(println)
  // 0, 1
  // 0, 2
  // 0, 3
  // 1, 4
  // 1, 5
  // 1, 7
  // 2, 8
  // 2, 9
  // 2, 10
}

def second(): Unit = {
  val randRDD = sc.parallelize(List((2, "cat"), (6, "mouse"), (7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)
  val rPartitioner = new RangePartitioner(3, randRDD)
  val partitioned = randRDD.partitionBy(rPartitioner)
  def myfunc2(index: Int, iter: Iterator[(Int, String)]): Iterator[String] = {
    iter.map(x => "[partID: " + index + ", val:" + x + "]")
  }
  partitioned.mapPartitionsWithIndex(myfunc2).collect().foreach(println)
  // [partID: 0, val:(2,cat)]
  // [partID: 0, val:(3,book)]
  // [partID: 0, val:(1,screen)]
  // [partID: 1, val:(4,tv)]
  // [partID: 1, val:(5,heater)]
  // [partID: 2, val:(6,mouse)]
  // [partID: 2, val:(7,cup)]
}

def third(): Unit = {
  val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
  def myfunc3(index: Int, iter: Iterator[Int]): Iterator[String] = {
    iter.map(x => "[partID:" + index + ", val:" + x + "]")
  }
  z.mapPartitionsWithIndex(myfunc3).collect().foreach(println)
  // [partID:0, val:1]
  // [partID:0, val:2]
  // [partID:0, val:3]
  // [partID:1, val:4]
  // [partID:1, val:5]
  // [partID:1, val:6]
}
sample
sample: draws a random sample of the elements of the original RDD into a new RDD.
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
def main(args: Array[String]): Unit = {
  first()
}

def first(): Unit = {
  val a = sc.parallelize(1 to 10000, 3)
  a.sample(false, 0.001, 444).collect().foreach(println)
  // 120
  // 424
  // 477
  // 2349
  // 2691
  // 2773
  // 2988
  // 5143
  // 6449
  // 6659
  // 9820
}
union, ++
union: merges two datasets (duplicate elements are not removed).
def ++(other: RDD[T]): RDD[T]
def union(other: RDD[T]): RDD[T]
val a = sc.parallelize(1 to 7, 1)
val b = sc.parallelize(5 to 10, 2)
a.union(b).collect().foreach(println)
a.++(b).collect().foreach(println)
// both calls print the same sequence, one value per line:
// 1 2 3 4 5 6 7 5 6 7 8 9 10
intersection
intersection: returns the intersection of two datasets (duplicate elements are removed).
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T]): RDD[T]
val x = sc.parallelize(1 to 20)
val y = sc.parallelize(5 to 25)
x.intersection(y).sortBy(x => x, true).collect().foreach(println)
// prints 5 through 20 in ascending order, one value per line
distinct
distinct: removes duplicate elements.
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
val x = sc.parallelize(1 to 10)
x.union(x).distinct().collect().foreach(println)
// 8
// 1
// 9
// 10
// 2
// 3
// 4
// 5
// 6
// 7
groupByKey
Both groupByKey and reduceByKey produce the correct result, but reduceByKey is much better suited to large datasets: Spark knows it can combine the values that share a key within each partition before any data is moved between partitions.
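A sketch of the difference using a word count (the counts come out identical, but the reduceByKey version shuffles pre-combined partial counts rather than every single pair):

val words = sc.parallelize(List("cat", "dog", "cat", "owl", "dog", "cat"), 2)
val pairs = words.map(w => (w, 1))

// groupByKey shuffles every (word, 1) pair, then sums on the reduce side.
pairs.groupByKey().map { case (w, ones) => (w, ones.sum) }.collect().foreach(println)

// reduceByKey combines within each partition before the shuffle.
pairs.reduceByKey(_ + _).collect().foreach(println)
// both print (cat,3), (dog,2), (owl,1) in some order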
reduceByKey
reduceByKey: analogous to the reduce phase of MapReduce.
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
val a = sc.parallelize(List("dog","cat","owl","gnu","ant",""))val animal = sc.parallelize(List("Lion","Deer","Leopard","Monkey","Elephant","Chimpanzees","Horse","Bear","Donkey","Kangaroo","Ox","Hedgehog","Sheep","Rhinoceros"))val b = a.union(animal).map(x => (x.length,x))b.reduceByKey((x,y)=> x+",\t"+y).collect().foreach(println)// (0,)// (8,Elephant, Kangaroo, Hedgehog)// (10,Rhinoceros)// (2,Ox)// (11,Chimpanzees)// (3,dog, cat, owl, gnu, ant)// (4,Lion, Deer, Bear)// (5,Horse, Sheep)// (6,Monkey, Donkey)// (7,Leopard)
aggregateByKey
aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U): aggregates locally within each partition first, then globally across partitions.
zeroValue: the initial value for the per-partition aggregation
seqOp: how values are combined within a partition
combOp: how per-partition results are combined globally
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
val pairRDD = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)))
println(pairRDD.partitions.length)

def func(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
  iter.map(x => "partID:" + index + ",val:" + x)
}
pairRDD.mapPartitionsWithIndex(func).collect().foreach(println)
// partID:0,val:(cat,2)
// partID:1,val:(cat,5)
// partID:1,val:(mouse,4)
// partID:2,val:(cat,12)
// partID:3,val:(dog,12)
// partID:3,val:(mouse,2)

pairRDD.aggregateByKey(0)(math.max(_, _), math.max(_, _)).collect().foreach(println)
// (dog,12)
// (mouse,4)
// (cat,12)

pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect().foreach(println)
// (dog,12)
// (mouse,6)
// (cat,19)

pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect().foreach(println)
// (dog,100)
// (mouse,200)
// (cat,300)

pairRDD.aggregateByKey(100)(_ + _, _ + _).collect().foreach(println)
// (dog,112)
// (mouse,206)
// (cat,319)
join
join: inner join on pairs that share the same key.
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
val a = sc.parallelize(List("dog","salmon","salmon","rat","elephant"))val b = a.keyBy(_.length)b.collect().foreach(println)val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"))val d = c.keyBy(_.length)d.collect().foreach(println)b.join(d).collect().foreach(println)
cogroup, groupWith
cogroup / groupWith: for up to three RDDs, gathers the values that share a key into one collection per source RDD.
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
val a = sc.parallelize(List(1, 2, 1, 3, 4, 5, 1, 2, 3, 1, 2, 3))
val b = a.map(x => (x, "b"))
b.collect().foreach(println)
val c = a.map((_, "c"))
val d = a.map(x => (x, "d"))
c.collect().foreach(println)
d.collect().foreach(println)
b.cogroup(c).collect().foreach(println)
b.groupWith(c).collect().foreach(println)
b.cogroup(c, d).collect().foreach(println)

val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")))
val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")))
x.cogroup(y).collect().foreach(println)
repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions: repartitions the RDD according to the given partitioner and, within each resulting partition, sorts the records by key. This is more efficient than repartitioning and then sorting separately, because the sorting can be pushed down into the shuffle machinery.
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
val randRDD = sc.parallelize(List((2,"cat"),(6,"mouse"),(7,"cup"),(3,"book"),(4,"tv"),(1,"screen"),(5,"heater")),3)val rPartitioner = new RangePartitioner(3,randRDD)val partitioned2 = randRDD.repartitionAndSortWithinPartitions(rPartitioner)partitioned2.mapPartitionsWithIndex(myfunc).collect().foreach(println)def myfunc2(index:Int,iter:Iterator[(Int,String)]):Iterator[String] = {iter.map(x => "partID:"+index+", val:"+x)}partitioned2.mapPartitionsWithIndex(myfunc2).collect().foreach(println)
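For contrast, a sketch of the less efficient two-step equivalent, reusing randRDD, rPartitioner, and myfunc2 from above; the manual sort must materialize each partition in memory, whereas repartitionAndSortWithinPartitions sorts as part of the shuffle itself:

// One shuffle via partitionBy, then a separate in-memory sort of every partition.
val partitionedThenSorted = randRDD
  .partitionBy(rPartitioner)
  .mapPartitions(iter => iter.toList.sortBy(_._1).iterator, preservesPartitioning = true)
partitionedThenSorted.mapPartitionsWithIndex(myfunc2).collect().foreach(println)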