Spark的RDD行动算子
目录
- 基本概念
- 算子介绍
- 1. reduce
- 2. collect
- 3. count
- 4. first
- 5. take
- 6. takeOrdered
- 案例实操1-6
- 7. aggregate
- 8. fold
- 案例实操7-8
- 9. countByKey
- 案例实操
- 10. save相关算子
- 案例实操
- 11. foreach
- 案例实操
基本概念
行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发触发作业(Job)的执行。其底层代码调用的就是runJob的方法,底层会创建ActiveJob,并提交执行。
算子介绍
1. reduce
函数定义
def reduce(f: (T, T) => T): T
说明
聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
2. collect
函数定义
def collect(): Array[T]
说明
在驱动程序中,以数组Array 的形式返回数据集的所有元素 。
3. count
函数定义
def count(): Long
说明
返回RDD 中元素的个数。
4. first
函数定义
def first(): T
说明
返回RDD 中的第一个元素 。
5. take
函数定义
def take(num: Int): Array[T]
说明
返回一个由RDD 的前 n 个元素组成的数组。
6. takeOrdered
函数定义
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
说明
返回该RDD 排序后的前 n 个元素组成的数组。
案例实操1-6
package com.atguigu.bigdata.spark.core.rdd.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark02_RDD_Operator_Action {def main(args: Array[String]): Unit = {//TODO 准备环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4))//TODO - 行动算子//reduce
// val i = rdd.reduce(_ + _)
//
// println(i)//collect:方法会讲不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
// val ints: Array[Int] = rdd.collect()
//
// println(ints.mkString(","))//统计数据源中的数据的个数val cnt = rdd.count()println(cnt)//first:获取数据源中第一个数据val first = rdd.first()println(first)//take:获取N个数据val ints: Array[Int] = rdd.take(3)println(ints.mkString(","))val rdd1 = sc.makeRDD(List(4,2,3,1))//takeOrdered:排序后,获取N个数据val ints1: Array[Int] = rdd1.takeOrdered(3)println(ints1.mkString(","))sc.stop()}
}
7. aggregate
函数定义
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。
8. fold
函数定义
def fold(zeroValue: T)(op: (T, T) => T): T
说明
折叠操作,aggregate 的简化版操作。
案例实操7-8
package com.atguigu.bigdata.spark.core.rdd.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark03_RDD_Operator_Action {def main(args: Array[String]): Unit = {//TODO 准备环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4),2)//TODO - 行动算子//aggregateByKey:初始值只会参与分区内的计算//aggregate:初始值会参与分区内的计算,并且参与分区间的计算
// val result: Int = rdd.aggregate(10)(_ + _, _ + _)val result: Int = rdd.fold(10)(_ + _)println(result)sc.stop()}
}
9. countByKey
函数定义
def countByKey(): Map[K, Long]
说明
统计每种key 的个数。
package com.atguigu.bigdata.spark.core.rdd.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark04_RDD_Operator_Action {def main(args: Array[String]): Unit = {//TODO 准备环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,1,3,4),2)val rdd1 = sc.makeRDD(List(("a",1),("a",2),("a",3)))//TODO - 行动算子val intToLong: collection.Map[Int, Long] = rdd.countByValue()println(intToLong)val stringToLong: collection.Map[String, Long] = rdd1.countByKey()println(stringToLong)sc.stop()}
}
案例实操
10. save相关算子
函数定义
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile( path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
说明
将数据保存到不同格式的文件中。
案例实操
package com.atguigu.bigdata.spark.core.rdd.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark05_RDD_Operator_Action {def main(args: Array[String]): Unit = {//TODO 准备环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))//TODO - 行动算子rdd.saveAsTextFile("output")rdd.saveAsObjectFile("output1")//saveAsSequenceFile方法要求数据的格式必须为K-V类型rdd.saveAsSequenceFile("output2")sc.stop()}
}
11. foreach
函数定义
def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
说明
分布式遍历RDD 中的每一个元素,调用指定函数。
案例实操
package com.atguigu.bigdata.spark.core.rdd.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark06_RDD_Operator_Action {def main(args: Array[String]): Unit = {//TODO 准备环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4) )//foreach其实Driver端内存集合循环遍历的方法rdd.collect().foreach(println)println("*************")//foreach其实是Executor端内存数据打印rdd.foreach(println)//算子:Operator(算子)//RDD的方法和Scala集合对象的方法不一样//集合对象的方法都是在同一个节点的内存中完成的//RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行//为了区分不同的处理效果,所以将RDD的方法称之为算子//RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在EXecutor端执行sc.stop()}
}
Spark的RDD行动算子相关推荐
- Spark的RDD转换算子
目录 RDD转换算子 Value 类型 1. map 2. mapPartitions map 和mapPartitions 的区别 3. mapPartitionsWithIndex 4. flat ...
- spark常用RDD算子 汇总(java和scala版本)
github: https://github.com/zhaikaishun/spark_tutorial spark RDD的算子挺多,有时候如何灵活的使用,该如何用一下子想不起来,这一段时间将s ...
- spark之RDD的转换算子与行为算子的具体使用
文章目录 1.Transform算子 1.1 map 1.2 flatmap 1.3 groupBy和groupBykey 1.4 filter 1.5 Mappartitions 1.6 mapVa ...
- Spark:RDD编程总结(概述、算子、分区、共享变量)
目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...
- Spark学习笔记(7)——RDD行动算子
RDD方法又称RDD算子. 算子 : Operator(操作) RDD的方法和Scala集合对象的方法不一样,集合对象的方法都是在同一个节点的内存中完成的.RDD的方法可以将计算逻辑发送到Execut ...
- Spark的RDD持久化
RDD持久化 1. RDD Cache 缓存 说明 RDD 通过Cache 或者Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM 的堆内存中.但是并不是这两个方法被调用时立即 ...
- Spark的RDD依赖关系
RDD依赖关系 RDD 血缘关系 RDD 只支持粗粒度转换,即在大量记录上执行的单个操作.将创建 RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区.RDD 的Lineage 会记录R ...
- Spark的RDD序列化
RDD序列化 1. 闭包检查 从计算的角度, 算子以外的代码都是在Driver 端执行, 算子里面的代码都是在 Executor端执行.那么在 scala 的函数式编程中,就会导致算子内经常会用到算子 ...
- Spark _07_补充部分算子【二】
接Spark _06_补充部分算子[一] https://blog.csdn.net/qq_41946557/article/details/102673673 scala API package d ...
最新文章
- SQL DEVELOPER 打不开了
- 《数学之美》第11章 如何确定网页和查询的相关性
- vue2 + vue-router + vuex + iview 入门项目
- quercus mysql_14.5 Quercus 原理及展望
- SQL大数据查询优化
- python报错 TypeError: an integer is required
- 一台电脑上同启动两个Tomcat的方式,windows/Linux配置。
- 8.基本数据结构-顺序表和链表
- JAVA入门级教学之(switch语句)
- 互联网晚报 | 11月20日 星期六 | 阿里云单季营收首次超200亿;淘特年度活跃用户超2.4亿;首届中国网络文明大会在京召开...
- tp摄像头的默认地址_tp-link怎么设置无线桥接 tp-link设置无线桥接方法【图文】...
- 如何解决”ArcGIS Server Site is currently being configured by another administrative operation“的问题
- mysql中datetime有带时区_当服务器时区不是UTC时,从Java中检索来自MySQL的UTC DATETIME字段...
- C语言在工业工程专业的应用,工业工程专业知识介绍
- 【配置关系】—Entity Framework实例详解
- 项目管理之项目周报模板
- PDF虚拟打印机的功能详解和使用方法
- 奥的斯自动人行道服务器密码,奥的斯服务器中文说明21页
- 主分区、扩展分区、逻辑分区和活动分区的区别与联系
- ZYNQ的Linux Linaro系统镜像制作SD卡启动