Spark API 详解(转)
spark中,slice=partition,一个slice对应一个task,启动task的数量上限取决于集群中核的数量
sc.parallelize(0 until numMappers, numMappers)中的numMappers就是slice的数量[1]
下面的图来自[3]
在spark调优中,增大RDD分区数目,可以增大任务并行度
map(function)
map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。
任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
scala> val a = sc.parallelize(1 to 9, 3)//这里的3表示的是把数据分成几份
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24scala> val b = a.map(x => x*2)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:25scala> a.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> b.collect
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
当然map也可以把Key变成Key-Value对
scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24scala> val b = a.map(x => (x, 1))
b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:25scala> b.collect.foreach(println(_))
(dog,1)
(tiger,1)
(lion,1)
(cat,1)
(panther,1)
( eagle,1)
mapPartitions(function)
map()的输入函数是应用于RDD中每个元素,
而mapPartitions()的输入函数是应用于每个Partition(一个RDD可以对应于多个Partition)
关于这个具体案例可以参见[4]
mapValues(function)
原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD
scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> val b = a.map(x => (x.length, x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[1] at map at <console>:25scala> b.mapValues("x" + _ + "x").collect
res0: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (6,x eaglex))
mapWith和flatMapWith
感觉用得不多,参考http://blog.csdn.net/jewes/article/details/39896301
flatMap(function)
与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素
scala> val a = sc.parallelize(1 to 4, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24scala> val b = a.flatMap(x => 1 to x)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at flatMap at <console>:25scala> b.collect
res1: Array[Int] = Array(1,1, 2,1, 2, 3, 1, 2, 3, 4)
注意,上面的flatMap中的字符串,其实是一个函数
flatMapValues(function)
scala> val a = sc.parallelize(List((1,2),(3,4),(5,6)))
a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> val b = a.flatMapValues(x=>1 to x)
b: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[8] at flatMapValues at <console>:25scala> b.collect.foreach(println(_))
(1,1)
(1,2)
(3,1)
(3,2)
(3,3)
(3,4)
(5,1)
(5,2)
(5,3)
(5,4)
(5,5)
(5,6)
参考链接:
[1]https://blog.csdn.net/robbyo/article/details/50623339
[2]https://blog.csdn.net/guotong1988/article/details/50555185
[3]https://blog.csdn.net/weixin_38750084/article/details/83021819
[4]https://blog.csdn.net/appleyuchi/article/details/88371867
Spark API 详解(转)相关推荐
- Spark API 详解/大白话解释 之 map、mapPartitions、mapValues、mapWith、flatMap、flatMapWith、flatMapValues
重点看mapPartitions 本文转自http://blog.csdn.net/guotong1988/article/details/50555185,所有权力归原作者所有. map(funct ...
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战025--DateSet实用API详解025
一.Flink DateSet定制API详解(JAVA版) -002 flatMap 以element为粒度,对element进行1:n的转化. 执行程序: package code.book.bat ...
- 百度PaddleOCR及云平台OCR API详解及示例
百度PaddleOCR及云平台OCR API详解及示例 目录 百度PaddleOCR及云平台OCR API详解及示例 使用百度开源的PaddleOCR 多个开源代码库比较
- Java 8 Stream API详解--转
原文地址:http://blog.csdn.net/chszs/article/details/47038607 Java 8 Stream API详解 一.Stream API介绍 Java 8引入 ...
- 【小白学PyTorch】扩展之Tensorflow2.0 | 21 Keras的API详解(下)池化、Normalization
<<小白学PyTorch>> 扩展之Tensorflow2.0 | 21 Keras的API详解(上)卷积.激活.初始化.正则 扩展之Tensorflow2.0 | 20 TF ...
- Android复习14【高级编程:推荐网址、抠图片上的某一角下来、Bitmap引起的OOM问题、三个绘图工具类详解、画线条、Canvas API详解(平移、旋转、缩放、倾斜)、矩阵详解】
目 录 推荐网址 抠图片上的某一角下来 8.2.2 Bitmap引起的OOM问题 8.3.1 三个绘图工具类详解 画线条 8.3.16 Canvas API详解(Part 1) 1.transla ...
- EXT核心API详解(二)-Array/Date/Function/Number/String
EXT核心API详解(二)-Array/Date/Function/Number/String Array类 indexOf( Object o ) Number object是否在数组中,找不到返 ...
- ServletFileUpload API详解
ServletFileUpload1.ServletFileUpload upload=new ServletFileUpload(factory);创建一个上传工具,指定使用缓存区与临时文件存储位置 ...
- DiskFileItemFactory API详解
核心API介绍1.DiskFileItemFactory作用:可以设置缓存大小以及临时文件保存位置. 默认缓存大小是 10240(10k).临时文件默认存储在系统的临时文件目录下.(可以在环境变量中查 ...
最新文章
- http://blog.csdn.net/lovejavaydj/article/details/6
- 非库存采购的自动记帐
- 科大星云诗社动态20210329
- 靠 GitHub 打赏谋生的程序员,他们是怎么做的?
- animateWithDuration
- 设计模式--抽象工厂(个人笔记)
- 伪静态在webconfig中配置
- geotools将shp数据存入postgres
- 速读原著-TCP/IP(互联网与实现)
- 如何把桌面计算机和回收站隐藏,对!回收站图标没了,教你怎么隐藏回收站保护个人隐私...
- 服务器无法取消指令方块显示,我的世界服务器如何关掉命令方块的提示(如图)...
- rsyslog-trouble: imjournal: 26794 messages lost due to rate-limiting
- 关于HDR的学习笔记
- 罗永浩宣布独家签约抖音 称希望成抖音一哥
- 【新书推荐】【2018】有源集成天线的设计与应用
- matlab频谱分析详解
- 零基础编程教学实录-001 二进制与计算机
- 人工智能大会爆火的“数字员工”究竟是什么?
- 企业如何实现精细化人员管理?五大业务场景值得关注
- 算法详解——后缀自动机