文章目录

  • 常见的Action算子
  • 常见分区操作算子

常见的Action算子

  • countByKey算子:统计Key出现的次数,部分代码如下:
rdd_file = sc.textFile("../Data/input/words.txt")
rdd_map = rdd_file.flatMap(lambda line: line.split(" ")).map(lambda x:(x, 1))
rdd_count = rdd_map.countByKey()
print(rdd_count)
print(type(rdd_count))
# 返回结果为字典
# defaultdict(<class 'int'>, {'Apple': 4, 'Banana': 5, 'Orange': 4, 'Peach': 2})
# <class 'collections.defaultdict'>
  • collect算子:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。RDD是分布式对象,数据量可以很大,所以用这个算子之前需要知道如果数据集结果很大,就会把driver内存撑爆,出现oom。

  • reduce算子:对RDD数据集按照传入的逻辑进行聚合操作,部分代码如下:

rdd = sc.parallelize(range(1,10))
rdd_reduce = rdd.reduce(lambda a,b : a+b)
print(rdd_reduce)
# 45
  • fold算子:和reduce一样接收传入逻辑进行聚合,聚合是带有初始值的。这个初始值既要作用在分区内,也要作用在分区间,部分代码如下:
rdd = sc.parallelize(range(1,10),3)
rdd_reduce = rdd.fold(10,lambda a,b : a+b)
print(rdd_reduce)
# 1 分为[1,2,3] [4,5,6] [7,8,9]
# 2 每个分区+10
# 3 最后汇总再+10 得到结果85
  • first算子:取出RDD第一个元素
sc.parallelize([1,2,3,4]).first()
# 1
  • take算子:取出RDD的前N个元素
sc.parallelize([1,2,3,4],3).take(2)
# [1,2]
  • top算子:对RDD元素进行降序排序,取前N个
sc.parallelize([1,2,3,4],3).top(2)
# [4, 3]
  • count算子:计算RDD有多少条数据,返回值为一个数字
sc.parallelize([1,2,3,4],3).count()
# 4
  • takeSample算子:随机抽样RDD的数据,部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6,7,6,5,4,3,2,1],1)
rdd_takeSample1 = rdd.takeSample(True, 18)
print(rdd_takeSample1)
rdd_takeSample2 = rdd.takeSample(False, 18)
print(rdd_takeSample2)# [1, 1, 1, 4, 6, 4, 1, 1, 5, 4, 6, 7, 5, 1, 6, 6, 6, 2]
# [2, 4, 2, 5, 5, 6, 3, 7, 4, 1, 6, 3, 1]
# 参数一:bool型,True表示运行取同一个数据,False表示不允许取同一个数据,与数据内容无关,是否重复表示的是同一个位置的数据。
# 参数二:抽样的数目(设置为false则无法超越RDD总数)
# 参数三:随机种子(一般不需要传参)
  • takeOrdered算子:对RDD排序取前N个,部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6,7])
#升序
rdd_takeOrdered1 = rdd.takeOrdered(4)
#降序
rdd_takeOrdered2 = rdd.takeOrdered(4,lambda x : -x)print(rdd_takeOrdered1)
print(rdd_takeOrdered2)
# [1, 2, 3, 4]
# [7, 6, 5, 4]
  • foreach算子:对RDD的每个元素,执行逻辑操作与map类似,但是这个方法没有返回值。如果想显示值,只能在里面自行打印(无需经过Driver,直接在Executor打印效率更高)。
rdd = sc.parallelize([1,2,3,4,5,6,7],1)
rdd1 = rdd.foreach(lambda x : 2*x +1)
rdd2 = rdd.foreach(lambda x : print(2*x +1))
print(rdd1)
3
5
7
9
11
13
15
None
  • saveAsTextFile算子:保存文件API,分布式执行,不经过Driver,每个分区所在的Executor直接控制数据写出到目标文件系统中,每个分区产生1个结果文件。
