【极简spark教程】RDD编程
目录
入门
RDD编程指引
创建rdd集合,可以将rdd看做是spark分布式环境下的list
读取文件
RDD操作
转换transform:生成了新的RDD
行动action:汇总所有结果返回驱动程序
缓存
打印部分记录
共享变量
累加器
创建累加器
构造累加器
留意惰性(spark2.4.0中疑似取消了,因为以下代码在spark2.4.0中测试返回了正常结果)
入门
val textFile = sc.textFile("/test.csv")//textFile为RDD类型,具有List的很多相似操作,可以进行循环遍历,例如map,foreach,filter等
- map操作:对rdd中每行进行处理
- flatmap操作:对rdd中每行进行展开处理
- collect操作:将结果转换为Array类型
- cache操作:将rdd和dataset保存在内存,被session持有
RDD编程指引
创建rdd集合,可以将rdd看做是spark分布式环境下的list
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data, 5)// distData类型为ParallelCollectionRDD,且分片数为5
读取文件
- 若读取本地文件,本地文件需要在所有节点上可以被访问到
- 所有读取文件的方法都支持在目录上、通配符、压缩包上运行
sc.textFile("/my/directory") sc.textFile("/my/directory/*.txt") sc.textFile("/my/directory/*.gz")
- 控制返回文件数量,通常情况下返回文件为一个文件夹下的多个文件,可以使用SparkContext.wholeTextFiles控制返回文件的个数,例如返回一个文件
SparkContext.sequenceFile[Int, String] SparkContext.hadoopRDD SparkContext.objectFile
RDD操作
转换transform:生成了新的RDD
- map:返回一个新的分布式数据集,该数据集是通过将源的每个元素传递给函数func形成的
- mapValues:返回一个新的分布式数据集,同map相似,mapValues在(K,V)对的数据集上调用,仅对V进行操作
- filter:返回一个新的数据集,该数据集是通过选择源中func返回true的那些元素形成的
- flatmap:与map相似,但是每个输入项都可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项)
- mapPartitions:与map相似,但是分别在RDD的每个分区(块)上运行,因此func在类型T的RDD上运行时必须为Iterator <T> => Iterator <U>类型
- mapPartitionsWithIndex:与mapPartitions相似,但它还为func提供表示分区索引的整数值,因此当在类型T的RDD上运行时,func必须为(Int,Iterator <T>)=> Iterator <U>类型
- sample:使用给定的随机数发生器的种子进行抽样,共三个参数,WithReplacement为true表示有抽样放回,原数据集大小不变,为false表示无放回抽样,原数据集在抽样后减少百分比,fraction表示抽样比例,seed表示随机数种子,Long型整数,例如12345L
- union:返回一个新的数据集,其中包含源数据集中的元素的并集
- intersection:返回一个新的RDD,其中包含源数据集中的元素的交集
- distinct:返回一个新的数据集,其中包含源数据集的不同元素
- groupByKey:
在(K,V)对的数据集上调用时,返回(K,Iterable <V>)对的数据集。
注意:如果要分组以便对每个键执行聚合(例如求和或平均值),则使用reduceByKey或aggregateByKey将产生更好的性能。
注意:默认情况下,输出中的并行度取决于父RDD的分区数。您可以传递一个可选numPartitions参数来设置不同数量的任务。
- reduceByKey:在(K,V)对的数据集上调用时,返回(K,V)对的数据集,其中每个键的值使用给定的reduce函数func(其类型必须为(V,V)=>)进行汇总V.与in一样groupByKey,reduce任务的数量可以通过可选的第二个参数配置
- aggregateByKey:在(K,V)对的数据集上调用时,返回(K,U)对的数据集,其中每个键的值使用给定的Combine函数和中性的“零”值进行汇总。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。像in中一样groupByKey,reduce任务的数量可以通过可选的第二个参数配置
- sortByKey:在由K实现Ordered的(K,V)对的数据集上调用时,返回(K,V)对的数据集,按布尔值指定,按键以升序或降序排序ascending
- join:在(K,V)和(K,W)类型的数据集上调用时,返回(K,(V,W))对的数据集,其中每个键都有所有成对的元素。外连接通过支持leftOuterJoin,rightOuterJoin和fullOuterJoin。注意:join之前最好确认rdd中元素的类型,防止出现Any类型,导致报错:but class RDD is invariant in type T.You may wish to define T as +T instead.
- cogroup:在(K,V)和(K,W)类型的数据集上调用时,返回(K,(Iterable <V>,Iterable <W>))元组的数据集。此操作也称为groupWith
- cartesian:笛卡尔积,在类型T和U的数据集上调用时,返回(T,U)对(所有元素对)的数据集
- pipe:通过shell命令(例如Perl或bash脚本)通过管道传输RDD的每个分区。将RDD元素写入进程的stdin,并将输出到其stdout的行作为字符串的RDD返回
- coalesce:将RDD中的分区数减少到numPartitions。筛选大型数据集后,对于更有效地运行操作很有用
- repartition:随机重排RDD中的数据以创建更多或更少的分区,并在整个分区之间保持平衡。这始终会拖曳网络上的所有数据
- repartition(1):重排RDD中的数据,合并为一个分区
- repartition(col("colName")):重排RDD中的数据,根据指定列的记录进行分区
- repartitionAndSortWithinPartitions:根据给定的分区程序对RDD重新分区,并在每个结果分区中,按其键对记录进行排序。这比repartition在每个分区内调用然后排序更为有效,因为它可以将排序推入洗牌机制
行动action:汇总所有结果返回驱动程序
- reduce:使用函数func(该函数接受两个参数并返回一个)来聚合数据集的元素。该函数应该是可交换的和关联的,以便可以并行正确地计算它
- collect:在驱动程序中将数据集的所有元素作为数组返回。这通常在返回足够小的数据子集的过滤器或其他操作之后很有用
- count:返回数据集中的元素数
- first:返回数据集的第一个元素(类似于take(1))
- take:返回数据集的前n个元素的数组
- takeSample:返回一个数组,该数组包含数据集num个元素的随机样本(是否替换),可以选择预先指定随机数生成器种子
- takeOrdered:使用自然顺序或自定义比较器返回RDD 的前n个元素
- saveAsTextFile:将数据集的元素以文本文件(或文本文件集)的形式写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定目录中。Spark将在每个元素上调用toString,以将其转换为文件中的一行文本
- saveAsSequenceFile:在本地文件系统,HDFS或任何其他Hadoop支持的文件系统的给定路径中,将数据集的元素作为Hadoop SequenceFile写入。这在实现Hadoop的Writable接口的键/值对的RDD上可用。在Scala中,它也可用于隐式转换为Writable的类型(Spark包括对基本类型(如Int,Double,String等)的转换
- saveAsObjectFile:使用Java序列化以简单的格式编写数据集的元素,然后可以使用进行加载 SparkContext.objectFile()
- countByKey:仅在类型(K,V)的RDD上可用。返回(K,Int)对的哈希图以及每个键的计数
- foreach:
在数据集的每个元素上运行函数func。通常这样做是出于副作用,例如更新累加器或与外部存储系统进行交互。
注意:在之外修改除累加器以外的变量foreach()可能会导致不确定的行为。有关更多详细信息,请参见了解闭包。
缓存
- persist:可以根据参数进行不同级别的缓存MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER、DISK_ONLY
- cache:默认缓存级别MEMORY_ONLY
- 缓存级别选择:MEMORY_ONLY>MEMORY_ONLY_SER>MEMORY_AND_DISK
- unpersist:释放缓存
打印部分记录
- collect:将全部记录汇总到一台机器上,可能会耗尽内存
- take:获取部分记录
共享变量
- 广播变量:在所有节点上创建一个只读变量,在使用时不应该调用函数中的指定变量值,而是直接使用指定广播变量,而且防止修改节点上的广播变量,
dataFrame和变量都可以使用broadcast进行广播,但是rdd不可以
val broadcastVar = sc.broadcast(Array(1, 2, 3))val broadcastDF = functions.broadcast(df)
- 广播变量:在所有节点上创建一个只读变量,在使用时不应该调用函数中的指定变量值,而是直接使用指定广播变量,而且防止修改节点上的广播变量,
累加器
创建累加器
val accum = sc.longAccumulator("My Accumulator")sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))println(accum.value)
构造累加器
//继承AccumulatorV2class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] { private val myVector: MyVector = MyVector.createZeroVector def reset(): Unit = {myVector.reset()}def add(v: MyVector): Unit = {myVector.add(v)} }// 创建累加器对象val myVectorAcc = new VectorAccumulatorV2 //在spark上下文中进行注册 sc.register(myVectorAcc, "MyVectorAcc1")
留意惰性(spark2.4.0中疑似取消了,因为以下代码在spark2.4.0中测试返回了正常结果)
val accum = sc.longAccumulator data.map { x => accum.add(x); x } // 这里累加器仍然为0,因为没有行动action操作触发执行map操作.
【极简spark教程】RDD编程相关推荐
- 【极简spark教程】DataFrame常用操作
目录 创建DataFrame List,toDF:使用List[Tuple]包装每行记录,结合toDF接口,,转化为DataFrame DataFrameRDD,StructType:推荐使用RDD和 ...
- 【极简spark教程】spark聚合函数
聚合函数分为两类,一种是spark内置的常用聚合函数,一种是用户自定义聚合函数 UDAF 不带类型的UDAF[较常用] 继承UserDefinedAggregateFunction 定义输入数据的sc ...
- 你好Python -- 极简Python教程
你好Python -- 极简Python教程 本教程针对Python业余爱好者,展示入门级的编程知识. 目录 你好Python -- 极简Python教程 一. 你好Python! 二. 我想对Pyt ...
- Nginx 极简入门教程
Nginx 极简入门教程 基本介绍 Nginx 是一个高性能的 HTTP 和反向代理 web 服务器,同时也提供了 IMAP/POP3/SMTP服务. Nginx 是由伊戈尔·赛索耶夫为俄罗斯访问量第 ...
- Python极简入门教程
前言 为了方便各位小白能轻松入门Python,同时加深自己对Python的理解,所以创造了"Python极简入门教程",希望能帮到大家,若有错误请多指正,谢谢.极简入门教程代表着不 ...
- tensorflow平台极简方式_TensorFlow极简入门教程
原标题:TensorFlow极简入门教程 随着 TensorFlow 在研究及产品中的应用日益广泛,很多开发者及研究者都希望能深入学习这一深度学习框架.本文介绍了TensorFlow 基础,包括静态计 ...
- pyecharts极简入门教程
作者:luanhz 来源:小数志 导读 数据可视化是整个数据分析流程中的关键环节,甚至有着一图定成败的关键性地位.前期,陆续推出了matplotlib和seaborn详细入门教程,对于常规的数据探索和 ...
- Spark:RDD编程总结(概述、算子、分区、共享变量)
目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...
- pyecharts x轴字体大小调整_pyecharts极简入门教程
导读 数据可视化是整个数据分析流程中的关键环节,甚至有着一图定成败的关键性地位.前期,陆续推出了matplotlib和seaborn详细入门教程,对于常规的数据探索和基本图表制作是足够的,但二者的一个 ...
最新文章
- makefile学习(转载)
- php 尾递归,关于尾递归的使用详解
- 操作系统:Windows映射网络文件夹的方法介绍
- 问题 J: Sequence Problem (II) : Array Practice
- 美团遭遇反垄断调查;微信利用社交垄断封杀西瓜视频;Qt 6 for Python发布|极客头条...
- 帝国cms+php7.0+mysql_帝国cms切换php7.x登录后台空白解决方法
- 一个简单的空间配置器
- MyCat 主键ID自增长配置
- mysql 面试知识点笔记(三)联合索引的最左匹配原则
- 基于python的爬虫毕业论文_基于python网络爬虫及数据处理毕业论文 相关实例(示例源码)下载 - 好例子网...
- android 如何播放音频,android如何播放和录制音频
- 一文理解全文搜索引擎(Lucene、Elasticsearch、Solr)、目录搜索引擎、元搜索引擎的异同
- 转载:Fiddler 教程
- 一个案例教会你:全面的数据分析应该怎么做?
- XCode 报错Thread 2:signal SIGABRT
- [R语言] WGCNA入门教程
- 重写equals方法一定要重写hashcode方法吗
- tpc-e mysql_mysql评测工具TPC-C使用
- 第二届Stata中国用户大会暨“计量经济方法及应用研讨会”会议通知
- ProGuard技术详解