spark中,slice=partition,一个slice对应一个task,启动task的数量上限取决于集群中核的数量

sc.parallelize(0 until numMappers, numMappers)中的numMappers就是slice的数量[1]

下面的图来自[3]

在spark调优中,增大RDD分区数目,可以增大任务并行度

map(function) 
map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。

任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

scala> val a = sc.parallelize(1 to 9, 3)//这里的3表示的是把数据分成几份
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24scala> val b = a.map(x => x*2)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:25scala> a.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> b.collect
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

当然map也可以把Key变成Key-Value对

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24scala> val b = a.map(x => (x, 1))
b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:25scala> b.collect.foreach(println(_))
(dog,1)
(tiger,1)
(lion,1)
(cat,1)
(panther,1)
( eagle,1)

mapPartitions(function) 
map()的输入函数是应用于RDD中每个元素,

而mapPartitions()的输入函数是应用于每个Partition(一个RDD可以对应于多个Partition)

关于这个具体案例可以参见[4]

mapValues(function) 
原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> val b = a.map(x => (x.length, x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[1] at map at <console>:25scala> b.mapValues("x" + _ + "x").collect
res0: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (6,x eaglex))

mapWith和flatMapWith 
感觉用得不多,参考http://blog.csdn.net/jewes/article/details/39896301

flatMap(function) 
与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素

scala> val a = sc.parallelize(1 to 4, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24scala> val b = a.flatMap(x => 1 to x)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at flatMap at <console>:25scala> b.collect
res1: Array[Int] = Array(1,1, 2,1, 2, 3, 1, 2, 3, 4)

注意,上面的flatMap中的字符串,其实是一个函数

flatMapValues(function)

scala> val a = sc.parallelize(List((1,2),(3,4),(5,6)))
a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> val b = a.flatMapValues(x=>1 to x)
b: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[8] at flatMapValues at <console>:25scala> b.collect.foreach(println(_))
(1,1)
(1,2)
(3,1)
(3,2)
(3,3)
(3,4)
(5,1)
(5,2)
(5,3)
(5,4)
(5,5)
(5,6)

参考链接:

[1]https://blog.csdn.net/robbyo/article/details/50623339

[2]https://blog.csdn.net/guotong1988/article/details/50555185

[3]https://blog.csdn.net/weixin_38750084/article/details/83021819

[4]https://blog.csdn.net/appleyuchi/article/details/88371867

Spark API 详解(转)相关推荐

  1. Spark API 详解/大白话解释 之 map、mapPartitions、mapValues、mapWith、flatMap、flatMapWith、flatMapValues

    重点看mapPartitions 本文转自http://blog.csdn.net/guotong1988/article/details/50555185,所有权力归原作者所有. map(funct ...

  2. 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战025--DateSet实用API详解025

    一.Flink DateSet定制API详解(JAVA版) -002 flatMap 以element为粒度,对element进行1:n的转化. 执行程序: package code.book.bat ...

  3. 百度PaddleOCR及云平台OCR API详解及示例

    百度PaddleOCR及云平台OCR API详解及示例 目录 百度PaddleOCR及云平台OCR API详解及示例 使用百度开源的PaddleOCR 多个开源代码库比较

  4. Java 8 Stream API详解--转

    原文地址:http://blog.csdn.net/chszs/article/details/47038607 Java 8 Stream API详解 一.Stream API介绍 Java 8引入 ...

  5. 【小白学PyTorch】扩展之Tensorflow2.0 | 21 Keras的API详解(下)池化、Normalization

    <<小白学PyTorch>> 扩展之Tensorflow2.0 | 21 Keras的API详解(上)卷积.激活.初始化.正则 扩展之Tensorflow2.0 | 20 TF ...

  6. Android复习14【高级编程:推荐网址、抠图片上的某一角下来、Bitmap引起的OOM问题、三个绘图工具类详解、画线条、Canvas API详解(平移、旋转、缩放、倾斜)、矩阵详解】

    目   录 推荐网址 抠图片上的某一角下来 8.2.2 Bitmap引起的OOM问题 8.3.1 三个绘图工具类详解 画线条 8.3.16 Canvas API详解(Part 1) 1.transla ...

  7. EXT核心API详解(二)-Array/Date/Function/Number/String

    EXT核心API详解(二)-Array/Date/Function/Number/String Array类 indexOf( Object o )  Number object是否在数组中,找不到返 ...

  8. ServletFileUpload API详解

    ServletFileUpload1.ServletFileUpload upload=new ServletFileUpload(factory);创建一个上传工具,指定使用缓存区与临时文件存储位置 ...

  9. DiskFileItemFactory API详解

    核心API介绍1.DiskFileItemFactory作用:可以设置缓存大小以及临时文件保存位置. 默认缓存大小是 10240(10k).临时文件默认存储在系统的临时文件目录下.(可以在环境变量中查 ...

最新文章

  1. http://blog.csdn.net/lovejavaydj/article/details/6
  2. 非库存采购的自动记帐
  3. 科大星云诗社动态20210329
  4. 靠 GitHub 打赏谋生的程序员,他们是怎么做的?
  5. animateWithDuration
  6. 设计模式--抽象工厂(个人笔记)
  7. 伪静态在webconfig中配置
  8. geotools将shp数据存入postgres
  9. 速读原著-TCP/IP(互联网与实现)
  10. 如何把桌面计算机和回收站隐藏,对!回收站图标没了,教你怎么隐藏回收站保护个人隐私...
  11. 服务器无法取消指令方块显示,我的世界服务器如何关掉命令方块的提示(如图)...
  12. rsyslog-trouble: imjournal: 26794 messages lost due to rate-limiting
  13. 关于HDR的学习笔记
  14. 罗永浩宣布独家签约抖音 称希望成抖音一哥
  15. 【新书推荐】【2018】有源集成天线的设计与应用
  16. matlab频谱分析详解
  17. 零基础编程教学实录-001 二进制与计算机
  18. 人工智能大会爆火的“数字员工”究竟是什么?
  19. 企业如何实现精细化人员管理?五大业务场景值得关注
  20. 算法详解——后缀自动机

热门文章

  1. JS快速获取图片宽高的方法
  2. 为了方便大家下载四级资料,开通FTP
  3. (八)React原理
  4. lda主题词评论python_Python之酒店评论主题提取LDA主题模型
  5. 实现一个MVVM和promise
  6. 判断数组中是否包含某个元素,判断对象中是否包含某个属性,判断字符串中是否包含某个字符串片段
  7. sql中的where 1=1有什么用?
  8. Angular使用中的编码tips(持续更)
  9. python版本及ML库
  10. iWiscloud智慧家居控制中心