1. RDD基本操作

val rdd1 = sc.parallelize(List(1,2,3,4,4))
输出结果:rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
//这里org.apache.spark.rdd.RDD[Int],这里RDD[Int] 表示RDD里面存放是Int类型//将rdd转换成Array
scala> rdd1.collect
res0: Array[Int] = Array(1, 2, 3, 4, 4)//创建stringRDD
scala> val rdd2 = sc.parallelize(List("apple","orange","banana","Grape"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
//将stringRDD转换成Array
scala> rdd2.collect
res1: Array[String] = Array(apple, orange, banana, Grape)//map运算:map运算可以传入的函数,将每一个元素经过函数运算产生另外一个RDD
scala> rdd1.map(_ + 1).collect
res2: Array[Int] = Array(2, 3, 4, 5, 5)
scala> rdd2.map(x => "fruit:"+x).collect
res5: Array[String] = Array(fruit:apple, fruit:orange, fruit:banana, fruit:Grape)//filter操作:可以用于对RDD内每个元素进行筛选,并且产生新的RDDscala> rdd1.filter(_ < 3).collect
res6: Array[Int] = Array(1, 2)scala> rdd1.filter(x => x < 3).collect
//filter对字符串操作
scala> rdd2.filter(x => x.contains("ra")).collect
res8: Array[String] = Array(orange, Grape)//distinct运算:删除重复元素
scala> rdd1.distinct.collect
res9: Array[Int] = Array(4, 2, 1, 3)//randomSplit运算:可以将整个集合元素,以随机数的方式按照比例分为多个RDD
scala> val sRDD = rdd1.randomSplit(Array(0.4,0.6))
sRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[12] at randomSplit at <console>:25, MapPartitionsRDD[13] at randomSplit at <console>:25)
scala> sRDD(0).collect
res3: Array[Int] = Array(1, 2, 4, 4)                                            scala> sRDD(1).collect
res4: Array[Int] = Array(3)//groupBy运算:按照输入的匿名参数规则,将数据分为多个Array
scala> val gRDD = rdd1.groupBy(x => if(x%2==0) "even" else "odd").collect
gRDD: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 4)), (odd,CompactBuffer(3, 1)))//输出结果
scala> gRDD(0)
res5: (String, Iterable[Int]) = (even,CompactBuffer(2, 4, 4))
scala> gRDD(1)
res6: (String, Iterable[Int]) = (odd,CompactBuffer(3, 1))

2. 多个RDD转换操作,RDD支持执行多个RDD的运算

