Spark RDD常用算子-Transformation(Python版)
一、RDD的创建
1.1. 通过并行化集合创建 (本地对象转分布式RDD)
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
rdd = sc.parallelize(data, numSlices=3)
print(rdd.getNumPartitions()) #打印分区数
print(rdd.glom().collect())
numSlices表示设置分区数,不设置时会根据CPU核来确定
1.2 读取外部数据源 (读取文件)
path = ' ' # hdfs://ip:8020/
rdd = sc.textFile(path, minPartitions)
外部数据路径可以是本地文件数据也可以是分布式文件数据(HDFS,S3等)
textFile中的minPartitions设置的为最小分区数,若设置太大,此参数将无效。
1.3. 读取一堆小文件
rdd = sc.wholeTextFiles(path, minPartitions)
这个API偏向于少量分区读取数据。因为这个API表明了自己是小文件读取专用,若文件的数据很小分区很多,会导致shuffle的几率更高.所以尽量少分区读取数据。
二、 Transformation算子
使用Transformation算子的RDD返回值仍然是RDD。这类算子的lazy 懒加载的,没有Action算子,Transformation算子是不工作的。
2.1. map
将RDD中得数据一条条地进行处理,并返回新的RDD。
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])def map_func(data): return data * 10print(dataRDD.map(map_func).collect())print(dataRDD.map(lambda x: x * 10).collect()) # 匿名lambda写法
RESULT: [10, 20, 30, 40, 50, 60, 70, 80, 90]
2.2. flatMap
对RDD执行map操作,然后进行解除嵌套操作。
dataRDD = sc.parallelize(["Hadoop Spark Scala", "Java Hadoop"])flatMapRDD = dataRDD.flatMap(lambda x: x.split(' '))print(flatMapRDD.collect())
RESULT: [‘Hadoop’, ‘Spark’, ‘Scala’, ‘Java’, ‘Hadoop’]
2.3. filter
过滤想要的数据
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])result = dataRDD.filter(lambda x: x % 2 == 0).collect()print(result)
RESULT: [2, 4, 6, 8]
2.4. distinct
对RDD数据进行去重处理从返回新的RDD, 此算子一般不需要传参。
dataRDD = sc.parallelize([1, 2, 1, 1, 1, 3, 4, 2, 3, 5, 5, 5, 5, 6, 7, 8, 9])result = dataRDD.distinct().collect()print(result)
RESULT: [1, 2, 3, 4, 5, 6, 7, 8, 9]
2.5. union
将两个RDD合并成一个RDD
dataRDD1 = sc.parallelize([1, 2, 1, 1, 1, 3, 4, 2, 3])dataRDD2 = sc.parallelize([5, 5, 5, 5, 6, 7, 8, 9])dataRDD = dataRDD1.union(dataRDD2).collect()print(dataRDD)
RESULT: [1, 2, 1, 1, 1, 3, 4, 2, 3, 5, 5, 5, 5, 6, 7, 8, 9]
2.6. intersection
求两个RDD的交集
dataRDD1 = sc.parallelize([1, 2, 1, 1, 1, 3, 4, 2, 3])dataRDD2 = sc.parallelize([5, 5, 4, 1, 6, 7, 3, 9])dataRDD = dataRDD1.intersection(dataRDD2).collect()print(dataRDD)
RESULT: [1, 3, 4]
2.7. join
对两个RDD执行jion操作(内外链接),jion算子只能用于kv型RDD
jion 内连接
leftOuterJoin 左外连接
rightOuterJoin 右外连接
dataRDD1 = sc.parallelize([(1, 'hadoop'), (2, 'spark'), (3, 'flink')])
dataRDD2 = sc.parallelize([(1, 'java'), (2, 'python')])# 内连接
dataJionRDD = dataRDD1.join(dataRDD2).collect()
# [(1, ('hadoop', 'java')), (2, ('spark', 'python'))]# 左外连接
dataLeftJionRDD = dataRDD1.leftOuterJoin(dataRDD2).collect()
# [(1, ('hadoop', 'java')), (2, ('spark', 'python')), (3, ('flink', None))]
2.8. glom
按照分区将RDD数据加上嵌套
dataRDD = sc.parallelize([1, 2, 1, 1, 1, 3, 4, 2, 3, 5, 5, 5, 5, 6, 7, 8, 9], 3)
result = dataRDD.glom().collect()# [[1, 2, 1, 1, 1], [3, 4, 2, 3, 5], [5, 5, 5, 6, 7, 8, 9]]
2.9. reduceByKey
针对K-V型的RDD,先根据key分组,然后根据提供的聚合逻辑对value进行聚合操作。
dataRDD = sc.parallelize(["Hadoop Spark Scala", "Java Hadoop"])flatMapRDD = dataRDD.flatMap(lambda x: x.split(' '))kvRDD = flatMapRDD.map(lambda x: (x, 1))result = kvRDD.reduceByKey(lambda a, b: a + b).collect()print(result)
RESULT: [(‘Java’, 1), (‘Hadoop’, 2), (‘Spark’, 1), (‘Scala’, 1)]
2.10. groupBy
将RDD数据进行分组
dataRDD = sc.parallelize([(1, 'hadoop'), (1, 'spark'), (2, 'java'), (2,
'scala'), (2, 'python'), (3, 'SQL')])# 根据key进行分组
groupByKeyRDD = dataRDD.groupBy(lambda x: x[0])result = groupByKeyRDD.map(lambda x: (x[0], list(x[1]))).collect()print(result)
RESULT: [(1, [(1, ‘hadoop’), (1, ‘spark’)]), (2, [(2, ‘java’), (2, ‘scala’), (2, ‘python’)]), (3, [(3, ‘SQL’)])]
# 根据value进行分组
kv2vkRDD = dataRDD.map(lambda x: (x[1], x[0]))groupByValueRDD = kv2vkRDD.groupBy(lambda x: x[1])result = groupByValueRDD.map(lambda x: (x[0], list(x[1]))).collect()print(result)
RESULT: [(1, [(‘hadoop’, 1), (‘spark’, 1)]), (2, [(‘java’, 2), (‘scala’, 2), (‘python’, 2)]), (3, [(‘SQL’, 3)])]
2.11. groupByKey
针对K-V型RDD,自动按照key进行分组
dataRDD = sc.parallelize([(1, 'hadoop'), (1, 'spark'), (2, 'java'), (2,
'scala'), (2, 'python'), (3, 'SQL')])
groupByKeyRDD = dataRDD.groupByKey()
result = groupByKeyRDD.map(lambda x: (x[0], list(x[1]))).collect()# [(1, ['hadoop', 'spark']), (2, ['java', 'scala', 'python']), (3, ['SQL'])]
2.12. sortBy
按照指定的排序依据对RDD数据进行排序
dataRDD = sc.parallelize([(3, 'SQL'), (1, 'hadoop'), (2, 'java'), (1,
'spark'), (2, 'scala'), (2, 'python')])
sortByKeyRDD = dataRDD.sortBy(lambda x: x[0], ascending=True)
result = sortByKeyRDD.collect()# [(1, 'hadoop'), (1, 'spark'), (2, 'java'), (2, 'scala'), (2, 'python'), (3, 'SQL')]
参数ascending为True表示升序,False表示降序
2.13. sortByKey
按照key对RDD数据进行排序
dataRDD = sc.parallelize([(3, 'SQL'), (1, 'hadoop'), (2, 'java'), (1,
'spark'), (2, 'scala'), (2, 'python')])
sortByKeyRDD = dataRDD.sortByKey(ascending=True)
result = sortByKeyRDD.collect()# [(1, 'hadoop'), (1, 'spark'), (2, 'java'), (2, 'scala'), (2, 'python'), (3, 'SQL')]
Spark RDD常用算子-Transformation(Python版)相关推荐
- Spark RDD常用算子使用总结
文章目录 概述 Transformation(转换算子) 1. map 2. flatMap 3. filter 4. mapPartitions 5. mapPartitionsWithIndex ...
- 大数据——Spark RDD常用算子总结
Spark的核心是建立在同一的抽象弹性分布式数据集(Resilient Distributed Datasets,RDD)之上的,这使得Spark的各个组件可以无缝的进行集成,能够在同一个应用程序中完 ...
- 【Spark】Spark的常用算子
Spark的常用算子 目录内容 Spark的常用算子 一.转换算子(Transformation) 二.行动算子(Action) 三.键值对算子(PairRDDFunctions) 四.文件系统算子( ...
- 2021年大数据Spark(十五):Spark Core的RDD常用算子
目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 聚合函数算子 Scala集合中的聚合函数 ...
- Spark一路火花带闪电——Pair RDD常用算子(参数及其返回值)探究
文章目录 转化算子 行动算子 转化算子 以键值对集合{(1,2),(3,4),(3,6)}为例 RDD[U,T]注意下面的函数,对应U和T reduceByKey(f:(T,T) => T):R ...
- Spark RDD distinct 算子
一.引言 使用spark很久第一次用到 distinct 算子,趁热打铁熟悉一下 distinct 的操作. 二.源码 distinct 算子会返回一个新的 RDD,这里的每一个元素都是唯一的不会有重 ...
- spark mongo java_Spark Mongodb集成 - Python版
Spark是最近比较火的数据处理平台,相对于Hadoop的Mapreduce(MR),既具有运行速度的优势,又打破的Hadoop MR交互性差的局限,同时,其机器学习组件MLlib可以极大的满足数据处 ...
- Spark RDD 复杂算子
aggregateByKey 是Transformation reduceByKey 是Transformation filter 是Transformation flatMap 是Transform ...
- python编写spark程序 视频_【Python版pyspark】Spark大数据基础入门视频课程
[注意:本课程只包含pyspark系列课程的基础入门部分] Python版本的pyspark是学习Python的人的福音,为广大的Python开发人员提供了一个使用Python调用Spark接口处理大 ...
- paddle动态图自定义算子(python版)
1.动态图自定义Python算子 先说一下辅助用的,这个接口: Paddle 通过 PyLayer 接口和PyLayerContext接口支持动态图的Python端自定义OP. 自定义好的算子如何使用 ...
最新文章
- 浙大开源 | VID-Fusion: 用于精确外力估计的鲁棒视觉惯性动力学里程计
- 天然气阶梯是按年还是按月_按年算账 多退少补 你的年度个税应该是怎么算?...
- c语言编写两个矩阵的乘积,如何用c语言编写两个模糊矩阵相乘的程序?
- stm32烧录软件_使用华为LiteOS Studio开发STM32物联网工程1
- Python回归 岭回归(Ridge Regression)
- mysql中同一天入职怎么表示_ORACLE入职考试题及答案
- android导航使用教程,android BottomNavigationView的简单使用教程
- Python编程实现对拉格朗日和KKT条件求极值
- 如何使用计算机小学生课件,小学生计算机课件
- PLUTO SDR入门系列之九:强大的开源软件“gnu radio”
- android电视 怎么调电视机的信号源,电视怎么调信号源
- DVB机顶盒工作原理
- 〖Python 数据库开发实战 - MySQL篇⑪〗- 修改数据表结构
- 计算机桌面ie图标无法删除,桌面ie图标删除不了的解决方法
- java--删除TMP文件
- 没项目实战经验?分享自学练手的软件测试项目实战+数据库+接口,部署超级简单
- S3C2440下clock的源码分析
- 嵌入式Linux是学驱动还是应用,嵌入式LINUX应该学什么?做驱动还是做应用
- 匿怨而友其人,左丘明耻之,丘亦耻之。我不是圣人,我假装什么都没发生。
- 简单来说,我们常听到的 22nm、14nm、10nm 究竟是什么意思?