Spark RDD基本操作
Spark RDD Scala语言编程
RDD(Resilient Distributed Dataset)是一个不可变的分布式对象集合, 每个rdd被分为多个分区, 这些分区运行在集群的不同节点上。rdd支持两种类型的操作:转化(trainsformation)和行动(action), Spark只会惰性计算rdd, 也就是说, 转化操作的rdd不会立即计算, 而是在其第一次遇到行动操作时才去计算, 如果想在多个行动操作中复用一个rdd, 可以使用RDD.persist()让Spark把这个rdd缓存下来。
0. 初始化SparkContext
import org.apache.spark.{SparkConf, SparkContext}val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("spark-rdd-demo"))
1. 创建RDD
Spark提供了2种创建RDD的方式:
1.1 读取外部数据集
val lines = sc.parallelize(List("Java", "Scala", "C++"))
1.2 在驱动器程序中对一个集合进行并行化
val lines = sc.textFile("hdfs://dash-dev:9000/input/test.txt")
2. RDD操作
2.1 转化操作
RDD的转化操作是返回新RDD的操作, 常用转化操作总结如下:
表1: 对一个数据为{1,2,3,3}的RDD进行基本的转化操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
map() | 将函数应用于RDD中每个元素, 将返回值构成新的RDD | rdd.map(x=>x+1) | {2,3,4,5} |
flatMap() | 将函数应用于RDD中的每个元素, 将返回的迭代器的所有内容构成新的RDD, 常用来切分单词 | rdd.flatMap(x=>x.to(2)) | {1,2,2} |
filter() | 返回一个通过传入给filter()的函数的元素组成的RDD | rdd.filter(x=> x>2) | {3,3} |
distinct() | 去重 | rdd.distinct() | {1,2,3} |
sample(withReplacement, fraction, [seed]) | 对RDD采样, 以及是否替换 | rdd.sample(false, 0.5) | 非确定的 |
表2: 对数据分别为{1,2,3}和{2,3,4}RDD进行针对2个RDD的转化操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
union() | 求2个RDD的并集 | rdd.union(other) | {1,2,3,4} |
intersection() | 求2个RDD的交集 | rdd.intersection(other) | {2,3} |
subtract() | 求2个RDD的差集 | rdd.subtract(other) | {1} |
cartesian() | 求2个RDD的笛卡尔积 | rdd.cartesian(other) | {1,2}, {1,3}, {1,4}…{3,4} |
sample(withReplacement, fraction, [seed]) | 对RDD采样, 以及是否替换 | rdd.sample(false, 0.5) | 非确定的 |
2.2 行动操作
RDD的行动操作会把最终求得的结果返回驱动器程序, 或者写入外部存储系统中。
表3: 对一个数据为{1,2,3,3}的RDD进行基本RDD的行动操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
redcue() | 并行整合RDD中的所有元素 | rdd.reduce((x, y) => x+y) | 9 |
collect() | 返回RDD中的所有元素 | rdd.collect() | {1,2,3,4} |
count() | 求RDD中的元素个数 | rdd.count() | 4 |
countByValue() | 各元素在RDD中出现的次数 | rdd.countByValue() | {1,1}, {2, 1}, {3,2} |
take(n) | 从RDD中返回n个元素 | rdd.take(2) | {1,2} |
top(n) | 从RDD中返回前n个元素 | rdd.top(3) | {3,3,2} |
foreach(func) | 对RDD中的每个元素使用给定的函数 | rdd.foreach(print) | 1,2,3,3 |
2.3 向Spark传递函数
Spark的大部分转化和行动操作都要依赖于用户传递的函数来计算, 当传递的对象是某个对象的成员, 或者包含了对某个对象中一个字段的引用时(如self.field), Spark就会把整个对象发送到工作节点上--这比你本意想传递的东西大太多了!替代的方案是,把你需要的字段从对象中拿出来放到一个局部变量中, 然后传递这个局部变量:
class SearchFunctions(val query: String) {def isMatch(s: String): Boolean = {s.contains(query)}def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {// 问题:"isMatch"表示"this.isMatch", 因此会传递整个thisrdd.map(isMatch)}def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {// 问题: "query"表示"this.query", 因此会传递整个thisrdd.map(x => x.split(query))}def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {// 安全:只把我们需要的字段拿出来放入局部变量中val localQuery = this.queryrdd.map(x => x.split(localQuery))}
}
另外, 要注意的是, Spark要求我们传入的函数及其应用的数据是可序列化的(实现了Java的Serializable接口), 否则会出现NotSerializableException。
作者 @wusuopubupt
2016年11月11日
Spark RDD基本操作相关推荐
- Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...
本博文的主要内容是: 1.rdd基本操作实战 2.transformation和action流程图 3.典型的transformation和action RDD有3种操作: 1. Trandform ...
- Spark笔记:RDD基本操作(上)
本文主要是讲解spark里RDD的基础操作.RDD是spark特有的数据模型,谈到RDD就会提到什么弹性分布式数据集,什么有向无环图,本文暂时不去展开这些高深概念,在阅读本文时候,大家可以就把RDD当 ...
- Spark笔记:RDD基本操作(下)
上一篇里我提到可以把RDD当作一个数组,这样我们在学习spark的API时候很多问题就能很好理解了.上篇文章里的API也都是基于RDD是数组的数据模型而进行操作的. Spark是一个计算框架,是对ma ...
- Spark SQL基本操作以及函数的使用
2019独角兽企业重金招聘Python工程师标准>>> 引语: 本篇博客主要介绍了Spark SQL中的filter过滤数据.去重.集合等基本操作,以及一些常用日期函数,随机函数,字 ...
- Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)
1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...
- pyspark rdd 基本操作
pyspark rdd 基本操作 原文链接 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on F ...
- Spark RDD、DataFrame原理及操作详解
RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...
- spark RDD概念及组成详解
概念 弹性分布式数据集(RDD),Spark中的基本抽象.表示可以并行操作的元素的不变分区集合.此类包含所有RDD上可用的基本操作,例如map,filter和persist.另外,PairRDDFun ...
- PySpark之RDD基本操作
PySpark之RDD基本操作 Spark是基于内存的计算引擎,它的计算速度非常快.但是仅仅只涉及到数据的计算,并没有涉及到数据的存储,但是,spark的缺点是:吃内存,不太稳定 总体而言,Spark ...
最新文章
- python脚本自动运行失败_解决Python中定时任务线程无法自动退出的问题
- JS_ECMA基本语法中的几种封装的小函数
- TextView实现自动滚动滚动.
- PL SQL导入导出sql/dmp文件
- P1057 传球游戏
- mysql数据库设计255_MySQL数据库设计规范
- 从零实现深度学习框架——实现常见运算的计算图(下)
- 手机中文c语言编辑器,Turbo C中文(c语言编辑器)v3.7.8.9
- 如何破解WP7并安装xap文件
- 【正本清源】算力大战根本就从未开始过!BCH分叉事件之技术细节七问
- FLV player 在线播放器【www.lantianye3.top】
- 3.EVE-NG导入Dynamips和IOL
- Endnote X9 插入参考文献、设置参考文献格式
- bzoj1754[Usaco2005 qua]Bull Math*
- Java项目:SpringBoot人才求职招聘网站
- view标签class属性
- Unity Shader 实现简单的压扁效果
- 以太网是什么拓扑结构
- 如何删除WhatsApp消息
- MATLAB计算信号短时平均过零率
热门文章
- SpringBoot:ERROR: column “***“ is of type numeric but expression is of type character varying
- Windows 系统45个小技巧(建议收藏)
- CEC2015:动态多目标野狗优化算法求解CEC2015(提供完整MATLAB代码,含GD、IGD、HV和SP评价指标)
- DOA估计 基于互质阵列的DOA估计
- Matlab:程序设计
- FPGA verilog实现全减器
- iPhone手机录音小技能:教你如何设置轻点两下背部录音
- c语言分隔符的作用,句子分割代码(C语言程序的语句分隔符是)
- Halcon学习(一) 初识Halcon HDevelop下载安装
- :I/O中断处理过程包括哪几个阶段?中断服务程序流程分为哪几部分?