一、RDD的创建

1.1. 通过并行化集合创建 (本地对象转分布式RDD)

data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
rdd = sc.parallelize(data, numSlices=3)
print(rdd.getNumPartitions())   #打印分区数
print(rdd.glom().collect())

numSlices表示设置分区数,不设置时会根据CPU核来确定

1.2 读取外部数据源 (读取文件)

path = '  '      # hdfs://ip:8020/
rdd = sc.textFile(path, minPartitions)

外部数据路径可以是本地文件数据也可以是分布式文件数据(HDFS,S3等)
textFile中的minPartitions设置的为最小分区数,若设置太大,此参数将无效。

1.3. 读取一堆小文件

rdd = sc.wholeTextFiles(path, minPartitions)

这个API偏向于少量分区读取数据。因为这个API表明了自己是小文件读取专用,若文件的数据很小分区很多,会导致shuffle的几率更高.所以尽量少分区读取数据。

二、 Transformation算子

使用Transformation算子的RDD返回值仍然是RDD。这类算子的lazy 懒加载的,没有Action算子,Transformation算子是不工作的。
2.1. map
将RDD中得数据一条条地进行处理,并返回新的RDD。

dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])def map_func(data):    return data * 10print(dataRDD.map(map_func).collect())print(dataRDD.map(lambda x: x * 10).collect())  # 匿名lambda写法

RESULT: [10, 20, 30, 40, 50, 60, 70, 80, 90]


2.2. flatMap
对RDD执行map操作,然后进行解除嵌套操作。

dataRDD = sc.parallelize(["Hadoop Spark Scala", "Java Hadoop"])flatMapRDD = dataRDD.flatMap(lambda x: x.split(' '))print(flatMapRDD.collect())

RESULT: [‘Hadoop’, ‘Spark’, ‘Scala’, ‘Java’, ‘Hadoop’]


2.3. filter
过滤想要的数据

dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])result = dataRDD.filter(lambda x: x % 2 == 0).collect()print(result)

RESULT: [2, 4, 6, 8]


2.4. distinct
对RDD数据进行去重处理从返回新的RDD, 此算子一般不需要传参。

dataRDD = sc.parallelize([1, 2, 1, 1, 1, 3, 4, 2, 3, 5, 5, 5, 5, 6, 7, 8, 9])result = dataRDD.distinct().collect()print(result)

RESULT: [1, 2, 3, 4, 5, 6, 7, 8, 9]


2.5. union
将两个RDD合并成一个RDD

dataRDD1 = sc.parallelize([1, 2, 1, 1, 1, 3, 4, 2, 3])dataRDD2 = sc.parallelize([5, 5, 5, 5, 6, 7, 8, 9])dataRDD = dataRDD1.union(dataRDD2).collect()print(dataRDD)

RESULT: [1, 2, 1, 1, 1, 3, 4, 2, 3, 5, 5, 5, 5, 6, 7, 8, 9]


2.6. intersection
求两个RDD的交集

dataRDD1 = sc.parallelize([1, 2, 1, 1, 1, 3, 4, 2, 3])dataRDD2 = sc.parallelize([5, 5, 4, 1, 6, 7, 3, 9])dataRDD = dataRDD1.intersection(dataRDD2).collect()print(dataRDD)

RESULT: [1, 3, 4]


2.7. join
对两个RDD执行jion操作(内外链接),jion算子只能用于kv型RDD
jion 内连接
leftOuterJoin 左外连接
rightOuterJoin 右外连接

dataRDD1 = sc.parallelize([(1, 'hadoop'), (2, 'spark'), (3, 'flink')])
dataRDD2 = sc.parallelize([(1, 'java'), (2, 'python')])# 内连接
dataJionRDD = dataRDD1.join(dataRDD2).collect()
# [(1, ('hadoop', 'java')), (2, ('spark', 'python'))]# 左外连接
dataLeftJionRDD = dataRDD1.leftOuterJoin(dataRDD2).collect()
# [(1, ('hadoop', 'java')), (2, ('spark', 'python')), (3, ('flink', None))]

2.8. glom
按照分区将RDD数据加上嵌套

