一、Transformations算子

1.map-一对一

特点就是一对一,进来一个,出去一个

lines.map(one=>{one+"#"}).foreach(println)

2.flatMap-一对多

进来一个,出去一堆。比如读取一行数据:hello world ,出去的就是hello和world两个单词。匿名函数定义了其中的逻辑:将读取到的数据按照空格切分,最终输出多个单词

    lines.flatMap(one=>{one.split(" ")}).foreach(println)

3.filter过滤

进来一个,只要符合定义规则的才能顺利输出出去。即:只有hello才能输出,其他单词全都被过滤掉

//先得到一堆单词
val rdd1 = lines.flatMap(one => {one.split(" ")})//定义过滤规则,进行过滤输出rdd1.filter(one=>{"hello".equals(one)}).foreach(println)

4.reduceByKey和sortBy

reduceByKey就是相当于MR的Reduce过程,将相同的key进行聚合,并执行对应的逻辑(这里是做+1,即统计单词个数)。
聚合后要对RDD进行排序了,这里是借助tuple二元组来做计数、排序的。按照tuple的第二位来进行排序,默认升序。如果想要降序,那就result.sortBy(tp=>{tp._2},false)

 //得到split切割后的N多个单词val words = lines.flatMap(one => {one.split(" ")})//map,1 To 1,hello--->(hello,1)val pairWords = words.map(one=>{(one,1)})//聚合val result = pairWords.reduceByKey((v1:Int,v2:Int)=>{v1+v2})result.sortBy(tp=>{tp._2})//按照第二位来排序,进来tp,得到第二位
//    result.sortBy(tp=>{tp._2},false)//降序输出,默认升序result.foreach(println)

5.sortByKey

使用sortByKey实现sortBy的功能:“hello world”—>“hello” “world”—>(hello,1) (world,1)
关键的时候来了,利用tuple的swap反转,(hello 1)—>(1,hello)
使用sortByKey来进行排序,然后再利用一次反转

 val words = lines.flatMap(one => {one.split(" ")})val pairWords = words.map(one=>{(one,1)})val result:RDD[(String,Int)] = pairWords.reduceByKey((v1:Int,v2:Int)=>{v1+v2})val transRDD = result.map(tp=>{tp.swap})//反转,string,int  变  int,stringval r = transRDD.sortByKey(false)r.map(_.swap).foreach(println)

6.sample抽样

 /*** sample算子抽样* true:抽出来一个,完事再放回去,再继续抽。* 0.1:抽样的比例* 100L:指定种子,抽到的数据不管运行多少次都一样*/val result: RDD[String] = lines.sample(true,0.1,100L)result.foreach(println)

7.join

(k,v) (k,w)—>(k,(v,w)),k相同的join在一起

val result = rdd1.join(rdd2)

7.1 leftOuterJoin

以左为主,没有的就用None占坑

val result = rdd1.leftOuterJoin(rdd2)

7.2 rightOuterJoin

以右为主,没有的就用None占位

val result = rdd1.rightOuterJoin(rdd2)

8.union

合并两个数据集,类型要一致

val result = rdd1.union(rdd2)

9.intersection

取2个RDD的交集

val result = rdd1.intersection(rdd2)

10.subtract

取差集

val result = rdd1.subtract(rdd2)

11.mapPartitions

和map类似,遍历的单位是每个partition上的数据

 val result = rdd1.mapPartitions(iter=>{val listBuffer = new ListBuffer[String]()println("打开")while (iter.hasNext){val s = iter.next()println("插入。。。"+s)listBuffer.append(s+"#")}println("关闭")listBuffer.iterator})

12.distinct:map+redeceByKey+map

相当于去重了

val rdd1: RDD[String] = sc.makeRDD(Array[String]("a", "b", "c", "a", "d", "e", "a", "b"))val result = rdd1.distinct()//等价val result = rdd1.map(s=>{(s,1)}).reduceByKey(_+_).map(tp=>tp._1)

13.cogroup

(K,V).cogroup(K,V)=>(K,([V],[W]))
输出:(zhangsan,(CompactBuffer(1),CompactBuffer(100)))

 val result = nameRDD.cogroup(scoreRDD)result.foreach(println)

14.mapPartitionsWithIndex

index:分区号;
iter:分区号下的数据

val rdd2 = rdd1.mapPartitionsWithIndex((index, iter) => {val list = new ListBuffer[String]()while (iter.hasNext) {val one = iter.next()list += (s"rdd1 partition = $index ,value = $one")}list.iterator})rdd2.foreach(println)//rdd1 partition = 1 ,value = b

15.repartition

可以增多、减少分区。宽依赖算子,会产生shuffle;
这里区别于coalesce,coalesce同样可能增加、减少分区。但是coalesce是窄依赖算子,默认无shuffle,可通过设置true来开启。当coalesce由少的分区分到多的分区时,不让产生shuffle,不起作用。
因此可以变相的理解为:repartition常用于增多分区,coalesce常用于减少分区

val rdd3 = rdd2.repartition(3)rdd3.mapPartitionsWithIndex((index, iter) => {val list = new ListBuffer[String]()while (iter.hasNext) {val one = iter.next()list += (s"rdd1 partition = $index ,value = $one")}list.iterator}).foreach(println)

16.zip & zipwithindex

zip:两个RDD可以通过zip压缩在一起,输出结果:(a,1)

zipwithindex:Long就是RDD的index下标0,1,2…和各自的下标压缩在一起,形成K-V格式RDD。如:(a,0)

 rdd1.zip(rdd2).foreach(println)val rdd: RDD[(String, Long)] = rdd1.zipWithIndex()rdd.foreach(println)

