Python大数据处理扩展库pySpark用法精要
Spark是一个开源的、通用的并行计算与分布式计算框架,其活跃度在Apache基金会所有开源项目中排第三位,最大特点是基于内存计算,适合迭代计算,兼容多种应用场景,同时还兼容Hadoop生态系统中的组件,并且具有非常强的容错性。Spark的设计目的是全栈式解决批处理、结构化数据查询、流计算、图计算和机器学习等业务和应用,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,效率提升越大。
Spark集成了Spark SQL(分布式SQL查询引擎,提供了一个DataFrame编程抽象)、Spark Streaming(把流式计算分解成一系列短小的批处理计算,并且提供高可靠和吞吐量服务)、MLlib(提供机器学习服务)、GraphX(提供图计算服务)、SparkR(R on Spark)等子框架,为不同应用领域的从业者提供了全新的大数据处理方式,越来越便捷、轻松。
为了适应迭代计算,Spark把经常被重用的数据缓存到内存中以提高数据读取和操作速度,比Hadoop快近百倍,并且支持Java、Scala、Python、R等多种语言。除map和reduce之外,Spark还支持filter、foreach、reduceByKey、aggregate以及SQL查询、流式查询等等。
扩展库pyspark提供了SparkContext(Spark功能的主要入口,一个SparkContext表示与一个Spark集群的连接,可用来创建RDD或在该集群上广播变量)、RDD(Spark中的基本抽象,弹性分布式数据集Resilient Distributed Dataset)、Broadcast(可以跨任务重用的广播变量)、Accumulator(共享变量,任务只能为其增加值)、SparkConf(用来配置Spark)、SparkFiles(访问任务的文件)、StorageLevel(更细粒度的缓冲永久级别)等可以公开访问的类,并且提供了pyspark.sql、pyspark.streaming与pyspark.mllib等模块与包。
>>> from pyspark import SparkFiles
>>> path = 'test.txt'
>>> with open(path, 'w') as fp: #创建文件
fp.write('100')
>>> sc.addFile(path) #提交文件
>>> def func(iterator):
with open(SparkFiles.get('test.txt')) as fp: #打开文件
Val = int(fp.readline()) #读取文件内容
return [x * Val for x in iterator]
>>> sc.parallelize([1, 2, 3, 4, 5]).mapPartitions(func).collect() #并行处理,collect()返回包含RDD上所有元素的列表
[100, 200, 300, 400, 500]
>>> sc.parallelize([2, 3, 4]).count() #count()用来返回RDD中元素个数,parallelize()用来分布本地的Python集合,并创建RDD
3
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect()) #collect()返回包含RDD中元素的列表,cartesian()计算两个RDD的笛卡尔积
[(1, 1), (1, 2), (2, 1), (2, 2)]
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect() #只保留符合条件的元素
[2, 4]
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) #返回唯一元素
[1, 2, 3]
>>> rdd = sc.parallelize(range(10))
>>> rdd.map(lambda x: str(x)).collect() #映射
>>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
>>> rdd.max() #最大值
43.0
>>> rdd.max(key=str)
5.0
>>> rdd.min() #最小值
1.0
>>> rdd.sum() #所有元素求和
59.0
>>> from random import randint
>>> lst = [randint(1,100) for _ in range(20)]
>>> lst
[18, 55, 48, 13, 86, 23, 85, 62, 66, 58, 73, 96, 90, 16, 49, 98, 49, 69, 3, 53]
>>> sc.parallelize(lst).top(3) #最大的3个元素
[98, 96, 90]
>>> sorted(lst, reverse=True)[:3]
[98, 96, 90]
>>> sc.parallelize(range(100)).filter(lambda x:x>90).take(3) #使用take()返回前3个元素
[91, 92, 93]
>>> sc.parallelize(range(20), 3).glom().collect() #查看分片情况
[[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11, 12], [13, 14, 15, 16, 17, 18, 19]]
>>> sc.parallelize(range(20), 6).glom().collect() #查看分片情况
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9], [10, 11, 12], [13, 14, 15], [16, 17, 18, 19]]
>>> myRDD = sc.parallelize(range(20), 6) #6表示分片数
>>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part]) #执行任务
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
>>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part], [1]) #只查看第2个分片的结果
[9, 16, 25]
>>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part], [1,5]) #查看第2和第6个分片上的结果
[9, 16, 25, 256, 289, 324, 361]
>>> sc.parallelize([1,2,3,3,3,2]).distinct().collect() #distinct()返回包含唯一元素的RDD
[1, 2, 3]
>>> from operator import add, mul
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) #把所有分片上的数据累加
15
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(1, mul) #把所有分片上的数据连乘
120
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) #reduce()函数的并行版本
15
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(mul)
120
>>> result = sc.parallelize(range(1, 6)).groupBy(lambda x: x%3).collect() #对所有数据进行分组
>>> for k, v in result:
print(k, sorted(v))
0 [3]
1 [1, 4]
2 [2, 5]
>>> rdd1 = sc.parallelize(range(10))
>>> rdd2 = sc.parallelize(range(5, 20))
>>> rdd1.interp(rdd2).collect() #交集
[8, 9, 5, 6, 7]
>>> rdd1.subtract(rdd2).collect() #差集
[0, 1, 2, 3, 4]
>>> rdd1.union(rdd2).collect() #合并两个RDD上的元素
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
>>> rdd1 = sc.parallelize('abcd')
>>> rdd2 = sc.parallelize(range(4))
>>> rdd1.zip(rdd2).collect() #两个RDD必须等长
[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
>>> rdd = sc.parallelize('abcd')
>>> rdd.map(lambda x: (x, 1)).collect() #内置函数map()的并行版本
[('a', 1), ('b', 1), ('c', 1), ('d', 1)]
>>> sc.parallelize([1, 2, 3, 4, 5]).stdev() #计算标准差
1.4142135623730951
>>> sc.parallelize([1, 1, 1, 1, 1]).stdev()
0.0
明天回老家过年了,一周后回来,偏远的农村上网不太方便,这几天可能暂时不再发文了,春节回来以后再继续发。提前祝所有朋友春节快乐!
Python大数据处理扩展库pySpark用法精要相关推荐
- Python爬虫扩展库BeautifulSoup4用法精要
BeautifulSoup是一个非常优秀的Python扩展库,可以用来从HTML或XML文件中提取我们感兴趣的数据,并且允许指定使用不同的解析器.由于beautifulsoup3已经不再继续维护,因此 ...
- Python扩展库psutil用法精要
0.安装与导入psutil pip install psutil import psutil 1.查看CPU信息 >>> psutil.cpu_count() #查看CPU核数 2 ...
- Python大数据处理库 PySpark实战 总结四
Python大数据处理库 PySpark实战四 ETL 实战 实验数据来源 数据加载 观察资料 选择.筛选与聚合 机器学习实战 实验数据来源 数据加载 统计描述 清洗与变形 Pipeline 逻辑回归 ...
- Python截屏扩展库pyscreenshot安装与使用
PIL是非常成熟的Python图像处理扩展库,但只支持Python 2.x,另一个同样功能的扩展库pillow完美支持Python 3.x.然而,这两个库的部分功能不能在各版本的Linux平台使用,这 ...
- Python大数据处理,应对海量数据挑战
Python大数据处理,应对海量数据挑战 Python的特点及在大数据处理中的优势 1 Python语言的特点 2 Python在大数据处理中所具备的优势 二.Python常用的大数据处理工具介绍 1 ...
- 海龟编程 python绘图工具turtle库的用法 turtle库使用方法大全,画笔设置 画布设置 绘图设置,画笔粗细,画笔颜色, 画笔速度。Python二级必须会的命令(已获取证书)
目录 海龟编程 python绘图工具turtle库的用法 画布: 画笔 画笔运动命令: 画笔的控制命令: 全局控制命令: 简单turtle绘图示例: 圆中方: 三色同心圆: 四个圆中方: 螺旋正方: ...
- Python爬虫辅助库BeautifulSoup4用法精要
BeautifulSoup是一个非常优秀的Python扩展库,可以用来从HTML或XML文件中提取我们感兴趣的数据,并且允许指定使用不同的解析器.由于beautifulsoup3已经不再继续维护,因此 ...
- python官方的扩展库索引是什么_python扩展列表
广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! python扩展内容阅读本文需要3分钟? ① python中yield关键字的使 ...
- Python使用pandas扩展库DataFrame对象的pivot方法对数据进行透视转换
Python扩展库pandas的DataFrame对象的pivot()方法可以对数据进行行列互换,或者进行透视转换,在有些场合下分析数据时非常方便. DataFrame对象的pivot()方法可以接收 ...
最新文章
- docker build 指定dockerfile
- 如何排查系统的性能瓶颈点?
- 取得select框的text
- ISA Server 2004 SP2新特性(上)
- 名企架构师的心声:让我一次“架”个“构”
- java中程序定义book类_Java基础_0302:类和对象
- 单例初始化(MRC模式之autorelease)
- Ubuntu下GPAC(MP4Box)的安装 | 基于MP4Box搭建DASH视频系统
- CSS中设置页面背景图片
- 鼠标点击特效(富强,民主,文明,和谐,诚信,友善)
- 连接远程电脑的时候提示:此计算机无法连接到远程计算机
- android extra_shortcut_icon,Android自动创建shortcut
- git 错误error: failed to push some refs to
- linux系统下载编译器,linux gcc编译器下载 GNU Compiler Collection(gcc编译器) v4.7.0 for linux版 下载-脚本之家...
- 程序人生 | 大龄的程序员都到哪里去了?
- 问题 J: 老肖数等式
- CP56time2a
- oracle sqlplus as sysdba,sqlplus / as sysdba 详解
- Saber 2016 安装和破解(WIN10)
- About Codeblocks
热门文章
- JAVA设置输入数据范围,如何使用Apache POI(SXSSF)为特定单元格设置数据(数字)格式区域设置?...
- html5圆形导航菜单,圆滑细腻,那些使用圆形导航菜单的漂亮网页设计
- mysql分组后组内排序_图解排序 3/10 希尔排序
- linux screen vim 颜色不一样,tmux中的Vim显示错误的颜色
- html js布尔值怎么定义,JavaScript基本类型值-Undefined、Null、Boolean
- easyui dialog 不执行页面js_Spring Security(六):前端菜单,角色权限页面的搭建
- mcldownload文件夹_《我的世界》中国版游戏空间精简教程 多余文件删除方法
- 记录一次 Win10 通过 VirtualBox安装CentOS7 的辛酸史
- 框架 butterknife
- SSM实现的在线挂号预约管理系统源码