连载中:http://ihoge.cn/tags/pyspark/

title: 弹性式分布数据集RDD——Pyspark基础 (二)
date: 2018-04-15 17:59:21
comments: true
categories:
- Spark
tags:

- pyspark

RDD的内部运行方式

RDD不仅是一组不可变的JVM(Java虚拟机)对象的分布集,而且是Spark的核心,可以让任务执行高速运算。

RDD将跟踪(计入日记)应用于每个快的所有转换,以加速计算速度,并在发生错误和部分数据丢失时提供回退(容错机制)。

RDD采用并行的运行方式,也就是每个转换操作并行执行,从而提高速度。
RDD有两种并行操作:
- 转换操作(返回指向新的RDD的指针)
- 动作操作(在运行计算后向驱动程序返回值)

数据集的转换通常是惰性的,这也意味着任何转换操作仅在调用数据集上的操作时才执行。该延迟执行会产生风多的精细查询:针对性能进行优化查询。这种优化始于Spark的DAGScheduler——面向阶段的调度器。DAGScheduler负责Stage级的调度详见:Spark运行原理剖析

由于具有单独的RDD转换和动作,DAGScheduler可以在查询中执行优化。包括但不限于避免shuffle数据(最耗费资源的任务)

创建RDD

方式一: 用.parallelize(...)集合(元素list或array)

data = sc.parallelize([('a',1),('b',2),('c',3),('d',5),('e',5)])

方式二: 读入外部文件

  • 支持多文件系统中读取:如NTFS、FAT、HFS+(Mac OS Extended),或者如HDFS、S3、Cassandra这类的分布式文件系统,还有其他类文件系统。
  • 指出多种数据格式:如文本、parquet、JSON、Hive tables(Hive表)以及使用JDBC驱动程序可读取的关系数据库中的数据。(注意:Spark可以自动处理压缩数据集)

��Tip1:读取的方式不同,持有对象表达方式也不同。从文件中读取的数据表示为MapPartitionsRDD;使用集合方法的数据表示为ParallelCollectionRDD

��**Tip2:**RDD是无schema的数据结构(和DataFrame不同),所以我们几乎可以混用任何数据结构:tuple、dict、list和spark等都能支持。如果对数据集使用.collect()方法,将把RDD对所有元素返回给驱动程序,驱动程序将其序列化成了一个列表。

