从集合(内存)中创建RDD
从集合(内存)中创建RDD,Spark主要提供了两种方法:parallelize和makeRDD由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
RDD转换算子
RDD根据数据方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value
类型将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(num => {num * 2}
)
val dataRDD2: RDD[String] = dataRDD1.map(num => {"" + num}
)将处理过的数据进行逐条映射转换,这里转换可以为类型的转换,也可以是值的转换;
flatMap函数
def flatMap[U:ClassTag]:RDD[U]
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
val dataRDD = sparkContext.makeRDD(List(List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(list => list
)
def groupBy:
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样
的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy( _%2)
def filer(f:T=> Boolean);RDD[T]
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出
现数据倾斜。
val dataRDD = sparkContext.makeRDD(List(1,2,3,4
),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)
def distinct(0:
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2
),1)
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)
sortBy
def sortBy[K](
f: (T) => K
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理
的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一
致。中间存在 shuffle 的过程
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
union
def union(other: RDD[T]): RDD[T]
对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)
subtract
def subtract(other: RDD[T]): RDD[T]
以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)
reduceByKey:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
可以将数据按照相同的 Key 对 Value 进行聚合
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
groupByKey
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
val dataRDD1 =sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
案例实操:
统计出每一个省份每个广告被点击数量排行的 Top3
reduce:
def reduce(f: (T, T) => T): T
聚合RDD中的所有元素,先聚合分区内的数据,再聚合分区间的数据
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))// 聚合数据
val reduceResult:Int = rdd.reduce(_+_)
collect函数
def collect():Array[T]
在驱动程序中,以数组 Array 的形式返回数据集的所有元素val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
//收集数据到Driver
rdd.collect().foreach(println)
count函数
def count(): Long
返回RDD中元素的个数
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val countResult: Long = rdd.count()
save相关算子
函数签名:
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
rdd.map((_,1)).saveAsSequenceFile("output2")
RDD文件的读取和保存
Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件;
文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。
text文件:
//读取输入文件
val inputRDD:RDD[String] = sc.textFile("input/1.txt)//保存数据
inputRDD.saveASTextFile("output")sequence 文件
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat
File)。在 SparkContext 中,可以调用 sequenceFile[keyClass, valueClass](path)。
// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)object 对象文件对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFile[T:
ClassTag](path)函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用
saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)
从集合(内存)中创建RDD相关推荐
- Spark创建RDD的四种方式(一):从集合(内存)中创建 RDD代码示例
从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法: def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defau ...
- 在内存中创建临时表和表变量
在Disk-Base数据库中,由于临时表和表变量的数据存储在tempdb中,如果系统频繁地创建和更新临时表和表变量,大量的IO操作集中在tempdb中,tempdb很可能成为系统性能的瓶颈.在SQL ...
- 使用zipfile/BytetesIO实现在内存中创建zip压缩文件
需求描述 Flask开发的系统后台需要将一些程序中生成的文本数据和二进制的内存对象打包成zip文件提供给客户端从浏览器直接下载,刚开始的思路是先分别将文本和二进制的数据写到磁盘上,打包后再当成临时文件 ...
- 如何在内存中创建文件供用户下载,而不是通过服务器下载?
有什么方法可以在客户端上创建文本文件并提示用户下载文本文件,而无需与服务器进行任何交互? 我知道我不能直接写给他们的机器(安全性和全部),但是我可以创建并提示他们保存吗? #1楼 您甚至可以做一个比U ...
- 学习笔记Spark(四)—— Spark编程基础(创建RDD、RDD算子、文件读取与存储)
文章目录 一.创建RDD 1.1.启动Spark shell 1.2.创建RDD 1.2.1.从集合中创建RDD 1.2.2.从外部存储中创建RDD 任务1: 二.RDD算子 2.1.map与flat ...
- 如何在sqlite3连接中创建并调用自定义函数
#!/user/bin/env python # @Time :2018/6/8 14:44 # @Author :PGIDYSQ #@File :CreateFunTest.py '''如何在sql ...
- java内存模型 创建类_JVM内存模型及String对象内存分配
昨天看了一篇关于<Java后端程序员1年工作经验总结>的文章,其中有一段关于String和StringBuffer的描述,对于执行结果仍然把握不准,趁此机会也总结了下JVM内存模型. 1. ...
- java 获取对象方法有哪些方法有哪些方法有哪些_Java中创建String 对象的方法有哪些...
Java中创建String 对象的方法有哪些 发布时间:2020-11-25 16:45:30 来源:亿速云 阅读:71 作者:Leah 这篇文章将为大家详细讲解有关Java中创建String 对象的 ...
- java 数据保存内存_java中的各种数据类型在内存中存储的方式 一
1.java是如何管理内存的 java的内存管理就是对象的分配和释放问题.(其中包括两部分) 分配:内存的分配是由程序完成的,程序员需要通过关键字new为每个对象申请内存空间(基本类型除外),所有的对 ...
最新文章
- 朱晔的互联网架构实践心得S1E2:屡试不爽的架构三马车
- 超参数搜索——网格搜索和随机搜索
- pythonsuper_python中的super()是什么意思呢
- C++创建对象的两种方式
- oracle中的nls在哪,Oracle的NLS设置
- 日志单例log4cpp简述
- WP7 Tip:改变启动页
- Git指令及码云的使用笔记
- java流程图怎么画_计算机流程图怎么画
- WinCC vbs脚本小结
- linux 外接网卡驱动下载,绿联USB外置显卡+网卡驱动程序
- c语言取地址和间接引用
- 1.3.1 计算机的主要性能指标 (机器字长、数据通路带宽、主存容量、运算速度、吞吐量、响应时间、主频和时钟周期、CPI、CPU执行时间、MIPS、MFLOPS、GFLOPS、TFLOPS)
- 淘宝用户分析(步骤详细,数据分析项目)
- 大专计算机应用技术毕业生登记表自我鉴定,大专毕业生登记表的自我鉴定(精选5篇)...
- 【UVA1723】Intervals
- 临河三中宏志班2021年高考成绩查询,内蒙古巴彦淖尔市临河三中2018-2019高一下学期第二次月考(宏志)生物试卷 Word版含答案.doc...
- 东方国信 Java一面
- vue 判断两对象是否一致_判断两个对象的值是否相等
- 网络安全工具使用集锦手册
热门文章
- IP地址和子网划分学习笔记之《预备知识:进制计数》
- 如何正确上传一张图片?
- Powershell进阶学习(6) 部署 Windows PowerShell Web 访问
- 实战:Windows Server 2008 活动目录 传送和争夺操作主控角色
- 常用命令-JAVA大数据-Week5-DAY2-linux
- C++11 static_assert
- 【转载】Android 工具-adb原理
- if(-1)为真还是假_女人是“真拒绝”还是“假矜持”,其实一眼就能看出来
- hive 强转为string_String 源码浅析————终结篇
- CATIA怎么约束快捷键_3ds Max 常用快捷键大全,你学会了吗