Spark RDD Scala语言编程


RDD(Resilient Distributed Dataset)是一个不可变的分布式对象集合, 每个rdd被分为多个分区, 这些分区运行在集群的不同节点上。rdd支持两种类型的操作:转化(trainsformation)和行动(action), Spark只会惰性计算rdd, 也就是说, 转化操作的rdd不会立即计算, 而是在其第一次遇到行动操作时才去计算, 如果想在多个行动操作中复用一个rdd, 可以使用RDD.persist()让Spark把这个rdd缓存下来。

0. 初始化SparkContext

import org.apache.spark.{SparkConf, SparkContext}val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("spark-rdd-demo"))

1. 创建RDD

Spark提供了2种创建RDD的方式:

1.1 读取外部数据集
val lines = sc.parallelize(List("Java", "Scala", "C++"))
1.2 在驱动器程序中对一个集合进行并行化
val lines = sc.textFile("hdfs://dash-dev:9000/input/test.txt")

2. RDD操作

2.1 转化操作

RDD的转化操作是返回新RDD的操作, 常用转化操作总结如下:

表1: 对一个数据为{1,2,3,3}的RDD进行基本的转化操作

函数名 目的 示例 结果
map() 将函数应用于RDD中每个元素, 将返回值构成新的RDD rdd.map(x=>x+1) {2,3,4,5}
flatMap() 将函数应用于RDD中的每个元素, 将返回的迭代器的所有内容构成新的RDD, 常用来切分单词 rdd.flatMap(x=>x.to(2)) {1,2,2}
filter() 返回一个通过传入给filter()的函数的元素组成的RDD rdd.filter(x=> x>2) {3,3}
distinct() 去重 rdd.distinct() {1,2,3}
sample(withReplacement, fraction, [seed]) 对RDD采样, 以及是否替换 rdd.sample(false, 0.5) 非确定的

表2: 对数据分别为{1,2,3}和{2,3,4}RDD进行针对2个RDD的转化操作

函数名 目的 示例 结果
union() 求2个RDD的并集 rdd.union(other) {1,2,3,4}
intersection() 求2个RDD的交集 rdd.intersection(other) {2,3}
subtract() 求2个RDD的差集 rdd.subtract(other) {1}
cartesian() 求2个RDD的笛卡尔积 rdd.cartesian(other) {1,2}, {1,3}, {1,4}…{3,4}
sample(withReplacement, fraction, [seed]) 对RDD采样, 以及是否替换 rdd.sample(false, 0.5) 非确定的
2.2 行动操作

RDD的行动操作会把最终求得的结果返回驱动器程序, 或者写入外部存储系统中。

表3: 对一个数据为{1,2,3,3}的RDD进行基本RDD的行动操作

函数名 目的 示例 结果
redcue() 并行整合RDD中的所有元素 rdd.reduce((x, y) => x+y) 9
collect() 返回RDD中的所有元素 rdd.collect() {1,2,3,4}
count() 求RDD中的元素个数 rdd.count() 4
countByValue() 各元素在RDD中出现的次数 rdd.countByValue() {1,1}, {2, 1}, {3,2}
take(n) 从RDD中返回n个元素 rdd.take(2) {1,2}
top(n) 从RDD中返回前n个元素 rdd.top(3) {3,3,2}
foreach(func) 对RDD中的每个元素使用给定的函数 rdd.foreach(print) 1,2,3,3
2.3 向Spark传递函数

Spark的大部分转化和行动操作都要依赖于用户传递的函数来计算, 当传递的对象是某个对象的成员, 或者包含了对某个对象中一个字段的引用时(如self.field), Spark就会把整个对象发送到工作节点上--这比你本意想传递的东西大太多了!替代的方案是,把你需要的字段从对象中拿出来放到一个局部变量中, 然后传递这个局部变量:

class SearchFunctions(val query: String) {def isMatch(s: String): Boolean = {s.contains(query)}def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {// 问题:"isMatch"表示"this.isMatch", 因此会传递整个thisrdd.map(isMatch)}def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {// 问题: "query"表示"this.query", 因此会传递整个thisrdd.map(x => x.split(query))}def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {// 安全:只把我们需要的字段拿出来放入局部变量中val localQuery = this.queryrdd.map(x => x.split(localQuery))}
}

另外, 要注意的是, Spark要求我们传入的函数及其应用的数据是可序列化的(实现了Java的Serializable接口), 否则会出现NotSerializableException。

作者 @wusuopubupt
2016年11月11日

Spark RDD基本操作相关推荐