data_from_file = sc.textFile("hdfs://master:9000/pydata/VS14MORT.txt.gz",4) # 这里表示4个分区
def extractInformation(row):import reimport numpy as npselected_indices = [2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,19,21,22,23,24,25,27,28,29,30,32,33,34,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,58,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,81,82,83,84,85,87,89]'''Input record schemaschema: n-m (o) -- xxxn - position fromm - position too - number of charactersxxx - description1. 1-19 (19) -- reserved positions2. 20 (1) -- resident status3. 21-60 (40) -- reserved positions4. 61-62 (2) -- education code (1989 revision)5. 63 (1) -- education code (2003 revision)6. 64 (1) -- education reporting flag7. 65-66 (2) -- month of death8. 67-68 (2) -- reserved positions9. 69 (1) -- sex10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated11. 71-73 (3) -- number of units (years, months etc)12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)13. 75-76 (2) -- age recoded into 52 categories14. 77-78 (2) -- age recoded into 27 categories15. 79-80 (2) -- age recoded into 12 categories16. 81-82 (2) -- infant age recoded into 22 categories17. 83 (1) -- place of death18. 84 (1) -- marital status19. 85 (1) -- day of the week of death20. 86-101 (16) -- reserved positions21. 102-105 (4) -- current year22. 106 (1) -- injury at work23. 107 (1) -- manner of death24. 108 (1) -- manner of disposition25. 109 (1) -- autopsy26. 110-143 (34) -- reserved positions27. 144 (1) -- activity code28. 145 (1) -- place of injury29. 146-149 (4) -- ICD code30. 150-152 (3) -- 358 cause recode31. 153 (1) -- reserved position32. 154-156 (3) -- 113 cause recode33. 157-159 (3) -- 130 infant cause recode34. 160-161 (2) -- 39 cause recode35. 162 (1) -- reserved position36. 163-164 (2) -- number of entity-axis conditions37-56. 165-304 (140) -- list of up to 20 conditions57. 305-340 (36) -- reserved positions58. 341-342 (2) -- number of record axis conditions59. 343 (1) -- reserved position60-79. 344-443 (100) -- record axis conditions80. 444 (1) -- reserve position81. 445-446 (2) -- race82. 447 (1) -- bridged race flag83. 448 (1) -- race imputation flag84. 449 (1) -- race recode (3 categories)85. 450 (1) -- race recode (5 categories)86. 461-483 (33) -- reserved positions87. 484-486 (3) -- Hispanic origin88. 487 (1) -- reserved89. 488 (1) -- Hispanic origin/race recode'''record_split = re\.compile(r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')try:rs = np.array(record_split.split(row))[selected_indices]except:rs = np.array(['-99'] * len(selected_indices))return rsdata_file = data_from_file.map(extractInformation)
data_file.map(lambda row: row).take(1)
data_file.cache()
data_file.is_cached
True

全局作用域和局部作用域

Spark可以在两种模式下运行:本地和集群。本地运行Spark代码时和目前使用的python没有说明不同。然而他如果将相同的代码部署到集群,便可能会导致大量的困扰,这就需要了解Spark是怎么在集群上执行工作的。这里有一篇文章介绍的很详细。参考:Spark运行原理详解

在集群模式下,提交任务时任务发送给了Master节点。该驱动程序节点为任务创建DAG,并且决定哪一个执行者(Worker)节点运行特定的任务。然后该驱动程序知识工作者执行它们的任务,并且在结束时将结果返回给驱动程序。然而在这之前,驱动程序为每一个任务的终止做准备:驱动程序中有一组变量和方法,以变工作者在RDD上执行任务。

这组变量和方法在执行者的上下问本质上是静态的,每个执行器从驱动程序中获取的一份变量和方法的副本。这意味着运行任务时,如果执行者改变这些变量或覆盖这些方法,它不影响任何其他执行者的副本或者驱动程序的变量和方法。这可能会导致一些意想不到的行为和运行错误,这些行为和错误通常都很难被追踪到。

转换

转换操作可以调整数据集。包括映射、筛选、链接、转换数据集中的值。

.map()转换

data_2014 = data_file.map(lambda x: x[16])
data_2014.take(10)
['2014', '2014', '2014', '2014', '2014', '2014', '2014', '2014', '2014', '-99']

.filter()转换

data_filter = data_file.filter(lambda x: x[16] == '2014' and x[21] == '0')
print(data_filter.count())
data_file.take(2)
22[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11','  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ','238', '070', '   ', '24', '01', '11I64  ', '       ', '       ','       ', '       ', '       ', '       ', '       ', '       ','       ', '       ', '       ', '       ', '       ', '       ','       ', '       ', '       ', '       ', '       ', '01','I64  ', '     ', '     ', '     ', '     ', '     ', '     ','     ', '     ', '     ', '     ', '     ', '     ', '     ','     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',' ', '1', '1', '100', '6'], dtype='<U40'),array(['1', '  ', '2', '1', '01', 'M', '1', '058', ' ', '37', '17', '08','  ', '4', 'D', '3', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I250','214', '062', '   ', '21', '03', '11I250 ', '61I272 ', '62E669 ','       ', '       ', '       ', '       ', '       ', '       ','       ', '       ', '       ', '       ', '       ', '       ','       ', '       ', '       ', '       ', '       ', '03','I250 ', 'E669 ', 'I272 ', '     ', '     ', '     ', '     ','     ', '     ', '     ', '     ', '     ', '     ', '     ','     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',' ', '1', '1', '100', '6'], dtype='<U40')]

.flatMap()转换

.flatMap()方法和.map()工作类似,不同的是flatMap()返回一个扁平的结果而不是一个列表。