#设置为三个分区
rdd_file = sc.textFile("hdfs://node1:8020/Test/WordCount.txt",3)
rdd_words = rdd_file.flatMap(lambda line: line.split(" "))
rdd_map = rdd_words.map(lambda x:(x, 1))
rdd_total = rdd_map.reduceByKey(lambda a,b: a + b)
rdd_rs = rdd_total.saveAsTextFile("hdfs://node1:8020/Test/word_rs1")

结果如下图所示在HDFS WebUI上查看

常见分区操作算子

  • mapPartitions算子:与map相似,只是一次被传递的是一整个分区的数据,虽然在执行次数上与map相同,但是可以因为减少了网络io的传输次数,效率会大大的提高。部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6],3)
def func(iter):rs = list()for it  in iter:rs.append(2 * it + 1)return rs
rdd_part = rdd.mapPartitions(func)
rdd_rs = rdd_part.collect()
print(rdd_rs)# [3, 5, 7, 9, 11, 13]
  • foreachPartition算子:与普通foreach一样,只是一次被传递的是一整个分区的数据,部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6],3)
# 因为没有返回值所以不需要return
def func(iter):rs = list()for it  in iter:rs.append(2 * it + 1)print(rs)rdd_part = rdd.foreachPartition(func)# [3, 5]
# [7, 9]
# [11, 13]
  • partitionBy算子:对RDD进行自定义分区操作,部分代码如下
# 参数1 重新分区后有几个分区
# 参数2 自定义分区规则,函数传入(返回编号为int类型,分区编号从0开始,不要超过分区数)
rdd = sc.parallelize([('a',1),('b',2),('c',3),('d',4),('e',5),('f',6)])def func(key):if key == 'a' or key == 'b' : return 0if key == 'c' or key == 'd' : return 1return 2rdd_part = rdd.partitionBy(3,func)
rdd_rs = rdd_part.glom().collect()
print(rdd_rs)# [[('a', 1), ('b', 2)], [('c', 3), ('d', 4)], [('e', 5), ('f', 6)]]
  • repartition算子:对RDD的分区执行重新分区。不建议使用此算子,除非做全局排序的时候,将其设置为1。如果修改尽量减少,不要增加,增加会导致shuffle。不管是增加还是减少都会影响并行计算(内存迭代并行的管道数量),部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6],3)
rdd_re1 = rdd.getNumPartitions()
print(rdd_re1)
rdd_re2 = rdd.repartition(1).getNumPartitions()
print(rdd_re2)
rdd_re3 = rdd.repartition(5).getNumPartitions()
print(rdd_re3)
# 3
# 1
# 5
  • coalesce算子:对分区数量进行增减,部分代码如下:
# 参数1:分区数
# 参数2:Bool True表示允许shuffle,False表示不允许(默认)。
rdd_re4 = rdd.coalesce(1).getNumPartitions()
print(rdd_re4)
rdd_re5 = rdd.coalesce(5).getNumPartitions()
print(rdd_re5)
rdd_re6 = rdd.coalesce(5,shuffle=True).getNumPartitions()
print(rdd_re6)
# 1
# 3 没有加shuffle=True这里有个API安全机制,分区不会增加
# 5
  • 在源码中我们可以发现reparation算子底层调用的就是coalesce算子,只不过shuffle定义为true。源码如下:
def repartition(self, numPartitions):return self.coalesce(numPartitions, shuffle=True)