scala> val rdd1 = sc.parallelize(List(3,1,2,5,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24scala> val rdd2 = sc.parallelize(List(5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> val rdd3 = sc.parallelize(List(2,7))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24//使用union进行并集运算
scala> rdd1.union(rdd2).union(rdd3).collect
res7: Array[Int] = Array(3, 1, 2, 5, 5, 5, 6, 2, 7)
//使用++进行并集运算
scala> (rdd1 ++ rdd2 ++ rdd3).collect
res8: Array[Int] = Array(3, 1, 2, 5, 5, 5, 6, 2, 7)//使用intersection进行交集运算
scala> rdd1.intersection(rdd2).collect
res9: Array[Int] = Array(5)//使用subtract进行差集运算
scala> rdd1.subtract(rdd2).collect
res10: Array[Int] = Array(2, 1, 3)//使用cartesian进行笛卡尔集的运算
scala> rdd1.cartesian(rdd2).collect
res11: Array[(Int, Int)] = Array((3,5), (1,5), (3,6), (1,6), (2,5), (5,5), (5,5), (2,6), (5,6), (5,6))

3. 基本动作运算(这都是Actions运算,会马上执行结果)

//读取第1条数据
scala> rdd1.first
res12: Int = 3
//读取前几条数据
scala> rdd1.take(2)
res13: Array[Int] = Array(3, 1)
//按照从大到小排序读取前几条数据
scala> rdd1.takeOrdered(4)(Ordering[Int].reverse)
res15: Array[Int] = Array(5, 5, 3, 2)
//统计功能
scala> rdd1.stats
res16: org.apache.spark.util.StatCounter = (count: 5, mean: 3.200000, stdev: 1.600000, max: 5.000000, min: 1.000000)scala> rdd1.min
res17: Int = 1scala> rdd1.min
res17: Int = 1scala> rdd1.max
res18: Int = 5scala> rdd1.stdev
res19: Double = 1.6scala> rdd1.count
res20: Long = 5scala> rdd1.sum
res21: Double = 16.0scala> rdd1.mean
res22: Double = 3.2

4. RDD Key-Value 基本 “转换” 运算 -这个是map-reduce的基础

//创建Key-Value的RDD
scala> val kv = sc.parallelize(List((3,4),(3,6),(5,6),(1,2)))
kv: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24scala> kv.collect
res23: Array[(Int, Int)] = Array((3,4), (3,6), (5,6), (1,2))//列出所有的key值
scala> kv.keys.collect
res24: Array[Int] = Array(3, 3, 5, 1)//列出所有的value值
scala> kv.values.collect
res25: Array[Int] = Array(4, 6, 6, 2)//使用filter筛选出key<5的所有k-v对
scala> kv.filter{case (key,value) => key<5}.collect
res28: Array[(Int, Int)] = Array((3,4), (3,6), (1,2))//mapValues,可以针对RDD内每一组(K,V)进行运算,并且产生另外一个RDD
scala> kv.mapValues(x => x*x).collect
res29: Array[(Int, Int)] = Array((3,16), (3,36), (5,36), (1,4))//sortByKey 从小到大按照key排序
scala> kv.sortByKey(true).collect //true可以不写
res30: Array[(Int, Int)] = Array((1,2), (3,4), (3,6), (5,6))
scala> kv.sortByKey(false).collect
res32: Array[(Int, Int)] = Array((5,6), (3,4), (3,6), (1,2))//reduceByKey:不同的key,保持不变,相同的key,数值相加1). 例如:Array((1,2), (3,4), (3,6), (5,6)) ,第一个是key,第二个是value
2). reduceByKey 会虚招相同的key合并,相同的key数据有(3,4), (3,6)
3). 合并之后的结果为:(3,4+6)
4). 剩下的(1,2),(5,6),因为没有相同的key,保持不变
rdd1.reduceByKey((x,y) => (x+y)).collect

5. 多个RDD Key-Value “转换” 运算

val rdd1 = sc.parallelize(List((3,4),(3,6),(5,6),(1,2)))
val rdd2 = sc.parallelize(List((3,8),(6,8)))//Key-Value RDD join运算:按照相同的key值join起来
scala> rdd1.join(rdd2)
res4: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[8] at join at <console>:28scala> rdd1.join(rdd2).collect
res5: Array[(Int, (Int, Int))] = Array((3,(4,8)), (3,(6,8)))scala> rdd1.join(rdd2).collect.foreach(println)
(3,(4,8))
(3,(6,8))//Key-Value RDD leftOuterJoin 运算,运算规则如下:
1). leftOuterJoin 会从左边的集合(rdd1)对应到右边的集合(rdd2),并显示所有左边集合(rdd1)中的元素
2). 如果rdd1的key值对应到rdd2,会显示相同的key (3,(4,Some(8)))、(3,(6,Some(8)))
3). 如果rdd1的key值对应不到rdd2,会显示None (5,(6,None))、(1,(2,None))scala> rdd1.leftOuterJoin(rdd2).collect
res7: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(8))), (3,(6,Some(8))), (5,(6,None)))scala> rdd1.leftOuterJoin(rdd2).collect.foreach(println)
(1,(2,None))
(3,(4,Some(8)))
(3,(6,Some(8)))
(5,(6,None))//Key-Value RDD rightOuterJoin 运算,运算规则如下:
1). rightOuterJoin 会从右边的集合(rdd1)对应到左边的集合(rdd2),并显示所有右边集合(rdd1)中的元素
2). 如果rdd1的key值对应到rdd2,会显示相同的key (3,(4,Some(8)))、(3,(6,Some(8)))
scala> rdd1.rightOuterJoin(rdd2).collect
res9: Array[(Int, (Option[Int], Int))] = Array((6,(None,8)), (3,(Some(4),8)), (3,(Some(6),8)))scala> rdd1.rightOuterJoin(rdd2).collect.foreach(println)
(6,(None,8))
(3,(Some(4),8))
(3,(Some(6),8))//Key-Value RDD subtractByKey 运算: 删除相同key的值
scala> rdd1.subtractByKey(rdd2).collect.foreach(println)
(1,2)
(5,6)