data_flat = data_file.flatMap(lambda x: (x[16], int(x[16])+1))
data_flat.take(10)
['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]

.flatMap()可以用于过滤一些格式不正确的记录。在这个机制下,.flatMap()方法吧每一行看作一个列表对待,然后将所有记录简单的加入到一起,通过传递一个空列表可以丢弃格式不正确的记录。

.distinct()转换

这里用该方法检查性别列表是否只包含了男性和女性验证我们是否准确解释了数据集。

distinct_gender = data_file.map(lambda x: x[5]).distinct()
distinct_gender.collect()
['M', 'F', '-99']

.sample() 转换

该方法返回数据集的随机样本。第一个参数withReplacement指定采样是否应该替换,第二个参数fraction定义返回数据量的百分比,第三个参数是伪随机数产生器的种子seed

为了节省运算时间,这里选取愿数据千分之一的随机数据作为下面的练习数据。

data_sample = data_file.sample(False, 0.001, 666)
data_sample.cache()
PythonRDD[25] at RDD at PythonRDD.scala:48

.leftOuterJoin()转换

  • .leftOuterJoin(): 根据两个数据集中都有得值来连接两个RDD,并返回左侧的RDD记录,而右边的记录副加载两个RDD匹配的地方。
  • .join() :只返回两个RDD之间的关联数值
  • .intersection():返回两个RDD中相等的记录
rdd1 = sc.parallelize([('a',1), ('b',4), ('c',10)])
rdd2 = sc.parallelize([('a',4), ('a',1), ('b',6), ('d',15)])
print("leftOuterJoin: ",rdd1.leftOuterJoin(rdd2).collect())
print("Join: ",rdd1.join(rdd2).collect())
print("intersection: ", rdd1.intersection(rdd2).collect())
leftOuterJoin:  [('c', (10, None)), ('b', (4, 6)), ('a', (1, 1)), ('a', (1, 4))]
Join:  [('b', (4, 6)), ('a', (1, 1)), ('a', (1, 4))]
intersection:  [('a', 1)]

.repartition()转换

重新对数据集进行分区,改变数据集分赛区的数量。此功能应该谨慎并且仅当真正需要的时候使用,因为它会充足数据,导致性能产生巨大的影响。

print(len(rdd2.glom().collect()))
rdd2 = rdd2.repartition(4)
print(len(rdd2.glom().collect()))
3
4

动作

.collect() 动作

返回所有RDD的元素给驱动程序

��同时常用的还有: .collectAsMap()方法

.take() 动作

可以说这事最有用的方法,返回单个数据分区的前n行。

rdd.take(1)
#等同于:
rdd.first()

.reduce() 动作

该方法使用指定的方法减少RDD中的元素。可以用该方法计算RDD总的元素之和:

rdd1.map(lambda x: x[1]).reduce(lambda x, y: x + y)

在每一个分区里,reduce()方法运行求和方法,将改总和返回给最终聚合所在的程序节点。

⚠️警告:
要谨慎注意的是,reduce传递的函数需要时关联的,既满足元素顺序改变结果不变,操作符顺序改变结果不变。如:

rdd = sc.parallelize([1, 2, 0.5, 0.1],1)
rdd.reduce(lambda x, y: x / y)out: 10.0
rdd = sc.parallelize([1, 2, 0.5, 0.1],2)
rdd.reduce(lambda x, y: x / y)out: 0.1

这里我们希望输出结果是10.0,第一个只把RDD放在一个分区,输出结果符合预期。但是在第二个例子中,分了2个区,结果就不对了。因为该方法是在每个分区并行计算的。

.reduceByKey() 动作

该方法和.reduce()方法类似,但是实在key-key基础上运行:

data_key = sc.parallelize([('a',3), ('a',1), ('b',6), ('d',1), ('b',6), ('d',15), ('d',3), ('a',7), ('b', 8)],4)
data_key.reduceByKey(lambda x, y: x+y).collect()
[('b', 20), ('a', 11), ('d', 19)]

.count() 动作

.count() 方法统计出了RDD里所有的元素数量。

rdd.count()

.count() 方法产生入戏方法同样的结果,但不需要把整个数据集移动到驱动程序:

len(rdd.collect()). # ⚠️警告:不要这样做!!

.countByKey() 动作

如果数据集是Ket-Value形式,可以使用.countByKey()方法

data_key.countByKey().items()
dict_items([('a', 3), ('b', 3), ('d', 3)])

.saveAsTextFile() 动作

该方法将RDD保存为文本文件:每个文件一个分区

data_key.saveAsTextFile('hdfs://master:9000/out/data_key.txt')

要读取它的时候需要解析,因为所有行都被视为字符串:

def parseInput(row):import repattern = re.compile(r"\(\'([a-z]+)\',.([0-9]+)\)") # 这里“+”号代表匹配一个或多个匹配字符,否则针对双位数动作操作会报错row_split = pattern.split(row)return (row_split[1], row_split[2])
data_key_read = sc.textFile('hdfs://master:9000/out/data_key.txt')
data_key_read.map(parseInput).collect()
[('a', '3'),('a', '1'),('b', '6'),('d', '1'),('b', '6'),('d', '15'),('d', '3'),('a', '7'),('b', '8')]

��同时还有:
- rdd.saveAsHadoopDataset
- rdd.saveAsSequenceFile
- …
等方法

.foreach() 动作

这个方法对RDD里的每个元素,用迭代方法应用相同的函数;和.map()相比,.foreach()方法按照一个接一个的方式,对每一条记录应用一个定义好的函数。当希望将数据曹村道PySpark本身不支持的数据库是,该方法很有用。

def f(x):print(x)rdd.foreach(f)

小结:

  • RDD是Spark的核心;这些无schema数据结构早Spark中处理的最基本的数据结构。
  • RDD的两种创建方式: parallelize 和 文件读取
  • Spark中的转化是惰性的,只在操作被调用时应用。
  • Scala 和 Python RDD之间一个主要的区别是速度: Python RDD 比 Scala 慢很多!

弹性式分布数据集RDD——Pyspark基础 (二)相关推荐

  1. PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

    目录 前言 一.PySpark基础功能 1.Spark SQL 和DataFrame 2.Pandas API on Spark 3.Streaming 4.MLBase/MLlib 5.Spark ...

  2. Spark弹性式数据集RDDs

    title: Spark弹性式数据集RDDs date: 2021-05-08 16:24:20 tags: Spark RDD 全称为 Resilient Distributed Datasets, ...

  3. spark学习笔记:弹性分布式数据集RDD(Resilient Distributed Dataset)

    弹性分布式数据集RDD 1. RDD概述 1.1 什么是RDD RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可 ...

  4. PySpark基础 —— SparkSQL

    一.快速入门 import findspark from pyspark.sql import SparkSessionfindspark.init() spark = SparkSession.bu ...

  5. 三线压力传感器原理_弹性式压力表、弹簧管压力表、压阻式压力传感器的原理、选型、安装...

    元旦节快乐Happy New Year's Day弹性式压力表.弹簧管压力表.压阻式压力传感器的原理.选型.安装 弹性式压力表 齐亚斯弹性式压力表是以弹性元件受压后所产生的弹性变形作为测量基础的.它 ...

  6. Bootstrap基础二十七 多媒体对象(Media Object)

    Bootstrap<基础二十七> 多媒体对象(Media Object) 原文:Bootstrap<基础二十七> 多媒体对象(Media Object) Bootstrap 中 ...

  7. pyspark基础教程

    pyspark基础教程 下面一段代码是简单教程,对与如何向spark 集群提交代码任务,无论文档和博客都有很多说法,其实很简单,只要在脚本中setMaster("spark://192.16 ...

  8. 自考计算机软件基础交作业,全国2009年7月自考计算机软件基础(二)试题及答案...

    全国2009年7月自考计算机软件基础(二)试题及答案 课程代码:02365 一.单项选择题(本大题共20小题,每小题1分,共20分) 1.下列选项中属于事务处理软件的是( A ) A.工资管理软件 B ...

  9. python路由编程_Python Django基础二之URL路由系统

    MVC和MTV框架 MVC Web服务器开发领域里著名的MVC模式,所谓MVC就是把Web应用分为模型(M),控制器(C)和视图(V)三层,他们之间以一种插件式的.松耦合的方式连接在一起,模型负责业务 ...

最新文章

  1. 《Pro ASP.NET MVC 3 Framework》学习笔记之九【Ninject的使用-下】
  2. java钢琴键_javapiano Java Piano模拟钢琴效果的完整源码 联合开发网 - pudn.com
  3. freebsd ports安装mysql_FreeBSD Ports 方式安装MySQL及注意事项
  4. 数据库存储字符超过2000处理办法;
  5. 有三AI不得不看的几十篇技术综述
  6. Swift之深入解析如何使用Xcode和LLDB v2修改UI元素
  7. 关于对Caffe适用场景的思考
  8. java 常量表达式_JavaSwitch语句:常量表达式是必需的,但它是常量
  9. clistctrl获取选中行数据_大神,Excel数据怎么对比?
  10. 脚本修改域内本地管理员密码
  11. (转)深入分析 Java I/O 的工作机制
  12. php sql 长字符串 查找被包含的短字符串_PHP字符串
  13. From the data point of view
  14. 各机器学习平台视频建模功能汇总
  15. 《Netty权威指南》第1章 Java的I/O演进之路
  16. PCL 1.8.1 在VS2015中配置 包含目录、库目录和附加依赖项
  17. iOS WebView生成长截图的第三种解决方案
  18. ESP8266开发之旅 进阶篇⑪ 深入了解 Esp8266 Https访问
  19. 润生香港轻零食之品牌零食合作 I 专业·共赢·美味
  20. JavaScript 习题及面试题 4

热门文章

  1. 【C】@程序员,我们送给你一个成熟的Excel导入导出组件
  2. [小技巧]你真的了解C#中的Math.Round么?
  3. 【活动】厦门.NET俱乐部 省上云开发者专场
  4. 深入业务成为更好的软件架构师——信息化建设图鉴一二例
  5. 领域驱动设计,让程序员心中有码(五)
  6. HttpClient在.NET Core中的正确打开方式
  7. 撒花!中文翻译仓库链接已加入 ML.NET 官方示例网站首页
  8. 边缘化搭建DotNet Core 2.1 自动化构建和部署环境(上)
  9. Microsoft AI - Custom Vision in C#
  10. 使用 ASP.NET Core, Entity Framework Core 和 ABP 创建N层Web应用 第二篇