Spark RDD编程模型及算子介绍(二)相关推荐

  1. spark应用程序转换_Spark—RDD编程常用转换算子代码实例

    Spark-RDD编程常用转换算子代码实例 Spark rdd 常用 Transformation 实例: 1.def map[U: ClassTag](f: T => U): RDD[U]  ...

  2. RDD编程模型笔记(一)

    1.RDD编程模型 在 Spark 中,RDD 被表示为对象,通过对象上的方法调用来对 RDD 进行转换. 在Spark中,只有遇到action,才会执行 RDD 的计算(即延迟计算),这样在运行时可 ...

  3. spark RDD编程 第1关:数据去重

    Spark RDD编程初级实践(一) 任务描述 本关任务:编写Spark独立应用程序实现数据去重. 相关知识 为了完成本关任务,你需要掌握:RDD的创建:RDD的转换操作:RDD的行动操作. RDD的 ...

  4. 大数据培训课程RDD编程模型

    编程模型 在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换.经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可 ...

  5. spark RDD编程

    RDD,也就是 弹性分布式数据集 的简称,它是spark处理的分布式元素集合. 对于RDD的操作包括: 创建RDD(从外部数据或者内存中的数据),转化RDD(利用筛选条件等),调用RDD操作求值. * ...

  6. Spark RDD编程API

    1. RDD的算子分类 Transformation(转换):根据数据集创建一个新的数据集,计算后返回一个新RDD:例如:一个rdd进行map操作后生了一个新的rdd. Action(动作):对rdd ...

  7. Spark 编程模型(上)

    Spark的编程模型 核心概念(注意对比MR里的概念来学习) Spark Application的组成 Spark Application基本概念 Spark Application编程模型 回顾sc ...

  8. [Kafka与Spark集成系列三] Spark编程模型

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  9. 从Storm到Flink:大数据处理的开源系统及编程模型(文末福利)

    本文节选自CCF大数据教材系列丛书之<大数据处理>,本书由华中科技大学金海教授主编,包括大数据处理基础技术.大数据处理编程与典型应用处理.大数据处理系统与优化三个方面.本教材以大数据处理编 ...

最新文章

  1. 使用命名管道的OVERLAPPED方式实现非阻塞模式编程 .
  2. 手机扫一扫就能“隔空移物”?AR炫酷新玩法,快来解锁新技能吧!
  3. Caffe官方教程翻译(1):LeNet MNIST Tutorial
  4. linux系统管理试卷必修B卷,2013-2014Linux系统管理试卷
  5. 6个班的计算机课平均分给,计算机六道题
  6. 读《纸本书变电子书是很小的事》有感
  7. Myrrix——基于Mahout的开源推荐系统
  8. [理解ASP.NET Core框架]一个五十行的控制台Web
  9. win10安装打印机驱动的方法,电脑打印机驱动安装教程
  10. Win10 1903:美化鼠标指针
  11. matlab三轴机械臂,MATLAB 中的机械臂算法——路径规划
  12. (二)VISIO 中间带箭头的弧线怎么画
  13. Go在迅雷P2P连通系统中的性能优化实践-朱文
  14. 【行业云说直播间】-智慧能源 低碳未来正式上线
  15. veracrypt取消加密卷_VeraCrypt如何创建文件型加密卷?保护文件资料安全
  16. 外汇天眼:外汇储备减少对新兴市场货币构成风险
  17. Android入门之APP启动流程
  18. 佛法摄持下的技术人生
  19. 插入数据库insert into 出错 unrecognized token:
  20. 开启猪年大运——风口上的猪

热门文章

  1. bootcamp空间不足_mac安装双系统 提示磁盘上没有足够的可用空间的解决方法
  2. WPS word文档插入图片显示不全
  3. 攻克弹唱第七课(如何弹奏neon)
  4. 总结在mac电脑上安装brew
  5. pdfbox pdf转图片,图片长宽大小与pdf保持一致
  6. tensorflow中squeeze与expand_dims
  7. MathWorks官方MATLAB/Simulink基础入门视频教程 笔记(matlab基础)
  8. AI芯片产业生态及竞争格局:英伟达、谷歌、BAT实力拆解对比
  9. 用python画盒图_[519]matplotlib(四)|Python中用matplotlib绘制盒状图(Boxplots)和小提琴图(Violinplots)...
  10. 【代码超详解】LightOJ 1197 Help Hanzo(区间质数筛法)