Spark-Scala算子
一、Transformations算子
1.map-一对一
特点就是一对一,进来一个,出去一个
lines.map(one=>{one+"#"}).foreach(println)
2.flatMap-一对多
进来一个,出去一堆。比如读取一行数据:hello world ,出去的就是hello和world两个单词。匿名函数定义了其中的逻辑:将读取到的数据按照空格切分,最终输出多个单词
lines.flatMap(one=>{one.split(" ")}).foreach(println)
3.filter过滤
进来一个,只要符合定义规则的才能顺利输出出去。即:只有hello才能输出,其他单词全都被过滤掉
//先得到一堆单词
val rdd1 = lines.flatMap(one => {one.split(" ")})//定义过滤规则,进行过滤输出rdd1.filter(one=>{"hello".equals(one)}).foreach(println)
4.reduceByKey和sortBy
reduceByKey就是相当于MR的Reduce过程,将相同的key进行聚合,并执行对应的逻辑(这里是做+1,即统计单词个数)。
聚合后要对RDD进行排序了,这里是借助tuple二元组来做计数、排序的。按照tuple的第二位来进行排序,默认升序。如果想要降序,那就result.sortBy(tp=>{tp._2},false)
//得到split切割后的N多个单词val words = lines.flatMap(one => {one.split(" ")})//map,1 To 1,hello--->(hello,1)val pairWords = words.map(one=>{(one,1)})//聚合val result = pairWords.reduceByKey((v1:Int,v2:Int)=>{v1+v2})result.sortBy(tp=>{tp._2})//按照第二位来排序,进来tp,得到第二位
// result.sortBy(tp=>{tp._2},false)//降序输出,默认升序result.foreach(println)
5.sortByKey
使用sortByKey实现sortBy的功能:“hello world”—>“hello” “world”—>(hello,1) (world,1)
关键的时候来了,利用tuple的swap反转,(hello 1)—>(1,hello)
使用sortByKey来进行排序,然后再利用一次反转
val words = lines.flatMap(one => {one.split(" ")})val pairWords = words.map(one=>{(one,1)})val result:RDD[(String,Int)] = pairWords.reduceByKey((v1:Int,v2:Int)=>{v1+v2})val transRDD = result.map(tp=>{tp.swap})//反转,string,int 变 int,stringval r = transRDD.sortByKey(false)r.map(_.swap).foreach(println)
6.sample抽样
/*** sample算子抽样* true:抽出来一个,完事再放回去,再继续抽。* 0.1:抽样的比例* 100L:指定种子,抽到的数据不管运行多少次都一样*/val result: RDD[String] = lines.sample(true,0.1,100L)result.foreach(println)
7.join
(k,v) (k,w)—>(k,(v,w)),k相同的join在一起
val result = rdd1.join(rdd2)
7.1 leftOuterJoin
以左为主,没有的就用None占坑
val result = rdd1.leftOuterJoin(rdd2)
7.2 rightOuterJoin
以右为主,没有的就用None占位
val result = rdd1.rightOuterJoin(rdd2)
8.union
合并两个数据集,类型要一致
val result = rdd1.union(rdd2)
9.intersection
取2个RDD的交集
val result = rdd1.intersection(rdd2)
10.subtract
取差集
val result = rdd1.subtract(rdd2)
11.mapPartitions
和map类似,遍历的单位是每个partition上的数据
val result = rdd1.mapPartitions(iter=>{val listBuffer = new ListBuffer[String]()println("打开")while (iter.hasNext){val s = iter.next()println("插入。。。"+s)listBuffer.append(s+"#")}println("关闭")listBuffer.iterator})
12.distinct:map+redeceByKey+map
相当于去重了
val rdd1: RDD[String] = sc.makeRDD(Array[String]("a", "b", "c", "a", "d", "e", "a", "b"))val result = rdd1.distinct()//等价val result = rdd1.map(s=>{(s,1)}).reduceByKey(_+_).map(tp=>tp._1)
13.cogroup
(K,V).cogroup(K,V)=>(K,([V],[W]))
输出:(zhangsan,(CompactBuffer(1),CompactBuffer(100)))
val result = nameRDD.cogroup(scoreRDD)result.foreach(println)
14.mapPartitionsWithIndex
index:分区号;
iter:分区号下的数据
val rdd2 = rdd1.mapPartitionsWithIndex((index, iter) => {val list = new ListBuffer[String]()while (iter.hasNext) {val one = iter.next()list += (s"rdd1 partition = $index ,value = $one")}list.iterator})rdd2.foreach(println)//rdd1 partition = 1 ,value = b
15.repartition
可以增多、减少分区。宽依赖算子,会产生shuffle;
这里区别于coalesce,coalesce同样可能增加、减少分区。但是coalesce是窄依赖算子,默认无shuffle,可通过设置true来开启。当coalesce由少的分区分到多的分区时,不让产生shuffle,不起作用。
因此可以变相的理解为:repartition常用于增多分区,coalesce常用于减少分区
val rdd3 = rdd2.repartition(3)rdd3.mapPartitionsWithIndex((index, iter) => {val list = new ListBuffer[String]()while (iter.hasNext) {val one = iter.next()list += (s"rdd1 partition = $index ,value = $one")}list.iterator}).foreach(println)
16.zip & zipwithindex
zip:两个RDD可以通过zip压缩在一起,输出结果:(a,1)
zipwithindex:Long就是RDD的index下标0,1,2…和各自的下标压缩在一起,形成K-V格式RDD。如:(a,0)
rdd1.zip(rdd2).foreach(println)val rdd: RDD[(String, Long)] = rdd1.zipWithIndex()rdd.foreach(println)
二、Action算子
1.count
//count:计算数据源有多少行val l = lines.count()println(l)
2.collect
回收计算结果到Driver端的内存
val strings: Array[String] = lines.collect()strings.foreach(println)
3.firs
拿到第一条数据。first就是由take(1)实现的
val result = lines.first()println(result)
4.tak
拿到指定行数的数据
val result = lines.take(5)result.foreach(println)
5.foreachPartition
遍历的是每个partition上的数据
rdd1.foreachPartition(iter=>{println("创建数据库连接")while (iter.hasNext){val s = iter.next()println("插入数据库:"+s)}println("关闭数据库连接")})
6.reduce &countByKey & countByValue
聚合执行对应逻辑,输出15
val reslut = sc.parallelize(List[Int](1,2,3,4,5)).reduce((v1,v2)=>{v1+v2})println(reslut)
countByKey按照key分组,count整体相同的有几个
sc.parallelize(List[(String,Int)](("a",100),("b",200),("a",300),("d",400))).countByKey().foreach(println)
countByValue:整体作为value分组,计算出现次数。输出:((a,100),2)
sc.parallelize(List[(String,Int)](("a",100),("b",200),("a",300),("a",100))).countByValue().foreach(println)
Spark-Java算子
Spark-Scala算子相关推荐
- Spark学习之Spark RDD算子
个人主页zicesun.com 这里,从源码的角度总结一下Spark RDD算子的用法. 单值型Transformation算子 map /*** Return a new RDD by applyi ...
- Spark action算子案例
在上篇文章中,我们对Spark中几种常用的transformation算子通过Java和Scala两种代码分别进行了案例演示,Spark transformation算子案例 而在本文中,我们将继续 ...
- Spark RDD算子(八)mapPartitions, mapPartitionsWithIndex
Spark RDD算子(八) mapPartitions scala版本 java版本 mapPartitionsWithIndex scala版本 java版本 mapPartitions mapP ...
- spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子
目录 天小天:(一)Spark Streaming 算子梳理 - 简单介绍streaming运行逻辑 天小天:(二)Spark Streaming 算子梳理 - flatMap和mapPartitio ...
- 在Spark Scala/Java应用中调用Python脚本,会么?
摘要:本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同. 本文分享自华为云社区<[Spark]如何在Spark Scala/ ...
- 教你如何在Spark Scala/Java应用中调用Python脚本
摘要:本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同. 本文分享自华为云社区<[Spark]如何在Spark Scala/ ...
- Spark部分算子及使用
Spark部分算子及使用 案例一:flatmap算子 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppN ...
- Spark transformation算子案例
Spark支持两种RDD操作:transformation和action 在本文中,将对几个常用的transformation算子进行案例演示,采用Java和Scala两种语言对代码进行编写 其中 ...
- Spark RDD算子介绍
Spark学习笔记总结 01. Spark基础 1. 介绍 Spark可以用于批处理.交互式查询(Spark SQL).实时流处理(Spark Streaming).机器学习(Spark MLlib) ...
- Spark 常用算子详解(转换算子、行动算子、控制算子)
Spark简介 Spark是专为大规模数据处理而设计的快速通用的计算引擎: Spark拥有Hadoop MapReduce所具有的优点,但是运行速度却比MapReduce有很大的提升,特别是在数据挖掘 ...
最新文章
- jquery如何调用后台的方法
- python发送各类邮件的主要方法
- Svchost.exe占用CPU100%全面解析与进程说明
- vs开发工具报错:参数错误 异常来自 HRESULT:0x80070057 E_INVALIDARG
- 039_JavaScript对象访问器
- FCN全连接卷积网络(5)--Fully Convolutional Networks for Semantic Segmentation阅读(相关工作部分)
- 开源php面板,宝塔面板nginx安装终于搞定了
- Oracle中Date和Timestamp的区别
- shell脚本参数中有空格
- python不等于_Python小课堂|注释+运算符
- 八十第五个冠军(复制和匹配的字符串)
- VeryCD将于本月关闭 P2P历史即将终结
- python打开其他应用程序错误_Python应用程序错误(Udacity)
- costmap_2d 中计算footprint 的内切圆半径和外切圆半径的函数解析——点到线段的距离计算
- 使用mongoTemplate进行Aggregation聚合查询
- OnlyOffice 二次开发定制化部署
- 库卡机器人会卡顿吗_看完你就知道德国库卡机器人到底有多牛!
- ROS2读取realsense摄像头数据并发布topic到ros2
- WiFi以及天线测试项目详解
- android视频解码数据分辨率改变,Android实现任意分辨率视频编码的思考与实现