  1. Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...

    本博文的主要内容是: 1.rdd基本操作实战 2.transformation和action流程图 3.典型的transformation和action RDD有3种操作: 1.  Trandform ...

  2. Spark笔记:RDD基本操作(上)

    本文主要是讲解spark里RDD的基础操作.RDD是spark特有的数据模型,谈到RDD就会提到什么弹性分布式数据集,什么有向无环图,本文暂时不去展开这些高深概念,在阅读本文时候,大家可以就把RDD当 ...

  3. Spark笔记:RDD基本操作(下)

    上一篇里我提到可以把RDD当作一个数组,这样我们在学习spark的API时候很多问题就能很好理解了.上篇文章里的API也都是基于RDD是数组的数据模型而进行操作的. Spark是一个计算框架,是对ma ...

  4. Spark SQL基本操作以及函数的使用

    2019独角兽企业重金招聘Python工程师标准>>> 引语: 本篇博客主要介绍了Spark SQL中的filter过滤数据.去重.集合等基本操作,以及一些常用日期函数,随机函数,字 ...

  5. Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)

    1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...

  6. pyspark rdd 基本操作

    pyspark rdd 基本操作 原文链接 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on F ...

  7. Spark RDD、DataFrame原理及操作详解

    RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...

  8. spark RDD概念及组成详解

    概念 弹性分布式数据集(RDD),Spark中的基本抽象.表示可以并行操作的元素的不变分区集合.此类包含所有RDD上可用的基本操作,例如map,filter和persist.另外,PairRDDFun ...

  9. PySpark之RDD基本操作

    PySpark之RDD基本操作 Spark是基于内存的计算引擎,它的计算速度非常快.但是仅仅只涉及到数据的计算,并没有涉及到数据的存储,但是,spark的缺点是:吃内存,不太稳定 总体而言,Spark ...

最新文章

  1. python脚本自动运行失败_解决Python中定时任务线程无法自动退出的问题
  2. JS_ECMA基本语法中的几种封装的小函数
  3. TextView实现自动滚动滚动.
  4. PL SQL导入导出sql/dmp文件
  5. P1057 传球游戏
  6. mysql数据库设计255_MySQL数据库设计规范
  7. 从零实现深度学习框架——实现常见运算的计算图(下)
  8. 手机中文c语言编辑器,Turbo C中文(c语言编辑器)v3.7.8.9
  9. 如何破解WP7并安装xap文件
  10. 【正本清源】算力大战根本就从未开始过!BCH分叉事件之技术细节七问
  11. FLV player 在线播放器【www.lantianye3.top】
  12. 3.EVE-NG导入Dynamips和IOL
  13. Endnote X9 插入参考文献、设置参考文献格式
  14. bzoj1754[Usaco2005 qua]Bull Math*
  15. Java项目:SpringBoot人才求职招聘网站
  16. view标签class属性
  17. Unity Shader 实现简单的压扁效果
  18. 以太网是什么拓扑结构
  19. 如何删除WhatsApp消息
  20. MATLAB计算信号短时平均过零率

热门文章

  1. SpringBoot:ERROR: column “***“ is of type numeric but expression is of type character varying
  2. Windows 系统45个小技巧(建议收藏)
  3. CEC2015:动态多目标野狗优化算法求解CEC2015(提供完整MATLAB代码,含GD、IGD、HV和SP评价指标)
  4. DOA估计 基于互质阵列的DOA估计
  5. Matlab:程序设计
  6. FPGA verilog实现全减器
  7. iPhone手机录音小技能:教你如何设置轻点两下背部录音
  8. c语言分隔符的作用,句子分割代码(C语言程序的语句分隔符是)
  9. Halcon学习(一) 初识Halcon HDevelop下载安装
  10. :I/O中断处理过程包括哪几个阶段?中断服务程序流程分为哪几部分?