Spark提供的主要抽象是resilient distributed dataset(RDD)弹性分布式数据集,它是跨集群节点划分的元素的集合,可以并行操作。通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件或驱动程序中现有的Scala集合开始并进行转换来创建RDD。用户还可以要求Spark将RDD 保留在内存中,以使其能够在并行操作中有效地重用。最后,RDD自动从节点故障中恢复。

Spark中的第二个抽象是可以在并行操作中使用的共享变量。默认情况下,当Spark作为一组任务在不同节点上并行运行一个函数时,它会将函数中使用的每个变量的副本传送给每个任务。有时,需要在任务之间或任务与驱动程序之间共享变量。Spark支持两种类型的共享变量:广播变量(可用于在所有节点上的内存中缓存值)和累加器(accumulator),这些变量仅被“添加”到其上,例如计数器和总和

RDD五大特性

A list of partitions

一组分区:RDD由很多partition构成,有多少partition就对应有多少task

A function for computing each split

一个函数:对RDD做计算,相当于对RDD的每个split或partition做计算

A list of dependencies on other RDDs

RDD之间有依赖关系,可溯源

Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

一个Partitioner:即RDD的分片函数,如果RDD里面存的数据是key-value形式,则可以传递一个自定义的Partitioner进行重新分区

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

一个列表:存储存取每个Partition的优先位置(preferred location),计算每个split时,在split所在机器的本地上运行task是最好的,避免了数据的移动,split有多个副本,所以preferred location不止一个

初始化Spark

Spark程序做的第一件事情就是创建一个SparkContext对象,该对象告诉Spark如何访问集群,要创建一个SparkContext首先需要构建一个SparkConf对象,其中包含应用程序程序的信息

from pyspark importSparkConf, SparkContext

conf=SparkConf().setAppName(appName).setMaster(master)

sc= SparkContext(conf=conf)

# 业务逻辑

sc.stop()

appName参数是应用程序显示在集群UI上的名称

master是Spark,Mesos或YARN群集URL或特殊的“本地”字符串,以本地模式运行

当在集群上运行时,您将不希望master在程序中进行硬编码,而是在其中启动应用程序spark-submit并在其中接收。但是,对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark

注意:

在PySpark Shell中,已经为我们初始化了Spark, 变量为sc, 我们自己配置的SparkContext将不起作用,也就是我们自己不用再初始化了

创建RDD的两种方式

方式一: 通过现有的可迭代对象或集合调用SparkContext的parallelize创建

data = [1, 2, 3, 4, 5]

rdd= sc.parallelize(data)

创建rdd后可以并行操作。例如调用distData.reduce(lambda a, b: a + b)计算集合元素的和

>>> rdd.reduce(lambda a,b: a+b)15

并行集合的一个重要参数就是将数据集切入分区,Spark将为集群的每个分区运行一个任务。通常,群集中的每个CPU都需要2-4个分区。通常,Spark会尝试根据您的集群自动设置分区数。但是,您也可以通过将其作为第二个参数传递给parallelize(例如sc.parallelize(data, 10))来手动设置它。

方式二: 外部数据集

PySpark可以从Hadoop支持的任何存储源创建分布式数据集,包括您的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。

可以使用SparkContext的textFile方法创建文本文件RDD 。此方法需要一个URI的文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合。这是一个示例调用:

rdd = sc.textFile("data.txt")

RDD操作

RDD支持两种类型操作:

transformation(转换): create a new dataset from an existing one 从现有的数据集中创建新数据集

action(动作): return a value to the driver program after running a conputation on the dataset 对数据集执行计算后,将值返回给驱动程序

常用的transformation

map(func)

将func函数作用到数据集的每一个元素上,生成一个新的分布式的数据集返回

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_map1():

data= [1, 2, 3, 4, 5, 6]

rdd1=sc.parallelize(data)

rdd2= rdd1.map(lambda x: x + 1)print(rdd2.collect())defmy_map2():

