RDD

RDD (Resilient Distributed Dataset) is an abstraction over distributed memory. An RDD offers a highly restricted shared-memory model: it is a read-only, partitioned collection of records that can only be created by applying deterministic transformations (such as map, join, and groupBy) to other RDDs; these restrictions are what keep the cost of fault tolerance low. To a developer, an RDD can simply be regarded as a Spark object that lives in memory: reading a file yields an RDD, a computation over that file yields another RDD, and the result set is an RDD as well. Partitions, the dependencies between datasets, and key-value map data can all be viewed as RDDs. (Note: adapted from Baidu Baike.)


RDD Operation Categories

RDD operations fall into two kinds of operators: Transformations and Actions. The essential difference between them is whether the operator triggers job submission.
Transformation: only records the dependency and transformation relationship in the lineage; it does not trigger job submission.
Action: triggers job submission and returns the result.
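
A minimal sketch of the difference, assuming a SparkContext named sc is already in scope: the two Transformations below only record lineage, and nothing runs on the cluster until the count Action is called.

    val nums    = sc.parallelize(1 to 100, 4)   // source RDD
    val doubled = nums.map(_ * 2)               // Transformation: recorded in the lineage, not executed
    val evens   = doubled.filter(_ % 4 == 0)    // Transformation: still nothing has been computed
    println(evens.count())                      // Action: a job is submitted and 50 is printed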


Transformation:

map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numPartitions])
groupByKey([numPartitions])
reduceByKey(func, [numPartitions])
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
sortByKey([ascending], [numPartitions])
join(otherDataset, [numPartitions])
cogroup(otherDataset, [numPartitions])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)


RDD Inheritance Hierarchy
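
Transformations such as map and flatMap do not compute anything by themselves; each simply wraps its parent in a new subclass of RDD, which is how the lineage gets recorded. A rough sketch of the hierarchy, abbreviated from the Spark source (the exact set of subclasses varies between versions):

    RDD[T]                      -- abstract base class: partitions, compute(), dependencies
     ├─ MapPartitionsRDD[U, T]  -- returned by map / flatMap / filter / mapPartitions
     ├─ CoGroupedRDD[K]         -- returned by cogroup
     └─ ShuffledRDD[K, V, C]    -- returned by shuffling operators such as reduceByKey and sortByKey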

map

Official API description

map(func) Return a new distributed dataset formed by passing each element of the source through a function func.

Source code

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

flatMap

Official API description

flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

Source code

  /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

flatMap follows the same pattern as map: every Transformation operator simply builds a new RDD and records the lineage; none of them submits a compute job.


Examples


map

map: applies a function to every element of the dataset.

def map[U: ClassTag](f: T => U): RDD[U]

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)val b = a.map(x => (x.length, x))b.collect.foreach(println)//    (3,dog)//    (5,tiger)//    (4,lion)//    (3,cat)//    (7,panther)//    (5,eagle)

filter

filter: keeps only the elements that satisfy the predicate.

def filter(f: T => Boolean): RDD[T]

    val a = sc.parallelize(1 to 10, 3)
    val b = a.filter(_ % 2 == 0)
    b.collect.foreach(println)
    // 2
    // 4
    // 6
    // 8
    // 10

flatMap

flatMap is much like map, with an additional flattening step.

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

    val a = sc.parallelize(1 to 10, 5)
    a.flatMap(1 to _).collect.foreach(println)
    // prints one number per line:
    // 1
    // 1
    // 2
    // 1
    // 2
    // 3
    // ... (continues up to 1 2 3 ... 10)

mapPartitions

mapPartitions: applies a map-like operation once per partition. Where map works on individual elements, mapPartitions works on a whole partition at a time, so when each element would otherwise pay a per-element setup cost (opening a database connection, for instance), mapPartitions can be used as an optimization; a sketch of that pattern follows the example below.

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

    val a = sc.parallelize(1 to 9, 3)

    def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
      var res = List[(T, T)]()
      var pre = iter.next
      while (iter.hasNext) {
        val cur = iter.next
        res = (pre, cur) :: res   // prepend the pair to the result list
        pre = cur
      }
      res.iterator
    }

    a.mapPartitions(myfunc).collect.foreach(println)
    // (2,3)
    // (1,2)
    // (5,6)
    // (4,5)
    // (8,9)
    // (7,8)
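
A minimal sketch of the per-partition setup pattern mentioned above, using java.text.DecimalFormat as a stand-in for an expensive resource such as a database connection; the point is only that the object is created once per partition rather than once per element.

    import java.text.DecimalFormat

    val nums = sc.parallelize(1 to 9, 3)
    val formatted = nums.mapPartitions { iter =>
      val fmt = new DecimalFormat("#,###.00")   // per-partition setup: created 3 times in total here
      iter.map(x => fmt.format(x * 1000))       // reused for every element in the partition
    }
    formatted.collect().foreach(println)
    // 1,000.00
    // 2,000.00
    // ...
    // 9,000.00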

mapPartitionsWithIndex

mapPartitionsWithIndex: does the same job as mapPartitions, but the supplied function takes two parameters, the first of which is the partition index.
  def main(args: Array[String]): Unit = {
    //first()
    //second()
    third()
  }

  def first(): Unit = {
    val x = sc.parallelize(List(1, 2, 3, 4, 5, 7, 8, 9, 10), 3)
    def myfunc1(index: Int, iter: Iterator[Int]): Iterator[String] = {
      iter.map(x => index + ", " + x)
    }
    x.mapPartitionsWithIndex(myfunc1).collect().foreach(println)
    // 0, 1
    // 0, 2
    // 0, 3
    // 1, 4
    // 1, 5
    // 1, 7
    // 2, 8
    // 2, 9
    // 2, 10
  }

  def second(): Unit = {
    val randRDD = sc.parallelize(List((2, "cat"), (6, "mouse"), (7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)
    val rPartitioner = new RangePartitioner(3, randRDD)
    val partitioned = randRDD.partitionBy(rPartitioner)
    def myfunc2(index: Int, iter: Iterator[(Int, String)]): Iterator[String] = {
      iter.map(x => "[partID: " + index + ", val:" + x + "]")
    }
    partitioned.mapPartitionsWithIndex(myfunc2).collect().foreach(println)
    // [partID: 0, val:(2,cat)]
    // [partID: 0, val:(3,book)]
    // [partID: 0, val:(1,screen)]
    // [partID: 1, val:(4,tv)]
    // [partID: 1, val:(5,heater)]
    // [partID: 2, val:(6,mouse)]
    // [partID: 2, val:(7,cup)]
  }

  def third(): Unit = {
    val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    def myfunc3(index: Int, iter: Iterator[Int]): Iterator[String] = {
      iter.map(x => "[partID:" + index + ", val:" + x + "]")
    }
    z.mapPartitionsWithIndex(myfunc3).collect().foreach(println)
    // [partID:0, val:1]
    // [partID:0, val:2]
    // [partID:0, val:3]
    // [partID:1, val:4]
    // [partID:1, val:5]
    // [partID:1, val:6]
  }

sample

sample: draws a random sample of the elements of the RDD to form a new RDD.

def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T]

  def main(args: Array[String]): Unit = {
    first()
  }

  def first(): Unit = {
    val a = sc.parallelize(1 to 10000, 3)
    a.sample(false, 0.001, 444).collect().foreach(println)
    // 120
    // 424
    // 477
    // 2349
    // 2691
    // 2773
    // 2988
    // 5143
    // 6449
    // 6659
    // 9820
  }

union, ++

union: concatenates two datasets (duplicate elements are not removed).

def ++(other: RDD[T]): RDD[T]
def union(other: RDD[T]): RDD[T]

    val a = sc.parallelize(1 to 7, 1)
    val b = sc.parallelize(5 to 10, 2)
    a.union(b).collect().foreach(println)
    a.++(b).collect().foreach(println)
    // 1
    // 2
    // 3
    // 4
    // 5
    // 6
    // 7
    // 5
    // 6
    // 7
    // 8
    // 9
    // 10

intersection

intersection: computes the intersection of the two datasets (duplicates are removed).

def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T]): RDD[T]

    val x = sc.parallelize(1 to 20)
    val y = sc.parallelize(5 to 25)
    x.intersection(y).sortBy(x => x, true).collect().foreach(println)
    // 5
    // 6
    // 7
    // 8
    // 9
    // 10
    // 11
    // 12
    // 13
    // 14
    // 15
    // 16
    // 17
    // 18
    // 19
    // 20

distinct

distinct: removes duplicate elements.

def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]

    val x = sc.parallelize(1 to 10)
    x.union(x).distinct().collect().foreach(println)
    // 8
    // 1
    // 9
    // 10
    // 2
    // 3
    // 4
    // 5
    // 6
    // 7

groupByKey

Although groupByKey and reduceByKey both produce the correct result, reduceByKey is the better choice on large datasets, because Spark can combine the outputs that share a key on each partition before any data is shuffled (see the sketch below).
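
A minimal word-count sketch of the difference (assuming sc is in scope): both lines produce the same counts, but reduceByKey pre-aggregates inside each partition before the shuffle, while groupByKey ships every individual (word, 1) pair across the network first.

    val words = sc.parallelize(List("cat", "dog", "cat", "owl", "dog", "cat"), 2)
    val pairs = words.map(w => (w, 1))

    pairs.reduceByKey(_ + _).collect().foreach(println)            // map-side combine, then shuffle
    pairs.groupByKey().mapValues(_.sum).collect().foreach(println) // shuffle everything, then sum
    // both print (cat,3), (dog,2), (owl,1) in some order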

reduceByKey

reduceByKey: comparable to the reduce phase of MapReduce.

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

    val a = sc.parallelize(List("dog","cat","owl","gnu","ant",""))val animal = sc.parallelize(List("Lion","Deer","Leopard","Monkey","Elephant","Chimpanzees","Horse","Bear","Donkey","Kangaroo","Ox","Hedgehog","Sheep","Rhinoceros"))val b = a.union(animal).map(x => (x.length,x))b.reduceByKey((x,y)=> x+",\t"+y).collect().foreach(println)//    (0,)//    (8,Elephant,   Kangaroo,   Hedgehog)//    (10,Rhinoceros)//    (2,Ox)//    (11,Chimpanzees)//    (3,dog,   cat,    owl,    gnu,    ant)//    (4,Lion,  Deer,   Bear)//    (5,Horse,    Sheep)//    (6,Monkey,  Donkey)//    (7,Leopard)

aggregateByKey

aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U): aggregates within each partition first, then combines the partial results globally.

zeroValue: the initial value used within each partition
seqOp: the aggregation rule applied within a partition
combOp: the rule used to combine the partial results across partitions

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

    val pairRDD = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)))
    println(pairRDD.partitions.length)

    def func(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
      iter.map(x => "partID:" + index + ",val:" + x)
    }

    pairRDD.mapPartitionsWithIndex(func).collect().foreach(println)
    // partID:0,val:(cat,2)
    // partID:1,val:(cat,5)
    // partID:1,val:(mouse,4)
    // partID:2,val:(cat,12)
    // partID:3,val:(dog,12)
    // partID:3,val:(mouse,2)

    pairRDD.aggregateByKey(0)(math.max(_, _), math.max(_, _)).collect().foreach(println)
    // (dog,12)
    // (mouse,4)
    // (cat,12)

    pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect().foreach(println)
    // (dog,12)
    // (mouse,6)
    // (cat,19)

    pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect().foreach(println)
    // (dog,100)
    // (mouse,200)
    // (cat,300)

    pairRDD.aggregateByKey(100)(_ + _, _ + _).collect().foreach(println)
    // (dog,112)
    // (mouse,206)
    // (cat,319)

join

join: joins two pair RDDs on equal keys.

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

    val a = sc.parallelize(List("dog","salmon","salmon","rat","elephant"))val b = a.keyBy(_.length)b.collect().foreach(println)val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"))val d = c.keyBy(_.length)d.collect().foreach(println)b.join(d).collect().foreach(println)

cogroup, groupWith

cogroup / groupWith: for up to three RDDs, groups the values that share the same key into one collection per RDD.

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

    val a = sc.parallelize(List(1, 2, 1, 3, 4, 5, 1, 2, 3, 1, 2, 3))
    val b = a.map(x => (x, "b"))
    b.collect().foreach(println)
    val c = a.map((_, "c"))
    val d = a.map(x => (x, "d"))
    c.collect().foreach(println)
    d.collect().foreach(println)
    b.cogroup(c).collect().foreach(println)
    b.groupWith(c).collect().foreach(println)
    b.cogroup(c, d).collect().foreach(println)
    val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")))
    val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")))
    x.cogroup(y).collect().foreach(println)

repartitionAndSortWithinPartitions

repartitionAndSortWithinPartitions: repartitions the RDD according to the given partitioner and, within each resulting partition, sorts the records by their keys.

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

    val randRDD = sc.parallelize(List((2, "cat"), (6, "mouse"), (7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)
    val rPartitioner = new RangePartitioner(3, randRDD)
    val partitioned2 = randRDD.repartitionAndSortWithinPartitions(rPartitioner)

    def myfunc2(index: Int, iter: Iterator[(Int, String)]): Iterator[String] = {
      iter.map(x => "partID:" + index + ", val:" + x)
    }

    partitioned2.mapPartitionsWithIndex(myfunc2).collect().foreach(println)


