Python Spark RDD
Python Spark RDD
RDD(Resilient Distributed Dataset)弹性分布式数据集是Spark的核心,RDD能与其他系统兼容,可以导入外部存储系统的数据集,例如HDFS、HBase或其他Hadoop数据源。
RDD的三种基本运算
- transformation“转换”运算:RDD执行“转换”运算会产生另外一个RDD;RDD具有lazy特性,“转换”运算并不会立刻执行,等到执行“动作”运算才实际执行
- action“动作”运算:RDD执行“动作”运算后不会产生另一个RDD,而是产生数值,数组或写入文件系统;“动作”运算会立刻执行,并且连同之前的“转换”运算一起执行。
- persistence“持久化”运算:对于会重复使用的RDD,可以将RDD“持久化”在内存中作为后续使用,以提高执行性能。
Lineage机制具备容错的特性
- RDD本身具有Lineage机制,记录每个RDD与其父代RDD之间的关联,还会记录通过什么操作才由父代RDD得到该RDD信息。
- RDD本身的immutable(不变性),加上Lineage机制,使得Spark具备容错的特性。如果某节点的机器出现故障,那么存储于该节点的RDD损毁后就会重新执行一连串“转换”命令,产生新的输出数据,以避免因为特定节点的故障而造成整个系统无法运行的问题。
基于IPython Notebook 进行基本的Spark RDD运算
首先,终端切换ipynotebook工作目录
cd ~/pythonwork/ipynotebook
运行IPython Notebook 使用Spark
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark
打开IPython Notebook后
1、创建intRDD
intRDD = sc.parallelize([4,1,2,5,5,6,8])
上述命令使用parallelize方法输入一个List的参数的方式定义intRDD,是一个“转换”运算,不会立即执行
2、intRDD转换为List
intRDD.collect()
上述命令,intRDD执行collect()方法之后会转换为List,是一个“动作”运算,会立即执行。
执行结果
3、创建stringRDD并转换
stringRDD=sc.parallelize(["Apple", "Google", "Facebook", "Apache"])
stringRDD.collect()
4、map运算
map运算通过传入的函数将每个元素经过函数运算产生另外一个RDD。如下图:
上图,函数isEven作为参数传入map命令,map使得每个元素作为实参传入函数并返回结果,从而产生另一个RDD,map本身是一个“转换”运算,不会立即执行,后面加上collect()方法(“动作”运算)才立即执行出结果。
map运算也可以不传入函数名,即匿名函数,例如:
上述命令中,lamba语句表示匿名函数(anonymous functions),其中指定x为形参,x+5为要执行的命令
map字符串运算,针对字符串RDD执行map运算:
5、filter运算
顾名思义,filter“滤波运算”对RDD内每个元素进行筛选,并产生另一个RDD
6、distinct运算
distinct运算会删除重复元素intRDD.distinct().collect()
7、randomSplit运算
randomSplit运算可以将整个集合元素以随机数的方式按照比例分为多个RDD
8、groupBy运算
groupBy运算可以按照传入匿名函数规则将数据分为多个List
多个RDD“转换”运算
创建多个RDD范例:
intRDD1 = sc.parallelize([1,3,5,7,9])
intRDD2 = sc.parallelize([2,4,6,8,10])
intRDD3 = sc.parallelize([3,1,5,2])
1、union并集运算
2、intersection交集及subtract差集运算
3、cartesian笛卡儿积运算 intRDD1.cartesian(intRDD3).collect()
基本“动作”运算
1、读取RDD元素
2、统计功能
将RDD内的元素进行统计运算
RDD Key-Value基本“转换运算”
1、创建Key-Value RDD范例
kvRDD = sc.parallelize([(1,3),(5,2),(5,6),(4,7)])
kvRDD.collect()
其中,第一个字段是Key,第二个字段是Value
2、列出全部key值和value值
3、filter运算
4、mapValues运算
mapValues运算针对RDD内每一个(key,value)键值对进行运算,产生另外一个RDD
5、sortByKey运算
6、reduceByKey运算
按照Key值进行reduce运算
上述命令,key值为5的value进行了“+”运算的合并操作。
多个RDD Key-Value “转换”运算
创建多个Key-Value RDD 范例
kvRDD1 = sc.parallelize([(2,3),(3,5),(4,6),(1,2)])
kvRDD2 = sc.parallelize([(3,5),(5,4)])
kvRDD1.collect()
kvRDD2.collect()
1、join运算
join运算将两个RDD按照相同的key值join起来
2、subtractByKey运算
subtractByKey运算会删除相同key值的数据
Key-Value RDD “动作”运算
key-value lookup运算
可以使用lookup输入key值查找value值
Broadcast 共享变量
Shared variable共享变量可用于节省内存与运行时间,提升并行处理的执行效率。共享变量包括Broadcast和accumulator
不使用共享变量的一般RDD范例在并行处理中每执行一次转换都必须将数据(列表或字典)传送到WorkerNode,才能执行转换,如果字典(对照表)很大,需要转换的列表RDD也很大,会耗费很多内存与时间。
1、Broadcasti广播变量使用规则
- 可以使用SparkContext.broadcast([初始值])创建
- 使用.value方法读取Broadcasti广播变量的值
- Broadcasti广播变量被创建后不能修改
2、Broadcasti广播变量范例
创建kvCorp key-value RDD
kvCorp=sc.parallelize([(1,"Apple"),(2,"Google"),(3,"Facebook"),(4,"Apache")])
在并行处理中,bcCorpMap广播变量会传送到WorkerNode机器中,并存储在内存中,且后续在此worker Node都可以使用该广播变量执行转换,节省了内存和时间。
accumulator 累加器
1、accumulator 累加器共享变量规则
- accumulator 累加器可以使用SparkContext.accumulator([初始值])创建
- 使用方法.add()进行累加
- 在task中,例如foreach循环中,不能读取累加器的值
- 只有驱动程序(循环外),才可以使用.value来读取累加器的值
2、accumulator累计器范例
RDD Persistence持久化
Spark RDD 持久化机制可以将需要重复运算的RDD存储在内存中,以便大幅提升运算效率。
Spark RDD持久化使用方法如下:
- RDD.persist(存储等级)——指定存储等级,默认是MEMORY_ONLY(存储在内存中)。
- RDD.unpersist()——取消持久化。
Python Spark RDD相关推荐
- 《Python Spark 2.0 Hadoop机器学习与大数据实战_林大贵(著)》pdf
<Python+Spark 2.0+Hadoop机器学习与大数据实战> 五星好评+强烈推荐的一本书,虽然内容可能没有很深入,但作者非常用心的把每一步操作详细的列出来并给出说明,让我们跟着做 ...
- Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...
1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...
- spark学习13(spark RDD)
RDD及其特点 1)RDD(Resillient Distributed Dataset)弹性分布式数据集,是spark提供的核心抽象.它代表一个不可变.可分区.里面的元素可并行计算的集合 2)RDD ...
- Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)
1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...
- spark RDD官网RDD编程指南
http://spark.apache.org/docs/latest/rdd-programming-guide.html#using-the-shell Overview(概述) 在较高的层次上, ...
- Spark RDD并行度与分区设置
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能 够并行计算的任务数量我们称之为并行度.这个数量可以在构建 RDD 时指定.记住,这里 的并行执行的任 ...
- SPARK RDD JAVA API 用法指南
1.RDD介绍: RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD ...
- python spark视频_Spark2.x+Python大数据机器学习视频课程
本课程系统讲解如何在Spark2.0上高效运用Python来处理数据并建立机器学习模型,帮助读者开发并部署高效可拓展的实时Spark解决方案. 第一章.搭建Spark 2.x+Python开发环境及基 ...
- [Spark][Python]Spark 访问 mysql , 生成 dataframe 的例子:
[Spark][Python]Spark 访问 mysql , 生成 dataframe 的例子: mydf001=sqlContext.read.format("jdbc").o ...
最新文章
- R语言plot函数可视化、ggplot2可视化把图像标题(title)的部分内容着色实战:标题的部分内容配置不同的色彩、副标题(subtitle)的内容配置不同的色彩
- 带线的无限级下拉树列表-完整示例篇
- java对日期进行加减操作以及比较大小
- Echart..js插件渲染报错 data.length1?
- 基础才是重中之重~通过人类的生活来学习Delegate
- SQL Server触发器创建、删除、修改、查看示例步骤
- 裁剪左上角x左上角y填什么_少了立体裁剪,你的服装设计生涯还完整吗?
- javascript判断非空
- go语言导出oracle数据,Go语言导出内容到Excel的方法
- android 模拟gps坐标,android中模拟器中实现GPS坐标改变
- 前端拼音首字母搜索姓名
- UE接入过程(LTE和NR)
- WorkNC如何创建夹具系统 (以虎钳为例)
- MySQL中where 1=1真的会影响性能么?
- 计算机中f4的应用,Excel中F4键的9个功能,提高90%工作效率
- 使用js调用接口导出excel
- 硬件开发之pcb---PCB抗干扰设计原则
- 一元线性回归模型及其Python案例
- mysql 分表 id_MySQL分表自增ID解决方案
- 华为S2300交换机端口镜像配置