5. RDD Key-Value "动作"运算

//读取第一条数据
scala> rdd1.first
res4: (Int, Int) = (3,4)
//读取前几条数据
scala> rdd1.take(2)
res5: Array[(Int, Int)] = Array((3,4), (3,6))//获取第一条数据的元素
scala> rdd1.first._1
res6: Int = 3scala> rdd1.first._2
res7: Int = 4//计算每一个key值的条数
scala> rdd1.countByKey
res8: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2, 5 -> 1)//将结果转换成map的形式显示
scala> rdd1.collectAsMap
res9: scala.collection.Map[Int,Int] = Map(5 -> 6, 1 -> 2, 3 -> 6)//查询map中的数据
scala> res9(5)
res10: Int = 6
//k-v 的lookup运算,查找key值
scala> rdd1.lookup(3)
res12: Seq[Int] = WrappedArray(4, 6)scala> rdd1.lookup(5)
res13: Seq[Int] = WrappedArray(6)

6. Broadcast 广播变量

//创建rdd
scala> val kv = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
kv: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24//创建map对照表 fruitMap
scala> val fruitMap = kv.collectAsMap
fruitMap: scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)//创建fruitIds
val fruitIds = kv.keys.collect
val fruitIds = kv.keys.collect.toList
//使用fruitMap 进行对照表转换
//val fruitNames = fruitIds.map(x => fruitMap(x)).collect 执行报错 为什么呢?
//

7. accumulator 累加器

spark提供了accumulator累加器共享变量(shared variable).使用规则如下:
1). accumulator 累加器可以使用SparkContext.accumulator([初始值]) 来创建
2). 使用 "+=" 累加
3). 在task中,例如 foreach循环中,不能读取累加器的值
4). 只有驱动程序,也就是循环外,才可以使用 .value 来读取累加器的值scala> val ls = sc.parallelize(List(1,2,3,4,5,6,7))
ls: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24scala> val total = sc.accumulator(0.0)
warning: there were two deprecation warnings; re-run with -deprecation for details
total: org.apache.spark.Accumulator[Double] = 0.0scala> val num = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
num: org.apache.spark.Accumulator[Int] = 0scala> ls.foreach( i => { total += i; num += 1})scala> println(total.value)
28.0scala> println(num.value)
7

8. RDD persistence 持久化

RDD persistence 持久化机制:可以用于将需要重复运算的RDD存储在内存中,以便大幅提升运算效率Spark RDD持久化方法如下:RDD.persist(存储等级) -- 默认存储等级是:MEMERY_ONLY,也就是存储在内存中持久等级:MEMERY_ONLY:默认,存储RDD的方式是以Java对象反串行化在JVM内存中。如果RDD太大无法完全存储在内存中,多余的RDD partitions不会cache在内存中,而是需要时再重新计算MEMERY_AND_DISK: 存储RDD的方式是以Java对象反串行化在JVM内存中。如果RDD太大无法完全存储在内存中,多余的RDD partitions存储在硬盘,需要时从硬盘读取MEMERY_AND_SER:类似 MEMERY_ONLYMEMERY_AND_DISK_SER: 类似 MEMERY_AND_DISKDISK_ONLY: 存储在硬盘上RDD.unpersist() -- 取消持久化范例:略去