rdd1= sc.parallelize(["java", "python", "php", "ruby"])

rdd2= rdd1.map(lambdax: (x, len(x)))print(rdd2.collect())

my_map1()

my_map2()

sc.stop()#输出结果

[2, 3, 4, 5, 6, 7]

[('java', 4), ('python', 6), ('php', 3), ('ruby', 4)]

map示例

filter(func)

选出所有func返回值为true的元素,生成一个新的分布式的数据集返回

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_filter():

data= [1, 2, 3, 4, 5]

rdd=sc.parallelize(data)

rddMap= rdd.map(lambda x: x * 2)

rddFilter= rddMap.filter(lambda x: x > 6)print(rddFilter.collect())defmy_filter02():#使用链式写法优化代码

data = [1, 2, 3, 4, 5]print(sc.parallelize(data).map(lambda x: x * 2).filter(lambda x: x > 6).collect())

my_filter()

sc.stop()#输出结果

[8, 10]

filter示例

flatMap(func)

输入的item能够被map到0或者多个items输出,返回值是一个Sequence

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_flatMap():

data= ["hello heboan", "hello python", "world ok"]

rdd=sc.parallelize(data)print(rdd.flatMap(lambda line: line.split(" ")).collect())

my_flatMap()

sc.stop()#输出结果

['hello', 'heboan', 'hello', 'python', 'world', 'ok']

flatMap示例

groupBykey()

把相同的key的数据分发到一起

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_groupByKey():

data= ["hello heboan", "hello python", "hello world"]#key ==> (key, 1)

rddMap = sc.parallelize(data).flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))#print(rddMap.collect())

rdd_groupByKey =rddMap.groupByKey()#print(rdd_groupByKey.collect())

print(rdd_groupByKey.map(lambda x: (x[0], list(x[1]))).collect())

my_groupByKey()

sc.stop()#输出结果

[('python', [1]), ('heboan', [1]), ('hello', [1, 1, 1]), ('world', [1])]

groupByKey示例

reduceByKey(func)

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_reduceMap():

data= ["hello heboan", "hello python", "hello world"]

rddMap= sc.parallelize(data).flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))

rdd_reduceByKey= rddMap.reduceByKey(lambda a, b: a + b) #相邻的数相加

print(rdd_reduceByKey.collect())

my_reduceMap()

sc.stop()#输出结果

[('python', 1), ('heboan', 1), ('hello', 3), ('world', 1)]

reduceMap示例

sortByKey()

排序

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_sortByKey():

data= ["hello heboan", "hello python", "hello world"]

rddMap= sc.parallelize(data).flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))

rdd_reduceByKey= rddMap.reduceByKey(lambda a, b: a +b)#因为sortByKey是对key记性排序的,所以先使用map调换k,v的位置进行排序,传入False表示降序,排序完成后再把k,v位置换回来

rdd_sortByKey = rdd_reduceByKey.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0]))print(rdd_sortByKey.collect())

my_sortByKey()

sc.stop()#输出结果

[('hello', 3), ('python', 1), ('heboan', 1), ('world', 1)]

sortByKey示例

union()

就是两个数据集合并

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_union():

a= sc.parallelize([1, 2, 3])

b= sc.parallelize([4, 5, 6])print(a.union(b).collect())

my_union()

sc.stop()#输出结果

[1, 2, 3, 4, 5, 6]

union示例

distinct()

去重

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_distinct():

a= sc.parallelize([1, 2, 3])

b= sc.parallelize([3, 4, 5])print(a.union(b).distinct().collect())

my_distinct()

sc.stop()#输出结果

[1, 2, 3, 4, 5]

distinct示例

join()

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_join():

a= sc.parallelize([("A", "a1"), ("B", "b1"), ("C", "c1"), ("D", "d1")])

b= sc.parallelize([("A", "a2"), ("C", "c2"), ("F", "f1")])print(a.join(b).collect())print(a.leftOuterJoin(b).collect())print(a.rightOuterJoin(b).collect())print(a.fullOuterJoin(b).collect())

