Spark是一个开源的、通用的并行计算与分布式计算框架,其活跃度在Apache基金会所有开源项目中排第三位,最大特点是基于内存计算,适合迭代计算,兼容多种应用场景,同时还兼容Hadoop生态系统中的组件,并且具有非常强的容错性。Spark的设计目的是全栈式解决批处理、结构化数据查询、流计算、图计算和机器学习等业务和应用,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,效率提升越大。

Spark集成了Spark SQL(分布式SQL查询引擎,提供了一个DataFrame编程抽象)、Spark Streaming(把流式计算分解成一系列短小的批处理计算,并且提供高可靠和吞吐量服务)、MLlib(提供机器学习服务)、GraphX(提供图计算服务)、SparkR(R on Spark)等子框架,为不同应用领域的从业者提供了全新的大数据处理方式,越来越便捷、轻松。

为了适应迭代计算,Spark把经常被重用的数据缓存到内存中以提高数据读取和操作速度,比Hadoop快近百倍,并且支持Java、Scala、Python、R等多种语言。除map和reduce之外,Spark还支持filter、foreach、reduceByKey、aggregate以及SQL查询、流式查询等等。

扩展库pyspark提供了SparkContext(Spark功能的主要入口,一个SparkContext表示与一个Spark集群的连接,可用来创建RDD或在该集群上广播变量)、RDD(Spark中的基本抽象,弹性分布式数据集Resilient Distributed Dataset)、Broadcast(可以跨任务重用的广播变量)、Accumulator(共享变量,任务只能为其增加值)、SparkConf(用来配置Spark)、SparkFiles(访问任务的文件)、StorageLevel(更细粒度的缓冲永久级别)等可以公开访问的类,并且提供了pyspark.sql、pyspark.streaming与pyspark.mllib等模块与包。

>>> from pyspark import SparkFiles
>>> path = 'test.txt'
>>> with open(path, 'w') as fp:  #创建文件
       fp.write('100')
>>> sc.addFile(path)   #提交文件
>>> def func(iterator):
       with open(SparkFiles.get('test.txt')) as fp:  #打开文件
           Val = int(fp.readline()) #读取文件内容
           return [x * Val for x in iterator]
>>> sc.parallelize([1, 2, 3, 4, 5]).mapPartitions(func).collect() #并行处理,collect()返回包含RDD上所有元素的列表
[100, 200, 300, 400, 500]
>>> sc.parallelize([2, 3, 4]).count()   #count()用来返回RDD中元素个数,parallelize()用来分布本地的Python集合,并创建RDD
3
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())  #collect()返回包含RDD中元素的列表,cartesian()计算两个RDD的笛卡尔积

[(1, 1), (1, 2), (2, 1), (2, 2)] 
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()  #只保留符合条件的元素
[2, 4]
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) #返回唯一元素
[1, 2, 3]
>>> rdd = sc.parallelize(range(10))
>>> rdd.map(lambda x: str(x)).collect()   #映射
>>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
>>> rdd.max() #最大值
43.0
>>> rdd.max(key=str)
5.0
>>> rdd.min()   #最小值
1.0
>>> rdd.sum() #所有元素求和
59.0
>>> from random import randint
>>> lst = [randint(1,100) for _ in range(20)]
>>> lst
[18, 55, 48, 13, 86, 23, 85, 62, 66, 58, 73, 96, 90, 16, 49, 98, 49, 69, 3, 53]
>>> sc.parallelize(lst).top(3) #最大的3个元素
[98, 96, 90]
>>> sorted(lst, reverse=True)[:3]
[98, 96, 90]
>>> sc.parallelize(range(100)).filter(lambda x:x>90).take(3) #使用take()返回前3个元素
[91, 92, 93]
>>> sc.parallelize(range(20), 3).glom().collect()  #查看分片情况
[[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11, 12], [13, 14, 15, 16, 17, 18, 19]]
>>> sc.parallelize(range(20), 6).glom().collect()  #查看分片情况
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9], [10, 11, 12], [13, 14, 15], [16, 17, 18, 19]]
>>> myRDD = sc.parallelize(range(20), 6)  #6表示分片数
>>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part]) #执行任务
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
>>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part], [1]) #只查看第2个分片的结果
[9, 16, 25]
>>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part], [1,5]) #查看第2和第6个分片上的结果
[9, 16, 25, 256, 289, 324, 361]
>>> sc.parallelize([1,2,3,3,3,2]).distinct().collect()  #distinct()返回包含唯一元素的RDD
[1, 2, 3]
>>> from operator import add, mul
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) #把所有分片上的数据累加
15
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(1, mul)  #把所有分片上的数据连乘
120
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)  #reduce()函数的并行版本
15
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(mul)
120
>>> result = sc.parallelize(range(1, 6)).groupBy(lambda x: x%3).collect()  #对所有数据进行分组
>>> for k, v in result:
    print(k, sorted(v))