二、Action算子

1.count

 //count:计算数据源有多少行val l = lines.count()println(l)

2.collect

回收计算结果到Driver端的内存

    val strings: Array[String] = lines.collect()strings.foreach(println)

3.firs

拿到第一条数据。first就是由take(1)实现的

    val result = lines.first()println(result)

4.tak

拿到指定行数的数据

    val result = lines.take(5)result.foreach(println)

5.foreachPartition

遍历的是每个partition上的数据

 rdd1.foreachPartition(iter=>{println("创建数据库连接")while (iter.hasNext){val s = iter.next()println("插入数据库:"+s)}println("关闭数据库连接")})

6.reduce &countByKey & countByValue

聚合执行对应逻辑,输出15

 val reslut = sc.parallelize(List[Int](1,2,3,4,5)).reduce((v1,v2)=>{v1+v2})println(reslut)

countByKey按照key分组,count整体相同的有几个

sc.parallelize(List[(String,Int)](("a",100),("b",200),("a",300),("d",400))).countByKey().foreach(println)

countByValue:整体作为value分组,计算出现次数。输出:((a,100),2)

sc.parallelize(List[(String,Int)](("a",100),("b",200),("a",300),("a",100))).countByValue().foreach(println)

Spark-Java算子

Spark-Scala算子相关推荐

  1. Spark学习之Spark RDD算子

    个人主页zicesun.com 这里,从源码的角度总结一下Spark RDD算子的用法. 单值型Transformation算子 map /*** Return a new RDD by applyi ...

  2. Spark action算子案例

    在上篇文章中,我们对Spark中几种常用的transformation算子通过Java和Scala两种代码分别进行了案例演示,Spark transformation算子案例  而在本文中,我们将继续 ...

  3. Spark RDD算子(八)mapPartitions, mapPartitionsWithIndex

    Spark RDD算子(八) mapPartitions scala版本 java版本 mapPartitionsWithIndex scala版本 java版本 mapPartitions mapP ...

  4. spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子

    目录 天小天:(一)Spark Streaming 算子梳理 - 简单介绍streaming运行逻辑 天小天:(二)Spark Streaming 算子梳理 - flatMap和mapPartitio ...

  5. 在Spark Scala/Java应用中调用Python脚本,会么?

    摘要:本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同. 本文分享自华为云社区<[Spark]如何在Spark Scala/ ...

  6. 教你如何在Spark Scala/Java应用中调用Python脚本

    摘要:本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同. 本文分享自华为云社区<[Spark]如何在Spark Scala/ ...

  7. Spark部分算子及使用

    Spark部分算子及使用 案例一:flatmap算子 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppN ...

  8. Spark transformation算子案例

    Spark支持两种RDD操作:transformation和action  在本文中,将对几个常用的transformation算子进行案例演示,采用Java和Scala两种语言对代码进行编写  其中 ...

  9. Spark RDD算子介绍

    Spark学习笔记总结 01. Spark基础 1. 介绍 Spark可以用于批处理.交互式查询(Spark SQL).实时流处理(Spark Streaming).机器学习(Spark MLlib) ...

  10. Spark 常用算子详解(转换算子、行动算子、控制算子)

    Spark简介 Spark是专为大规模数据处理而设计的快速通用的计算引擎: Spark拥有Hadoop MapReduce所具有的优点,但是运行速度却比MapReduce有很大的提升,特别是在数据挖掘 ...

最新文章

  1. jquery如何调用后台的方法
  2. python发送各类邮件的主要方法
  3. Svchost.exe占用CPU100%全面解析与进程说明
  4. vs开发工具报错:参数错误 异常来自 HRESULT:0x80070057 E_INVALIDARG
  5. 039_JavaScript对象访问器
  6. FCN全连接卷积网络(5)--Fully Convolutional Networks for Semantic Segmentation阅读(相关工作部分)
  7. 开源php面板,宝塔面板nginx安装终于搞定了
  8. Oracle中Date和Timestamp的区别
  9. shell脚本参数中有空格
  10. python不等于_Python小课堂|注释+运算符
  11. 八十第五个冠军(复制和匹配的字符串)
  12. VeryCD将于本月关闭 P2P历史即将终结
  13. python打开其他应用程序错误_Python应用程序错误(Udacity)
  14. costmap_2d 中计算footprint 的内切圆半径和外切圆半径的函数解析——点到线段的距离计算
  15. 使用mongoTemplate进行Aggregation聚合查询
  16. OnlyOffice 二次开发定制化部署
  17. 库卡机器人会卡顿吗_看完你就知道德国库卡机器人到底有多牛!
  18. ROS2读取realsense摄像头数据并发布topic到ros2
  19. WiFi以及天线测试项目详解
  20. android视频解码数据分辨率改变,Android实现任意分辨率视频编码的思考与实现

热门文章

  1. python提供两个对象身份比较操作符什么和什么来测试_python - 第二部分
  2. 第55件事 产品疯传的7个基本原则
  3. “service httpd does not support chkconfig” 問題
  4. 【入门须知】学DIV CSS技术如何入门?
  5. Linux基础(9)文本处理三剑客之grep
  6. hive的变量传递设置
  7. XML转JSON的javascript代码
  8. Android DHCP 启动分析【2】
  9. 偷懒日志 - 自动生成代码 - 第二步 生成POJO
  10. Openwebmail在Ubuntu Linux上的安装过程