my_join()

sc.stop()#输出结果

[('C', ('c1', 'c2')), ('A', ('a1', 'a2'))]

[('B', ('b1', None)), ('D', ('d1', None)), ('C', ('c1', 'c2')), ('A', ('a1', 'a2'))]

[('F', (None, 'f1')), ('C', ('c1', 'c2')), ('A', ('a1', 'a2'))]

[('F', (None, 'f1')), ('B', ('b1', None)), ('D', ('d1', None)), ('C', ('c1', 'c2')), ('A', ('a1', 'a2'))]

join示例

常用action

collect

count

take

reduce

foreach

saveAsTextFile

max

min

sum

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_action():

data= [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

rdd=sc.parallelize(data)print(rdd.collect()) #输出

print(rdd.count()) #计数

print(rdd.take(3)) #前3个元素

print(rdd.max()) #最大的元素

print(rdd.sum()) #所有元素之和

print(rdd.reduce(lambda a, b: a + b)) #求和

rdd.foreach(lambda x: print(x)) #输出每个元素

rdd.saveAsTextFile("hdfs://heboan-hadoop-000:8020/tmp") #写入到文件系统

my_action()

sc.stop()

实战案例---词频统计

hello word

hello heboan

my nameisheboan

hello everyone

heboan.txt

#/data/script/wc.py

from pyspark importSparkConf, SparkContextimportsysif __name__ == '__main__':if len(sys.argv) != 2:print("Usage: wordcount ", file=sys.stderr)

sys.exit(-1)

conf=SparkConf()

sc= SparkContext(conf=conf)

rdd= sc.textFile(sys.argv[1])\

.flatMap(lambda line: line.split(" "))\

.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a+b)for word, count inrdd.collect():print("{}: {}".format(word, count))

sc.stop()

服务器执行

[root@heboan-hadoop-000 ~]#spark-submit --master local[2] --name heboan001 /data/script/wc.py file:///root/heboan.txt

>>>>>>>延伸

上面我们是指定了一个文件/root/heboan.txt, 我们也可以指定一个目录

#/root/data/目录下的所有文件都会进行计算

spark-submit --master local[2] --name heboan001 /data/script/wc.py file:///root/data/

计算特定的文件,如

#/root/data/目录下的所有.txt后缀文件都会进行计算

spark-submit --master local[2] --name heboan001 /data/script/wc.py file:///root/data/*.txt

案例实战----网站访问ip前5

案例实战---统计平均年龄