0 [3]
1 [1, 4]
2 [2, 5]
>>> rdd1 = sc.parallelize(range(10))
>>> rdd2 = sc.parallelize(range(5, 20))
>>> rdd1.interp(rdd2).collect()   #交集
[8, 9, 5, 6, 7]
>>> rdd1.subtract(rdd2).collect()  #差集
[0, 1, 2, 3, 4]
>>> rdd1.union(rdd2).collect()  #合并两个RDD上的元素
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
>>> rdd1 = sc.parallelize('abcd')
>>> rdd2 = sc.parallelize(range(4))
>>> rdd1.zip(rdd2).collect()   #两个RDD必须等长
[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
>>> rdd = sc.parallelize('abcd')
>>> rdd.map(lambda x: (x, 1)).collect()  #内置函数map()的并行版本
[('a', 1), ('b', 1), ('c', 1), ('d', 1)]
>>> sc.parallelize([1, 2, 3, 4, 5]).stdev()  #计算标准差
1.4142135623730951
>>> sc.parallelize([1, 1, 1, 1, 1]).stdev()
0.0

明天回老家过年了,一周后回来,偏远的农村上网不太方便,这几天可能暂时不再发文了,春节回来以后再继续发。提前祝所有朋友春节快乐!

Python大数据处理扩展库pySpark用法精要相关推荐

  1. Python爬虫扩展库BeautifulSoup4用法精要

    BeautifulSoup是一个非常优秀的Python扩展库,可以用来从HTML或XML文件中提取我们感兴趣的数据,并且允许指定使用不同的解析器.由于beautifulsoup3已经不再继续维护,因此 ...

  2. Python扩展库psutil用法精要

    0.安装与导入psutil pip install psutil import psutil 1.查看CPU信息 >>> psutil.cpu_count() #查看CPU核数 2 ...

  3. Python大数据处理库 PySpark实战 总结四

    Python大数据处理库 PySpark实战四 ETL 实战 实验数据来源 数据加载 观察资料 选择.筛选与聚合 机器学习实战 实验数据来源 数据加载 统计描述 清洗与变形 Pipeline 逻辑回归 ...

  4. Python截屏扩展库pyscreenshot安装与使用

    PIL是非常成熟的Python图像处理扩展库,但只支持Python 2.x,另一个同样功能的扩展库pillow完美支持Python 3.x.然而,这两个库的部分功能不能在各版本的Linux平台使用,这 ...

  5. Python大数据处理,应对海量数据挑战

    Python大数据处理,应对海量数据挑战 Python的特点及在大数据处理中的优势 1 Python语言的特点 2 Python在大数据处理中所具备的优势 二.Python常用的大数据处理工具介绍 1 ...

  6. 海龟编程 python绘图工具turtle库的用法 turtle库使用方法大全,画笔设置 画布设置 绘图设置,画笔粗细,画笔颜色, 画笔速度。Python二级必须会的命令(已获取证书)

    目录 海龟编程 python绘图工具turtle库的用法 画布: 画笔 画笔运动命令: 画笔的控制命令: 全局控制命令: 简单turtle绘图示例: 圆中方: 三色同心圆: 四个圆中方: 螺旋正方: ...

  7. Python爬虫辅助库BeautifulSoup4用法精要

    BeautifulSoup是一个非常优秀的Python扩展库,可以用来从HTML或XML文件中提取我们感兴趣的数据,并且允许指定使用不同的解析器.由于beautifulsoup3已经不再继续维护,因此 ...

  8. python官方的扩展库索引是什么_python扩展列表

    广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! python扩展内容阅读本文需要3分钟? ① python中yield关键字的使 ...

  9. Python使用pandas扩展库DataFrame对象的pivot方法对数据进行透视转换

    Python扩展库pandas的DataFrame对象的pivot()方法可以对数据进行行列互换,或者进行透视转换,在有些场合下分析数据时非常方便. DataFrame对象的pivot()方法可以接收 ...

最新文章

  1. docker build 指定dockerfile
  2. 如何排查系统的性能瓶颈点?
  3. 取得select框的text
  4. ISA Server 2004 SP2新特性(上)
  5. 名企架构师的心声:让我一次“架”个“构”
  6. java中程序定义book类_Java基础_0302:类和对象
  7. 单例初始化(MRC模式之autorelease)
  8. Ubuntu下GPAC(MP4Box)的安装 | 基于MP4Box搭建DASH视频系统
  9. CSS中设置页面背景图片
  10. 鼠标点击特效(富强,民主,文明,和谐,诚信,友善)
  11. 连接远程电脑的时候提示:此计算机无法连接到远程计算机
  12. android extra_shortcut_icon,Android自动创建shortcut
  13. git 错误error: failed to push some refs to
  14. linux系统下载编译器,linux gcc编译器下载 GNU Compiler Collection(gcc编译器) v4.7.0 for linux版 下载-脚本之家...
  15. 程序人生 | 大龄的程序员都到哪里去了?
  16. 问题 J: 老肖数等式
  17. CP56time2a
  18. oracle sqlplus as sysdba,sqlplus / as sysdba 详解
  19. Saber 2016 安装和破解(WIN10)
  20. About Codeblocks

热门文章

  1. JAVA设置输入数据范围,如何使用Apache POI(SXSSF)为特定单元格设置数据(数字)格式区域设置?...
  2. html5圆形导航菜单,圆滑细腻,那些使用圆形导航菜单的漂亮网页设计
  3. mysql分组后组内排序_图解排序 3/10 希尔排序
  4. linux screen vim 颜色不一样,tmux中的Vim显示错误的颜色
  5. html js布尔值怎么定义,JavaScript基本类型值-Undefined、Null、Boolean
  6. easyui dialog 不执行页面js_Spring Security(六):前端菜单,角色权限页面的搭建
  7. mcldownload文件夹_《我的世界》中国版游戏空间精简教程 多余文件删除方法
  8. 记录一次 Win10 通过 VirtualBox安装CentOS7 的辛酸史
  9. 框架 butterknife
  10. SSM实现的在线挂号预约管理系统源码