collectAsMap(): Map[K, V]

Returns the RDD's key-value pairs to the driver as a Map. Keys in the result are unique: if the RDD holds several values for the same key, only one of them is kept.

/**
 * Return the key-value pairs in this RDD to the master as a Map.
 *
 * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
 *          one value per key is preserved in the map returned)
 *
 * @note this method should only be used if the resulting data is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collectAsMap(): Map[K, V]
scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.collectAsMap
res0: scala.collection.Map[String,Int] = Map(A -> 3, C -> 3, B -> 2)   
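
If all values per key are needed, one option (not from the original post) is to group before collecting; a minimal sketch using the same rdd:

// groupByKey keeps every value, so the collected map holds Iterables,
// e.g. Map(A -> CompactBuffer(1, 2, 3), B -> CompactBuffer(1, 2), C -> CompactBuffer(3)).
val allValues = rdd.groupByKey().collectAsMap()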

countByKey(): Map[K, Long]

Counts the number of elements for each key and returns the result to the driver as a local Map.

/**
 * Count the number of elements for each key, collecting the results to a local Map.
 *
 * Note that this method should only be used if the resulting map is expected to be small, as
 * the whole thing is loaded into the driver's memory.
 * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
 * returns an RDD[T, Long] instead of a map.
 */
def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
scala> val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd.countByKey
res5: scala.collection.Map[Int,Long] = Map(1 -> 3, 2 -> 3)
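
When the number of distinct keys is too large for driver memory, the Scaladoc above suggests keeping the counts distributed rather than collecting them; a minimal sketch with the same rdd:

// The counts stay in an RDD[(Int, Long)] on the cluster instead of in a local Map.
val counts = rdd.mapValues(_ => 1L).reduceByKey(_ + _)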

countByValue()

Counts the occurrences of each distinct value in the RDD. Internally it maps every element to a (value, null) pair and then calls countByKey.

/**
 * Return the count of each unique value in this RDD as a local map of (value, count) pairs.
 *
 * Note that this method should only be used if the resulting map is expected to be small, as
 * the whole thing is loaded into the driver's memory.
 * To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which
 * returns an RDD[T, Long] instead of a map.
 */
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
  map(value => (value, null)).countByKey()
}
scala> val rdd = sc.parallelize(List(1,2,3,4,5,4,4,3,2,1))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> rdd.countByValue
res12: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 2, 2 -> 2, 3 -> 2, 4 -> 3)

lookup(key: K)

Returns all values in the RDD for the given key.

/**
 * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
 * RDD has a known partitioner by only searching the partition that the key maps to.
 */
def lookup(key: K): Seq[V]
scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.lookup("A")
res2: Seq[Int] = WrappedArray(1, 2, 3)
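
As the Scaladoc notes, lookup only scans a single partition when the RDD has a known partitioner. A minimal sketch (the partitioner choice here is illustrative, not from the original post):

import org.apache.spark.HashPartitioner

// After partitionBy, "A" maps to a single partition, so lookup("A") reads only that partition.
val partitioned = rdd.partitionBy(new HashPartitioner(3)).cache()
val as = partitioned.lookup("A")   // Seq(1, 2, 3)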

checkpoint()

Marks the RDD for checkpointing: its data will be written to disk under the checkpoint directory configured on the SparkContext.

/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with `SparkContext#setCheckpointDir` and all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
def checkpoint(): Unit
/* After creating the /home/check directory with a Linux command, set it as the checkpoint directory */
scala> sc.setCheckpointDir("/home/check")

scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

/*
 * Running the next line creates an empty directory /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49
 * under /home/check.
 */
scala> rdd.checkpoint

/*
 * Running count then creates an rdd directory underneath it, containing the data files.
 */
scala> rdd.count
res5: Long = 6

[root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/
total 8
drwxr-xr-x. 2 root root 4096 Sep  4 10:29 .
drwxr-xr-x. 3 root root 4096 Sep  4 10:29 ..
[root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/
total 12
drwxr-xr-x. 3 root root 4096 Sep  4 10:30 .
drwxr-xr-x. 3 root root 4096 Sep  4 10:29 ..
drwxr-xr-x. 2 root root 4096 Sep  4 10:30 rdd-6
[root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/rdd-6/
total 32
drwxr-xr-x. 2 root root 4096 Sep  4 10:30 .
drwxr-xr-x. 3 root root 4096 Sep  4 10:30 ..
-rw-r--r--. 1 root root  171 Sep  4 10:30 part-00000
-rw-r--r--. 1 root root   12 Sep  4 10:30 .part-00000.crc
-rw-r--r--. 1 root root  170 Sep  4 10:30 part-00001
-rw-r--r--. 1 root root   12 Sep  4 10:30 .part-00001.crc
-rw-r--r--. 1 root root  170 Sep  4 10:30 part-00002
-rw-r--r--. 1 root root   12 Sep  4 10:30 .part-00002.crc
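
The Scaladoc recommends persisting the RDD before checkpointing so that writing the checkpoint does not recompute the lineage; a minimal sketch of that pattern (not from the original post):

// cache() keeps the partitions in memory, so the job triggered by count()
// lets the checkpoint read the cached data instead of recomputing the RDD.
rdd.cache()
rdd.checkpoint()
rdd.count()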

collect()

Returns all elements of the RDD as an array.

/**
 * Return an array that contains all of the elements in this RDD.
 *
 * @note this method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collect(): Array[T]
scala> val rdd = sc.parallelize(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> rdd.collect
res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
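
Since collect pulls every element into driver memory, fetching only a sample is often safer when just inspecting data; a small aside, not from the original post:

// take(n) scans only as many partitions as needed to return n elements.
val firstFive = rdd.take(5)   // Array(1, 2, 3, 4, 5)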

toLocalIterator: Iterator[T]

Returns an iterator over all elements of the RDD; it consumes at most as much driver memory as the largest partition.

/**
 * Return an iterator that contains all of the elements in this RDD.
 *
 * The iterator will consume as much memory as the largest partition in this RDD.
 *
 * Note: this results in multiple Spark jobs, and if the input RDD is the result
 * of a wide transformation (e.g. join with different partitioners), to avoid
 * recomputing the input RDD should be cached first.
 */
def toLocalIterator: Iterator[T]
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val it = rdd.toLocalIterator
it: Iterator[Int] = non-empty iterator

scala> while(it.hasNext){
     |   println(it.next)
     | }
1
2
3
4
5
6
7
8
9
10
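
Because the result is an ordinary Scala Iterator, the while loop can also be written in one line; a small aside, not from the original post:

// Streams the partitions to the driver one at a time and prints each element.
rdd.toLocalIterator.foreach(println)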

count()

Returns the number of elements in the RDD.

/**
 * Return the number of elements in the RDD.
 */
def count(): Long
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.count
res1: Long = 10

dependencies

Returns the list of this RDD's dependencies, taking into account whether the RDD is checkpointed.

/**
 * Get the list of dependencies of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def dependencies: Seq[Dependency[_]]
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd1 = rdd.filter(_>3)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:26

scala> val rdd2 = rdd1.filter(_<6)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:28

scala> rdd2.dependencies
res2: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@21c882b5)
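
dependencies shows only the immediate parents; to inspect the whole lineage, toDebugString is handy; a small aside, not from the original post:

// Prints the chain of MapPartitionsRDDs back to the original ParallelCollectionRDD.
println(rdd2.toDebugString)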

partitions

Returns this RDD's partitions as an array.

/**
 * Get the array of partitions of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def partitions: Array[Partition]
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.partitions
res4: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@70c, org.apache.spark.rdd.ParallelCollectionPartition@70d)
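
Often only the number of partitions matters; a small aside, not from the original post:

// Equivalent to rdd.partitions.length.
val numPartitions = rdd.getNumPartitions   // 2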

first()

Returns the first element of the RDD.

/**
 * Return the first element in this RDD.
 */
def first(): T
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> rdd.first
res5: Int = 1

fold(zeroValue: T)(op: (T, T) => T)

Folds the elements of each partition starting from zeroValue using op, then folds the per-partition results together, again starting from zeroValue.

/**
 * @param zeroValue the initial value for the accumulated result of each partition for the `op`
 *                  operator, and also the initial value for the combine results from different
 *                  partitions for the `op` operator - this will typically be the neutral
 *                  element (e.g. `Nil` for list concatenation or `0` for summation)
 * @param op an operator used to both accumulate results within a partition and combine results
 *                  from different partitions
 */
def fold(zeroValue: T)(op: (T, T) => T): T
scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> rdd.fold(10)(_+_)
res13: Int = 35
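
Why 35? The zero value is folded in once per partition and once more when the per-partition results are combined. The run above apparently used a single partition: 10 + (1+2+3+4+5) = 25 inside the partition, then 10 + 25 = 35 at the combine step. The result therefore depends on the partition count; a minimal sketch with an explicit two-partition split:

// The zero value enters three times: once per partition and once when combining,
// so the result is 15 + 10 * 3 = 45 regardless of how the elements are split.
sc.parallelize(1 to 5, 2).fold(10)(_ + _)   // 45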


Reposted from: https://www.cnblogs.com/alianbog/p/5837396.html