python spark进行大数据分析_第2天Python实战Spark大数据分析及调度-RDD编程相关推荐

  1. 视频教程-全新大数据企业电商数据仓库项目实战教程-大数据

    全新大数据企业电商数据仓库项目实战教程 张长志技术全才.擅长领域:区块链.大数据.Java等.10余年软件研发及企业培训经验,曾为多家大型企业提供企业内训如中石化,中国联通,中国移动等知名企业.拥有丰 ...

  2. 使用python数据分析_我如何使用Python分析《权力游戏》

    使用python数据分析 By Rocky Kev 洛基·凯夫(Rocky Kev) I wanted to learn Python for a long time, but I could nev ...

  3. python语言程序设计难不难_零基础学Python编程开发难度大吗?从哪学起?

    转行零基础学Python编程开发难度大吗?从哪学起? 近期很多小伙伴问我,如果自己转行学习Python,完全0基础能否学会呢?Python的难度到底有多大? 今天,小编就来为大家详细解读一下这个问题. ...

  4. python制作一个教学网站_小白如何入门Python? 制作一个网站为例

    首先最重要的问题是为什么要学习python?这个问题这个将指导你如何学习Python和学习的方式. 以你最终想制作一个网站为例.从一个通用的学习资源列表开始不仅会消磨你的激情,而且你获得的知识很难应用 ...

  5. spark 算子使用类变量_自己工作中超全spark性能优化总结

    来源:https://zhuanlan.zhihu.com/ p/108454557 作者:一块小蛋糕 编辑:深度传送门 Spark是大数据分析的利器,在工作中用到spark的地方也比较多,这篇总结是 ...

  6. python可以引流吗_你都用 Python 来做什么?

    不是搞科研的,工作用Java开发的,Python一般用来做一些辅助性的运维.分析.自动化工作. 个人生活自动化win-lockfetch 用Win 10的同学都知道Win10有一套自动更新的锁屏壁纸. ...

  7. python对人工智能的看法_为什么人工智能用python

    相对于其他语言,python对人工智能最大的优势是他的可扩展性.可嵌入性.这也是他被程序员称为"胶水语言"的原因. python对人工智能应用的优点:(推荐学习:Python视频教 ...

  8. python送程序员收入_程序员学Python后惊叹,这么丰厚的收入是Java给不了的!

    原标题:程序员学Python后惊叹,这么丰厚的收入是Java给不了的! 你都用 Python 来做什么?? 发现很多人都在学习 Python ,但是没有明确的说明可以做什么,主流的功能是什么? 想知道 ...

  9. 学python去哪做项目_有哪些适合 Python 刚入门者去做的项目?

    学软件开发的都知道实战项目对于学好一门语言是很重要的.在这里可以向大家推荐几个Python实战项目 项目1.Python 图片转字符画 本课程用 50 行 Python 代码完成图片转字符画小工具.通 ...

  10. python金融工程的工具包_金融工程及其Python应用

    目 录 第1章 金融工程导论 1 1.1 金融工程的概念 2 1.2 国外现代主流金融理论发展历程 2 1.3 国内金融的发展 3 1.4 现代主流金融理论简介 4 1.4.1 投资组合理论 4 1. ...

最新文章

  1. 如果你的云服务商倒闭该怎么办?
  2. 使用二维高斯函数模糊图片
  3. ASP.NET Core 2 学习笔记(四)依赖注入
  4. rust(49)-图像(2)
  5. 在护卫神上部署javaWeb项目,已经测试通过
  6. 隐藏esp_汽车一键启车主必须知道的几个“隐藏”技巧
  7. 客服端与服务器之间传输信息,QT实现客服端和服务器之间消息和文件交互
  8. 解决vue-cli插件下载慢的问题
  9. PHP exit函数介绍
  10. 软件工程的未来发展趋势[转载]
  11. numpy 转置_Python中Numpy.transpose()
  12. 微信小程序图片上传java后台
  13. ps制作alpha通道图片—背景透明图片制作
  14. 2018年嵌入式处理器报告:神经网络加速器的崛起
  15. 魔方机器人大赛——总结感悟篇:干货篇
  16. 如何跟对手学习发外链提高网站排名?
  17. .net 大型药品进销存管理系统源码
  18. 冷链食品追溯迫在眉睫,爱码物联3步助力冷链溯源
  19. BZOJ1179【APOI2009】ATM Tarjan
  20. (2022-2027)全球及中国乳酸十六烷基酯行业供需现状及前景规划分析报告

热门文章

  1. 利用geogle中memory工具分析js占用内存
  2. 实现拼团_生鲜商家如何使用微信拼团小程序做水果生鲜拼团活动?
  3. 如何用python批量下载数据_使用Python批量下载数据
  4. 浙江省计算机vb二级考试题库,浙江省计算机二级vb上机试题题库..doc
  5. oracle linux 双机,oracleforlinux双机热备实战完全手册
  6. 解决element-ui table 表格排列错位问题
  7. GitHub Pages搭建属于自己的静态网站,并绑定个人域名
  8. 第十七节: EF的CodeFirst模式的四种初始化策略和通过Migration进行数据的迁移
  9. 定义一个包含增强方法的javaBean(最终增强)
  10. Lintcode: Unique Paths