1. RDD Transformations

Transformation 描述
map(func) 对原RDD中的元素进行一一映射,返回新的RDD
filter(func) 对原RDD过滤,当 func 为 true 时则保留该元素,返回新的RDD
flatMap(func) 扁平化(降维)的 map,每个参数可映射到多个输出项,返回新的RDD
mapPartitions(func) 对每个分区的数据单独进行 map,func 必须是 Iterator<T> => Iterator<U> 类型
mapPartitionsWithIndex(func) 类似于 mapPartitions,但提供了一个表示分区数的整数值
sample(withReplace, fraction, seed) 使用给定的随机生成器的种子对数据进行采样
union(otherDataset) 将两个泛型相同的RDD合并(求并集),返回新的RDD
instersection(otherDataset) 将两个泛型相同的RDD相交(求交集),返回新的RDD
distinct([numPartitions]) 将原RDD去重,返回新的RDD
groupByKey([numPartitions]) 对 (K, V) 进行调用,返回 (K, Iterator) 泛型的新RDD
reduceByKey(func, [numPartitions]) 对 (K, V) 进行调用,将相同键的值进行合并,返回 (K, V) 泛型的新RDD
aggregateByKey(zeroValue)(SeqOP, combOp, [numPartitions]) 对 (K, V) 进行调用,返回 (K, U) 泛型的RDD,其中每个键的值使用给定的组合函数和中性“零”值聚合
sortByKey([ascending], [numPartitions]) 实现对 (K, V) 按照 K 进行排序
join(otherDataset, [numPartitions]) 对泛型为 (K, V ) 和 (K, W) 的两个数据集,返回 (K, (V, W))
cogroup(otherDataset, [numPartitions]) 对泛型为 (K, V ) 和 (K, W) 的两个数据集,返回 (K, (Iterator, Iterable))
cartesian(otherDataSet) 当调用泛型为 T 和 U 的数据集时,返回一个 (T, U) 对(所有元素对)的数据集。
pipe(command, [envVars]) 通过 shell 命令来管理管道 RDD 的每个分区
coalesce(numPartitions) 将 RDD 中的分区减少到指定值
repartition(numPartitions) 重新设置分区数,通过网络shuffle打乱数据重新分区
repartitionAndSortWithinPartitions(partitioner) 根据指定的分区数器对RDD重新分区,并在每个结果分区内,按键对记录进行排序

1.1 Map-Partition

键值对泛型的 RDD 可以通过 partitionBy 指定分区器。

Spark 默认实现了两种分区器:HashPartitionerRangePartitioner,也可以自定义分区器。

  • HashPartitioner:根据 key 的 hashCode 返回值对分区数取模

    • 优势:可以将相同 key 的元素分到同一分区,方便 byKey 的操作
    • 劣势:如果某些相同 key 的元素较多,容易造成数据倾斜
  • RangePartitioner:使用抽样方法,随机抽样并轮询分发数据到不容的分区
    • 优势:发分区后每个RDD中的元素数量相差无几
    • 劣势:会将数据打乱,如需 byBey 操作会重新进行 shuffle

自定义分区器:

  • 使用匿名类的方式自定义分区器
rdd.map(x => (x,x))
.partitionBy(new Partitioner {// 设置分区数量override def numPartitions: Int = 2// 根据 key 计算出分区编号override def getPartition(key: Any): Int = key.asInstanceOf[Int] % 2
})
  • 单独构造类的方式自定义分区器

    • 继承 Partitioner 抽象类
    • 重写其中的 numPartitionsgetPartition 分发
rdd.map(x => (x, x))
.partitionBy(new DefinePartition(2))class DefinePartition(num: Int) extends Partitioner {override def numPartitions: Int = numoverride def getPartition(key: Any): Int = key.asInstanceOf[Int] % 2
}

使用 mapPartitionsWithIndex 方法,还能获取分区编号

1.2 ByKey

在上面可以看到很多 ByKey 的算子,这些算子都适配于 (K, V) 泛型的 RDD,此类算子底层都是基于 combineByKeyWithClassTag 实现。

combineByKeyWithClassTag 中的 WithClassTag 相当于一种泛型检测机制,在该算子之上有一个简单的继承,名为combineByKey

def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null): RDD[(K, C)] = self.withScope {combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,partitioner, mapSideCombine, serializer)(null)
}