spark入门三(RDD基本运算)相关推荐

  1. Spark入门(三)——SparkRDD剖析(面试点)

    Spark RDD剖析 RDD简介 如下案例: RDD容错 RDD 宽窄依赖 Sage划分(重点) 小结 RDD缓存机制 Check Point 机制 RDD简介 Spark计算中一个重要的概念就是可 ...

  2. Spark入门系列(二)| 1小时学会RDD编程

    作者 | 梁云1991 转载自Python与算法之美(ID:Python_Ai_Road) 导读:本文为 Spark入门系列的第二篇文章,主要介绍 RDD 编程,实操性较强,感兴趣的同学可以动手实现一 ...

  3. Spark入门必读:核心概念介绍及常用RDD操作

    导读:Spark是由加州大学伯克利分校AMP实验室开源的分布式大规模数据处理通用引擎,具有高吞吐.低延时.通用易扩展.高容错等特点.Spark内部提供了丰富的开发库,集成了数据分析引擎Spark SQ ...

  4. Spark入门实战系列--6.SparkSQL(中)--深入了解SparkSQL运行计划及调优

    [注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.1  运行环境说明 1.1.1 硬软件环境 l  主机操作系统:Windows 64位, ...

  5. Spark入门实战系列--2.Spark编译与部署(下)--Spark编译安装

    [注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.编译Spark Spark可以通过SBT和Maven两种方式进行编译,再通过make-d ...

  6. Spark入门实战系列--8.Spark MLlib(上)--机器学习及SparkMLlib简介

    [注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.机器学习概念 1.1 机器学习的定义 在维基百科上对机器学习提出以下几种定义: l&qu ...

  7. Spark入门实战系列--4.Spark运行架构

    注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1. Spark运行架构 1.1 术语定义 lApplication:Spark Applic ...

  8. Spark入门实战系列--6.SparkSQL(上)--SparkSQL简介

    [注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.SparkSQL的发展历程 1.1 Hive and Shark SparkSQL的前身 ...

  9. Spark入门阶段一之扫盲笔记

    介绍 spark是分布式并行数据处理框架 与mapreduce的区别: mapreduce通常将中间结果放在hdfs上,spark是基于内存并行大数据框架,中间结果放在内存,对于迭代数据spark效率 ...

最新文章

  1. win7下用sublime搭建c语言开发环境
  2. 2、垃圾回收算法(标记清除算法、复制算法、标记整理算法和分代收集算法),各种垃圾收集器讲解(学习笔记)
  3. 好记心不如烂笔头,ssh登录 The authenticity of host 192.168.0.xxx can't be established. 的问题...
  4. 列出和过滤NIO.2中的目录内容
  5. OpenStack SFC 深入剖析
  6. 普华永道报告:三波自动化浪潮将依次出现,人类工作将显著受到影响
  7. 大数据带来的安全隐患有哪些
  8. 为web站点提供https服务的步骤
  9. VS2010/MFC设置对话框控件的Tab顺序
  10. qi接收启动协议_基于QI协议的无线充电通信系统
  11. MATLAB热障涂层成像,微波检测热障涂层孔隙率的可行性研究
  12. ad18常用快捷键可以修改吗_AD18快捷键
  13. php斗鱼弹幕接口,php实现斗鱼弹幕,一起来欣赏弹幕吧~
  14. windows使用choco安装cmake
  15. LOJ#10064. 「一本通 3.1 例 1」黑暗城堡
  16. php: 远程操作浏览器cookie存储入门
  17. css 绘制心形图案
  18. Altas 200 DK环境配置
  19. python画圣诞树
  20. 数据结构上机——希尔排序(含监视哨版本)

热门文章

  1. 【水滴石穿】github_popular
  2. sym8 matlab,【Matlab学习手记】sym8小波滤波
  3. 2.4 FrozenLake使用cross-entropy方法
  4. 广告点击率预测问题初探
  5. 再论意识、行为和结果
  6. Python换钱的最少货币数
  7. 洛谷是什么?hydro是什么
  8. mysql增加年龄约束_mysql-约束
  9. Magic Leap开发指南(3)-- 将你的Cube投递给其他用户
  10. Windows 2008 Server搭建Radius服务器的方法