dataRDD = sc.parallelize([1, 2, 1, 1, 1, 3, 4, 2, 3, 5, 5, 5, 5, 6, 7, 8, 9], 3)
result = dataRDD.glom().collect()# [[1, 2, 1, 1, 1], [3, 4, 2, 3, 5], [5, 5, 5, 6, 7, 8, 9]]

2.9. reduceByKey
针对K-V型的RDD,先根据key分组,然后根据提供的聚合逻辑对value进行聚合操作。

dataRDD = sc.parallelize(["Hadoop Spark Scala", "Java Hadoop"])flatMapRDD = dataRDD.flatMap(lambda x: x.split(' '))kvRDD = flatMapRDD.map(lambda x: (x, 1))result = kvRDD.reduceByKey(lambda a, b: a + b).collect()print(result)

RESULT: [(‘Java’, 1), (‘Hadoop’, 2), (‘Spark’, 1), (‘Scala’, 1)]


2.10. groupBy
将RDD数据进行分组

dataRDD = sc.parallelize([(1, 'hadoop'), (1, 'spark'), (2, 'java'), (2,
'scala'), (2, 'python'), (3, 'SQL')])# 根据key进行分组
groupByKeyRDD = dataRDD.groupBy(lambda x: x[0])result = groupByKeyRDD.map(lambda x: (x[0], list(x[1]))).collect()print(result)

RESULT: [(1, [(1, ‘hadoop’), (1, ‘spark’)]), (2, [(2, ‘java’), (2, ‘scala’), (2, ‘python’)]), (3, [(3, ‘SQL’)])]

# 根据value进行分组
kv2vkRDD = dataRDD.map(lambda x: (x[1], x[0]))groupByValueRDD = kv2vkRDD.groupBy(lambda x: x[1])result = groupByValueRDD.map(lambda x: (x[0], list(x[1]))).collect()print(result)

RESULT: [(1, [(‘hadoop’, 1), (‘spark’, 1)]), (2, [(‘java’, 2), (‘scala’, 2), (‘python’, 2)]), (3, [(‘SQL’, 3)])]


2.11. groupByKey
针对K-V型RDD,自动按照key进行分组

dataRDD = sc.parallelize([(1, 'hadoop'), (1, 'spark'), (2, 'java'), (2,
'scala'), (2, 'python'), (3, 'SQL')])
groupByKeyRDD = dataRDD.groupByKey()
result = groupByKeyRDD.map(lambda x: (x[0], list(x[1]))).collect()# [(1, ['hadoop', 'spark']), (2, ['java', 'scala', 'python']), (3, ['SQL'])]

2.12. sortBy
按照指定的排序依据对RDD数据进行排序

dataRDD = sc.parallelize([(3, 'SQL'), (1, 'hadoop'), (2, 'java'), (1,
'spark'), (2, 'scala'), (2, 'python')])
sortByKeyRDD = dataRDD.sortBy(lambda x: x[0], ascending=True)
result = sortByKeyRDD.collect()# [(1, 'hadoop'), (1, 'spark'), (2, 'java'), (2, 'scala'), (2, 'python'), (3, 'SQL')]

参数ascending为True表示升序,False表示降序


2.13. sortByKey
按照key对RDD数据进行排序

dataRDD = sc.parallelize([(3, 'SQL'), (1, 'hadoop'), (2, 'java'), (1,
'spark'), (2, 'scala'), (2, 'python')])
sortByKeyRDD = dataRDD.sortByKey(ascending=True)
result = sortByKeyRDD.collect()# [(1, 'hadoop'), (1, 'spark'), (2, 'java'), (2, 'scala'), (2, 'python'), (3, 'SQL')]