combineByKey 还是在调用 combineByKeyWithClassTag,为了方便,看此方法就可以了!

Spark RDD-Operations相关推荐

  1. Spark学习之Spark RDD算子

    个人主页zicesun.com 这里,从源码的角度总结一下Spark RDD算子的用法. 单值型Transformation算子 map /*** Return a new RDD by applyi ...

  2. Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)

    1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...

  3. spark RDD官网RDD编程指南

    http://spark.apache.org/docs/latest/rdd-programming-guide.html#using-the-shell Overview(概述) 在较高的层次上, ...

  4. spark Rdd 操作transformaction和action等

    为什么80%的码农都做不了架构师?>>>    transformation是惰性的,只有action操作的时候,才会真正执行.spark有很多api,RDD的api只是spark的 ...

  5. value toDF is not a member of org.apache.spark.rdd.RDD[People]

    编译如下代码时,出现value toDF is not a member of org.apache.Spark.rdd.RDD[People]  错误 val rdd : RDD[People]= ...

  6. Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...

    1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...

  7. spark学习13(spark RDD)

    RDD及其特点 1)RDD(Resillient Distributed Dataset)弹性分布式数据集,是spark提供的核心抽象.它代表一个不可变.可分区.里面的元素可并行计算的集合 2)RDD ...

  8. Spark RDD API:Map和Reduce

    参考文章: http://blog.csdn.net/jewes/article/details/39896301 http://homepage.cs.latrobe.edu.au/zhe/Zhen ...

  9. 第14课:Spark RDD解密

    以下为Spark RDD解密课程学习心得: 在介绍Spark RDD之前,先简单的说下Hadoop MapReduce,它是基于数据流的方式进行计算,从物理存储上加载数据,然后操作数据, 最后写入到物 ...

  10. 学习笔记Spark(三)—— Spark架构及原理(spark架构、spark RDD)

    一.Spark架构 1.1.基本组件 Cluster Manager 在standalone模式中即为Master主节点,控制整个集群,监控worker.在YARN模式中为资源管理器. Worker ...

最新文章

  1. java 4d_GitHub - wm3445/Java-concurrency at 4d10ae51a9deec37340fc40d03f205cfbe8de43b
  2. SQL-语句实现九九乘法表
  3. OC高效率52之理解OC错误模型
  4. 【解决】make: 警告:检测到时钟错误。您的创建可能是不完整的
  5. 趋势科技云安全软件_阿里达摩院发布2020十大科技趋势!量子计算、类脑计算系统崛起...
  6. android开源2016_2016年开源领域的7大法律发展
  7. 聚宽macd底背离_老股民技巧一招鲜:MACD顶、底背离图解及近期实战应用,字字斗金...
  8. iOS底层探索之类的结构—cache分析(上)
  9. 匿名管道 与 命名管道
  10. PCB设计敷铜时的天线效应
  11. 新款大屏卡罗拉linux系统,丰田卡罗拉大屏车载导航影音系统
  12. 湖南师范大学2018年大学生程序设计竞赛新生赛 F-小名的回答
  13. php的persion是,php创建Persion类,反射过程,反射后使用流程详解
  14. java爬虫工具xpath提取_爬虫 xpath (数据提取)
  15. Matlab中filter.m和filtfilt.m函数C语言实现
  16. [推荐收藏]MAC地址完美攻略(教你如何防止IP被盗用及绑定IP)
  17. java ico_Java图片处理:ico格式转 PNG/JPG等格式
  18. HDFS的读写流程步骤(附图文解析)
  19. 一些电脑使用的技巧和软件分享(电脑初学者必备)
  20. 【React Native】深入理解Native与RN通信原理

热门文章

  1. gim nerdtree 使用
  2. 计算机等级考试考什么
  3. Maven 中央仓库
  4. VUE+Element实现草稿箱
  5. 网络模拟EVE-NG配置教程PPT
  6. IO性能测试工具使用
  7. 树莓派安装python3.6_python3.6安装pycrypto
  8. python驱动级模拟按键_python如何实现驱动级的模拟按键?
  9. 【YOLOV5-6.x讲解】数据增强方式介绍+代码实现
  10. 2018 Multi-University Training Contest 8 1010 Taotao Picks Apples【二分】