Spark RDD常用算子-Transformation(Python版)相关推荐

  1. Spark RDD常用算子使用总结

    文章目录 概述 Transformation(转换算子) 1. map 2. flatMap 3. filter 4. mapPartitions 5. mapPartitionsWithIndex ...

  2. 大数据——Spark RDD常用算子总结

    Spark的核心是建立在同一的抽象弹性分布式数据集(Resilient Distributed Datasets,RDD)之上的,这使得Spark的各个组件可以无缝的进行集成,能够在同一个应用程序中完 ...

  3. 【Spark】Spark的常用算子

    Spark的常用算子 目录内容 Spark的常用算子 一.转换算子(Transformation) 二.行动算子(Action) 三.键值对算子(PairRDDFunctions) 四.文件系统算子( ...

  4. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  5. Spark一路火花带闪电——Pair RDD常用算子(参数及其返回值)探究

    文章目录 转化算子 行动算子 转化算子 以键值对集合{(1,2),(3,4),(3,6)}为例 RDD[U,T]注意下面的函数,对应U和T reduceByKey(f:(T,T) => T):R ...

  6. Spark RDD distinct 算子

    一.引言 使用spark很久第一次用到 distinct 算子,趁热打铁熟悉一下 distinct 的操作. 二.源码 distinct 算子会返回一个新的 RDD,这里的每一个元素都是唯一的不会有重 ...

  7. spark mongo java_Spark Mongodb集成 - Python版

    Spark是最近比较火的数据处理平台,相对于Hadoop的Mapreduce(MR),既具有运行速度的优势,又打破的Hadoop MR交互性差的局限,同时,其机器学习组件MLlib可以极大的满足数据处 ...

  8. Spark RDD 复杂算子

    aggregateByKey 是Transformation reduceByKey 是Transformation filter 是Transformation flatMap 是Transform ...

  9. python编写spark程序 视频_【Python版pyspark】Spark大数据基础入门视频课程

    [注意:本课程只包含pyspark系列课程的基础入门部分] Python版本的pyspark是学习Python的人的福音,为广大的Python开发人员提供了一个使用Python调用Spark接口处理大 ...

  10. paddle动态图自定义算子(python版)

    1.动态图自定义Python算子 先说一下辅助用的,这个接口: Paddle 通过 PyLayer 接口和PyLayerContext接口支持动态图的Python端自定义OP. 自定义好的算子如何使用 ...

最新文章

  1. 浙大开源 | VID-Fusion: 用于精确外力估计的鲁棒视觉惯性动力学里程计
  2. 天然气阶梯是按年还是按月_按年算账 多退少补 你的年度个税应该是怎么算?...
  3. c语言编写两个矩阵的乘积,如何用c语言编写两个模糊矩阵相乘的程序?
  4. stm32烧录软件_使用华为LiteOS Studio开发STM32物联网工程1
  5. Python回归 岭回归(Ridge Regression)
  6. mysql中同一天入职怎么表示_ORACLE入职考试题及答案
  7. android导航使用教程,android BottomNavigationView的简单使用教程
  8. Python编程实现对拉格朗日和KKT条件求极值
  9. 如何使用计算机小学生课件,小学生计算机课件
  10. PLUTO SDR入门系列之九:强大的开源软件“gnu radio”
  11. android电视 怎么调电视机的信号源,电视怎么调信号源
  12. DVB机顶盒工作原理
  13. 〖Python 数据库开发实战 - MySQL篇⑪〗- 修改数据表结构
  14. 计算机桌面ie图标无法删除,桌面ie图标删除不了的解决方法
  15. java--删除TMP文件
  16. 没项目实战经验?分享自学练手的软件测试项目实战+数据库+接口,部署超级简单
  17. S3C2440下clock的源码分析
  18. 嵌入式Linux是学驱动还是应用,嵌入式LINUX应该学什么?做驱动还是做应用
  19. 匿怨而友其人,左丘明耻之,丘亦耻之。我不是圣人,我假装什么都没发生。
  20. 简单来说,我们常听到的 22nm、14nm、10nm 究竟是什么意思?

热门文章

  1. 车载以太网交换机功能和应用案例汇总, 适用于AVB/TSN, 802.1AS(gPTP时钟同步)
  2. xamp环境搭建Pikachu实验环境搭建
  3. 【Excel】一、Excel入门指导
  4. 请问Bat文件是用什么什么语言写的?
  5. 蒋志平从零到亿的创业路
  6. 小程序悬浮按钮可拖动自动靠边
  7. 数字人民币支付新选择 没有网络时也能使用
  8. 人民的名义关系可视化展示
  9. 爆款养成思路,教你利用砍价做出刷屏活动!
  10. 作为程序员,应该更关注代码质量还是只需要完成功能就好了?